mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
moved socket creation into own function
This commit is contained in:
parent
d8fe766ec8
commit
248258d8e8
2 changed files with 49 additions and 58 deletions
|
@ -71,6 +71,8 @@ type Configuration struct {
|
|||
Serialization string
|
||||
// SetBlockProfileRate for block profiling
|
||||
SetBlockProfileRate int
|
||||
// EnableSocket for enabling the creation of a steward.sock file
|
||||
EnableSocket bool
|
||||
|
||||
// NOTE:
|
||||
// Op commands will not be specified as a flag since they can't be turned off.
|
||||
|
@ -140,6 +142,7 @@ type ConfigurationFromFile struct {
|
|||
Compression *string
|
||||
Serialization *string
|
||||
SetBlockProfileRate *int
|
||||
EnableSocket *bool
|
||||
|
||||
StartPubREQHello *int
|
||||
StartSubREQErrorLog *bool
|
||||
|
@ -194,6 +197,7 @@ func newConfigurationDefaults() Configuration {
|
|||
Compression: "",
|
||||
Serialization: "",
|
||||
SetBlockProfileRate: 0,
|
||||
EnableSocket: true,
|
||||
|
||||
StartSubREQErrorLog: true,
|
||||
StartSubREQHello: true,
|
||||
|
@ -354,6 +358,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
|
|||
} else {
|
||||
conf.SetBlockProfileRate = *cf.SetBlockProfileRate
|
||||
}
|
||||
if cf.EnableSocket == nil {
|
||||
conf.EnableSocket = cd.EnableSocket
|
||||
} else {
|
||||
conf.EnableSocket = *cf.EnableSocket
|
||||
}
|
||||
|
||||
if cf.StartPubREQHello == nil {
|
||||
conf.StartPubREQHello = cd.StartPubREQHello
|
||||
|
@ -489,6 +498,7 @@ func (c *Configuration) CheckFlags() error {
|
|||
flag.StringVar(&c.Compression, "compression", fc.Compression, "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression")
|
||||
flag.StringVar(&c.Serialization, "serialization", fc.Serialization, "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob")
|
||||
flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", fc.SetBlockProfileRate, "Enable block profiling by setting the value to f.ex. 1. 0 = disabled")
|
||||
flag.BoolVar(&c.EnableSocket, "enableSocket", fc.EnableSocket, "true/false for enabling the creation of a steward.sock file")
|
||||
|
||||
flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds")
|
||||
|
||||
|
|
97
server.go
97
server.go
|
@ -110,70 +110,20 @@ func NewServer(c *Configuration, version string) (*server, error) {
|
|||
log.Printf(" * conn.Opts.ReconnectJitterTLS: %v\n", conn.Opts.ReconnectJitterTLS)
|
||||
log.Printf(" * conn.Opts.ReconnectJitter: %v\n", conn.Opts.ReconnectJitter)
|
||||
|
||||
// Prepare the connection to the Steward socket file
|
||||
|
||||
// Check if socket folder exists, if not create it
|
||||
if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(c.SocketFolder, 0700)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Just as an extra check we eventually delete any existing Steward socket files if found.
|
||||
socketFilepath := filepath.Join(c.SocketFolder, "steward.sock")
|
||||
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
|
||||
err = os.Remove(socketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: could not delete sock file: %v", err)
|
||||
cancel()
|
||||
return nil, er
|
||||
}
|
||||
}
|
||||
|
||||
// Open the socket.
|
||||
nl, err := net.Listen("unix", socketFilepath)
|
||||
// Open the steward socket file, and start the listener if enabled.
|
||||
stewardSocket, err := createSocket(c.SocketFolder, "steward.sock")
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to open socket: %v", err)
|
||||
cancel()
|
||||
return nil, er
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ---
|
||||
|
||||
// Prepare the connection to the Stew socket file
|
||||
|
||||
// Check if socket folder exists, if not create it
|
||||
if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(c.SocketFolder, 0700)
|
||||
if err != nil {
|
||||
cancel()
|
||||
return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", c.SocketFolder, err)
|
||||
}
|
||||
}
|
||||
|
||||
stewSocketFilepath := filepath.Join(c.SocketFolder, "stew.sock")
|
||||
|
||||
// Just as an extra check we eventually delete any existing Stew socket files if found.
|
||||
if _, err := os.Stat(stewSocketFilepath); !os.IsNotExist(err) {
|
||||
err = os.Remove(stewSocketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: could not delete stew.sock file: %v", err)
|
||||
cancel()
|
||||
return nil, er
|
||||
}
|
||||
}
|
||||
|
||||
stewNL, err := net.Listen("unix", stewSocketFilepath)
|
||||
// Open the stew socket file, and start the listener if enabled.
|
||||
stewSocket, err := createSocket(c.SocketFolder, "stew.sock")
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to open stew socket: %v", err)
|
||||
cancel()
|
||||
return nil, er
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// ---
|
||||
|
||||
metrics := newMetrics(c.PromHostAndPort)
|
||||
|
||||
s := &server{
|
||||
|
@ -182,8 +132,8 @@ func NewServer(c *Configuration, version string) (*server, error) {
|
|||
configuration: c,
|
||||
nodeName: c.NodeName,
|
||||
natsConn: conn,
|
||||
StewardSocket: nl,
|
||||
StewSocket: stewNL,
|
||||
StewardSocket: stewardSocket,
|
||||
StewSocket: stewSocket,
|
||||
processes: newProcesses(ctx, metrics),
|
||||
newMessagesCh: make(chan []subjectAndMessage),
|
||||
metrics: metrics,
|
||||
|
@ -208,6 +158,37 @@ func NewServer(c *Configuration, version string) (*server, error) {
|
|||
|
||||
}
|
||||
|
||||
// create socket will create a socket file, and return the net.Listener to
|
||||
// communicate with that socket.
|
||||
func createSocket(socketFolder string, socketFileName string) (net.Listener, error) {
|
||||
// Check if socket folder exists, if not create it
|
||||
if _, err := os.Stat(socketFolder); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(socketFolder, 0700)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error: failed to create socket folder directory %v: %v", socketFolder, err)
|
||||
}
|
||||
}
|
||||
|
||||
// Just as an extra check we eventually delete any existing Steward socket files if found.
|
||||
socketFilepath := filepath.Join(socketFolder, socketFileName)
|
||||
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
|
||||
err = os.Remove(socketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: could not delete sock file: %v", err)
|
||||
return nil, er
|
||||
}
|
||||
}
|
||||
|
||||
// Open the socket.
|
||||
nl, err := net.Listen("unix", socketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to open socket: %v", err)
|
||||
return nil, er
|
||||
}
|
||||
|
||||
return nl, nil
|
||||
}
|
||||
|
||||
// Start will spawn up all the predefined subscriber processes.
|
||||
// Spawning of publisher processes is done on the fly by checking
|
||||
// if there is publisher process for a given message subject, and
|
||||
|
|
Loading…
Reference in a new issue