diff --git a/ringbuffer.go b/ringbuffer.go index d19abd4..54c5a1c 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -312,7 +312,6 @@ func (r *ringBuffer) printBucketContent(bucket string) error { err := r.db.View(func(tx *bolt.Tx) error { bu := tx.Bucket([]byte(bucket)) - fmt.Println("-- K/V STORE DUMP--") bu.ForEach(func(k, v []byte) error { var vv samDBValue err := json.Unmarshal(v, &vv) @@ -322,7 +321,6 @@ func (r *ringBuffer) printBucketContent(bucket string) error { log.Printf("k: %s, v: %v\n", k, vv) return nil }) - fmt.Println("--") return nil }) diff --git a/server.go b/server.go index 0e067ab..ca9cc70 100644 --- a/server.go +++ b/server.go @@ -314,7 +314,6 @@ func (s *server) Stop() { } func (p *processes) printProcessesMap() { - fmt.Println("--------------------------------------------------------------------------------------------") log.Printf("*** Output of processes map :\n") p.mu.Lock() for _, vSub := range p.active { @@ -326,7 +325,6 @@ func (p *processes) printProcessesMap() { p.promTotalProcesses.Set(float64(len(p.active))) - fmt.Println("--------------------------------------------------------------------------------------------") } // sendErrorMessage will put the error message directly on the channel that is diff --git a/steward_test.go b/steward_test.go index d8dca47..bcf5675 100644 --- a/steward_test.go +++ b/steward_test.go @@ -1,7 +1,9 @@ package steward import ( + "flag" "io" + "log" "net" "os" "path/filepath" @@ -12,58 +14,110 @@ import ( natsserver "github.com/nats-io/nats-server/v2/server" ) -func TestSteward(t *testing.T) { - // Start up the nats-server message broker. - nsOpt := &natsserver.Options{ - Host: "127.0.0.1", - Port: 40222, +var logging = flag.Bool("logging", false, "set to true to enable the normal logger of the package") + +// Test the overall functionality of Steward. +// Starting up the server. +// Message passing. +// The different REQ types. +func TestStewardServer(t *testing.T) { + if !*logging { + log.SetOutput(io.Discard) } - ns, err := natsserver.NewServer(nsOpt) - if err != nil { - t.Fatalf("error: failed to start nats-server %v\n", err) - } - - if err := natsserver.Run(ns); err != nil { - natsserver.PrintAndDie(err.Error()) - } + // Start the nats-server message broker. + startNatsServerTest(t) // Start Steward instance // --------------------------------------- - tempdir := "./tmp" + tempdir := t.TempDir() conf := &Configuration{ - SocketFolder: filepath.Join(tempdir, "tmp"), - DatabaseFolder: filepath.Join(tempdir, "var/lib"), - SubscribersDataFolder: filepath.Join(tempdir, "data"), - BrokerAddress: "127.0.0.1:40222", - NodeName: "central", - CentralNodeName: "central", - DefaultMessageRetries: 1, - DefaultMessageTimeout: 3, + SocketFolder: filepath.Join(tempdir, "tmp"), + DatabaseFolder: filepath.Join(tempdir, "var/lib"), + SubscribersDataFolder: filepath.Join(tempdir, "data"), + BrokerAddress: "127.0.0.1:40222", + NodeName: "central", + CentralNodeName: "central", + DefaultMessageRetries: 1, + DefaultMessageTimeout: 3, + + StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQErrorLog: flagNodeSlice{OK: true, Values: []Node{"*"}}, StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, } s, err := NewServer(conf) if err != nil { - t.Fatalf("error: failed to start nats-server %v\n", err) + t.Fatalf(" * failed: could not start the Steward instance %v\n", err) } - s.Start() - // Messaging tests. + // Run the message tests // - // Write to socket. // --------------------------------------- - want := "apekatt" + checkREQOpCommandTest(conf, t) + checkREQCliCommandTest(conf, t) + checkREQnCliCommandTest(conf, t) + // --------------------------------------- - socket, err := net.Dial("unix", filepath.Join(conf.SocketFolder, "steward.sock")) - if err != nil { - t.Fatalf("error: failed to open socket file for writing: %v\n", err) - } - defer socket.Close() + s.Stop() +} + +// Testing op (operator) Commands. +func checkREQOpCommandTest(conf *Configuration, t *testing.T) { + m := `[ + { + "directory":"commands-executed", + "fileExtension": ".result", + "toNode": "central", + "data": [], + "method":"REQOpCommand", + "operation":{ + "opCmd":"ps" + }, + "replyMethod":"REQToFileAppend", + "ACKTimeout":3, + "retries":3, + "replyACKTimeout":3, + "replyRetries":3, + "MethodTimeout": 7 + } + ]` + + writeToSocketTest(conf, m, t) + + resultFile := filepath.Join(conf.SubscribersDataFolder, "commands-executed", "central", "central.REQOpCommand.result") + findStringInFileTest("central.REQOpCommand.CommandACK", resultFile, conf, t) + +} + +// Sending of CLI Commands. +func checkREQCliCommandTest(conf *Configuration, t *testing.T) { + m := `[ + { + "directory":"commands-executed", + "fileExtension":".result", + "toNode": "central", + "data": ["bash","-c","echo apekatt"], + "replyMethod":"REQToFileAppend", + "method":"REQCliCommand", + "ACKTimeout":3, + "retries":3, + "methodTimeout": 10 + } + ]` + + writeToSocketTest(conf, m, t) + + resultFile := filepath.Join(conf.SubscribersDataFolder, "commands-executed", "central", "central.REQCliCommand.result") + findStringInFileTest("apekatt", resultFile, conf, t) + +} + +// The non-sequential sending of CLI Commands. +func checkREQnCliCommandTest(conf *Configuration, t *testing.T) { m := `[ { "directory":"commands-executed", @@ -78,33 +132,72 @@ func TestSteward(t *testing.T) { } ]` - _, err = socket.Write([]byte(m)) - if err != nil { - t.Fatalf("error: failed to write to socket: %v\n", err) - } - - // Wait a couple of seconds for the request to go through.. - time.Sleep(time.Second * 2) + writeToSocketTest(conf, m, t) resultFile := filepath.Join(conf.SubscribersDataFolder, "commands-executed", "central", "central.REQnCliCommand.result") - fh, err := os.Open(resultFile) + findStringInFileTest("apekatt", resultFile, conf, t) + +} + +// Check if a file contains the given string. +func findStringInFileTest(want string, fileName string, conf *Configuration, t *testing.T) { + // Wait n seconds for the results file to be created + for i := 0; i <= 30; i++ { + _, err := os.Stat(fileName) + if os.IsNotExist(err) { + time.Sleep(time.Millisecond * 500) + continue + } + + if os.IsNotExist(err) && i >= 20 { + t.Fatalf(" * failed: no result file created for request within the given time\n") + } + } + + fh, err := os.Open(fileName) if err != nil { - t.Fatalf("error: failed open result file: %v\n", err) + t.Fatalf(" * failed: could not open result file: %v\n", err) } result, err := io.ReadAll(fh) if err != nil { - t.Fatalf("error: failed read result file: %v\n", err) + t.Fatalf(" * failed: could not read result file: %v\n", err) } - if strings.Contains(string(result), want) { - t.Fatalf("error: did not find expexted word `%v` in file ", want) + if !strings.Contains(string(result), want) { + t.Fatalf(" * failed: did not find expexted word `%v` in result file ", want) + } +} + +// Write message to socket for testing purposes. +func writeToSocketTest(conf *Configuration, messageText string, t *testing.T) { + socket, err := net.Dial("unix", filepath.Join(conf.SocketFolder, "steward.sock")) + if err != nil { + t.Fatalf(" * failed: could to open socket file for writing: %v\n", err) + } + defer socket.Close() + + _, err = socket.Write([]byte(messageText)) + if err != nil { + t.Fatalf(" * failed: could not write to socket: %v\n", err) + } + +} + +// Start up the nats-server message broker. +func startNatsServerTest(t *testing.T) { + // Start up the nats-server message broker. + nsOpt := &natsserver.Options{ + Host: "127.0.0.1", + Port: 40222, + } + + ns, err := natsserver.NewServer(nsOpt) + if err != nil { + t.Fatalf(" * failed: could not start the nats-server %v\n", err) + } + + if err := natsserver.Run(ns); err != nil { + natsserver.PrintAndDie(err.Error()) } - - // --------------------------------------- - - s.Stop() - - // Shutdown services. - ns.Shutdown() }