diff --git a/read_socket.go b/read_socket.go index 44e8e82..90597e3 100644 --- a/read_socket.go +++ b/read_socket.go @@ -14,7 +14,7 @@ func (s *server) readSocket(toRingbufferCh chan []subjectAndMessage) { // Loop, and wait for new connections. for { - conn, err := s.StewardSockListener.Accept() + conn, err := s.StewardSocket.Accept() if err != nil { er := fmt.Errorf("error: failed to accept conn on socket: %v", err) sendErrorLogMessage(toRingbufferCh, Node(s.nodeName), er) diff --git a/ringbuffer.go b/ringbuffer.go index aaab9d0..d19abd4 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -107,7 +107,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri func() { s, err := r.dumpBucket(samValueBucket) if err != nil { - er := fmt.Errorf("info: fillBuffer: retreival of values from k/v store failed, probaly empty database, and no previos entries in db to process: %v", err) + er := fmt.Errorf("info: fillBuffer: retreival of values from k/v store failed, probaly empty database, and no previous entries in db to process: %v", err) log.Printf("%v\n", er) return //sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), er) diff --git a/server.go b/server.go index 8444da9..0e067ab 100644 --- a/server.go +++ b/server.go @@ -76,9 +76,9 @@ type server struct { // The nats connection to the broker natsConn *nats.Conn // net listener for communicating via the steward socket - StewardSockListener net.Listener + StewardSocket net.Listener // net listener for the communication with Stew - StewSockListener net.Listener + StewSocket net.Listener // processes holds all the information about running processes processes *processes // The name of the node @@ -155,7 +155,6 @@ func NewServer(c *Configuration) (*server, error) { } socketFilepath := filepath.Join(c.SocketFolder, "steward.sock") - if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) { err = os.Remove(socketFilepath) if err != nil { @@ -166,6 +165,7 @@ func NewServer(c *Configuration) (*server, error) { } nl, err := net.Listen("unix", socketFilepath) + if err != nil { er := fmt.Errorf("error: failed to open socket: %v", err) cancel() @@ -208,16 +208,16 @@ func NewServer(c *Configuration) (*server, error) { metrics := newMetrics(c.PromHostAndPort) s := &server{ - ctx: ctx, - ctxCancelFunc: cancel, - configuration: c, - nodeName: c.NodeName, - natsConn: conn, - StewardSockListener: nl, - StewSockListener: stewNL, - processes: newProcesses(metrics.promRegistry), - toRingbufferCh: make(chan []subjectAndMessage), - metrics: metrics, + ctx: ctx, + ctxCancelFunc: cancel, + configuration: c, + nodeName: c.NodeName, + natsConn: conn, + StewardSocket: nl, + StewSocket: stewNL, + processes: newProcesses(metrics.promRegistry), + toRingbufferCh: make(chan []subjectAndMessage), + metrics: metrics, } // Create the default data folder for where subscribers should @@ -266,19 +266,6 @@ func (s *server) Start() { // Start the checking the input socket for new messages from operator. go s.readSocket(s.toRingbufferCh) - // Delete the socket file when the program exits. - defer func() { - socketFilepath := filepath.Join(s.configuration.SocketFolder, "steward.sock") - - if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) { - err = os.Remove(socketFilepath) - if err != nil { - er := fmt.Errorf("error: could not delete sock file: %v", err) - log.Printf("%v\n", er) - } - } - }() - // Start up the predefined subscribers. Since all the logic to handle // processes are tied to the process struct, we need to create an // initial process to start the rest. @@ -297,6 +284,10 @@ func (s *server) Start() { // Will stop all processes started during startup. func (s *server) Stop() { + // TODO: Add done sync functionality within the + // stop functions so we get a confirmation that + // all processes actually are stopped. + // Stop the started pub/sub message processes. s.ctxSubscribersCancelFunc() log.Printf("info: stopped all subscribers\n") @@ -308,6 +299,18 @@ func (s *server) Stop() { // Stop the main context. s.ctxCancelFunc() log.Printf("info: stopped the main context\n") + + // Delete the socket file when the program exits. + socketFilepath := filepath.Join(s.configuration.SocketFolder, "steward.sock") + + if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) { + err = os.Remove(socketFilepath) + if err != nil { + er := fmt.Errorf("error: could not delete sock file: %v", err) + log.Printf("%v\n", er) + } + } + } func (p *processes) printProcessesMap() { diff --git a/steward_test.go b/steward_test.go index b3c9cb5..d8dca47 100644 --- a/steward_test.go +++ b/steward_test.go @@ -1,7 +1,11 @@ package steward import ( + "io" + "net" + "os" "path/filepath" + "strings" "testing" "time" @@ -24,18 +28,22 @@ func TestSteward(t *testing.T) { natsserver.PrintAndDie(err.Error()) } - // Start Steward instance for testing - tempdir := t.TempDir() + // Start Steward instance + // --------------------------------------- + tempdir := "./tmp" conf := &Configuration{ - SocketFolder: filepath.Join(tempdir, "tmp"), - DatabaseFolder: filepath.Join(tempdir, "var/lib"), - SubscribersDataFolder: filepath.Join(tempdir, "data"), - BrokerAddress: "127.0.0.1:40222", - NodeName: "central", - CentralNodeName: "central", - DefaultMessageRetries: 1, - DefaultMessageTimeout: 3, + SocketFolder: filepath.Join(tempdir, "tmp"), + DatabaseFolder: filepath.Join(tempdir, "var/lib"), + SubscribersDataFolder: filepath.Join(tempdir, "data"), + BrokerAddress: "127.0.0.1:40222", + NodeName: "central", + CentralNodeName: "central", + DefaultMessageRetries: 1, + DefaultMessageTimeout: 3, + StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQErrorLog: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, } s, err := NewServer(conf) if err != nil { @@ -44,11 +52,59 @@ func TestSteward(t *testing.T) { s.Start() + // Messaging tests. + // + // Write to socket. + // --------------------------------------- + want := "apekatt" + + socket, err := net.Dial("unix", filepath.Join(conf.SocketFolder, "steward.sock")) + if err != nil { + t.Fatalf("error: failed to open socket file for writing: %v\n", err) + } + defer socket.Close() + + m := `[ + { + "directory":"commands-executed", + "fileExtension":".result", + "toNode": "central", + "data": ["bash","-c","echo apekatt"], + "replyMethod":"REQToFileAppend", + "method":"REQnCliCommand", + "ACKTimeout":3, + "retries":3, + "methodTimeout": 10 + } + ]` + + _, err = socket.Write([]byte(m)) + if err != nil { + t.Fatalf("error: failed to write to socket: %v\n", err) + } + + // Wait a couple of seconds for the request to go through.. + time.Sleep(time.Second * 2) + + resultFile := filepath.Join(conf.SubscribersDataFolder, "commands-executed", "central", "central.REQnCliCommand.result") + fh, err := os.Open(resultFile) + if err != nil { + t.Fatalf("error: failed open result file: %v\n", err) + } + + result, err := io.ReadAll(fh) + if err != nil { + t.Fatalf("error: failed read result file: %v\n", err) + } + + if strings.Contains(string(result), want) { + t.Fatalf("error: did not find expexted word `%v` in file ", want) + } + + // --------------------------------------- + s.Stop() - // Shutdown services + // Shutdown services. ns.Shutdown() - - time.Sleep(time.Second * 5) - } diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 64149e3..f4bc7ee 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -522,7 +522,7 @@ func (m methodREQToFileAppend) handler(proc process, message Message, node strin // Open file and write data. file := filepath.Join(folderTree, fileName) - f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, os.ModeAppend) + f, err := os.OpenFile(file, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) if err != nil { er := fmt.Errorf("error: methodEventTextLogging.handler: failed to open file : %v, message: %v", err, message) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)