diff --git a/requests_test.go b/requests_test.go index c2d0fad..f2997e6 100644 --- a/requests_test.go +++ b/requests_test.go @@ -2,18 +2,26 @@ package steward import ( "bytes" + "context" "encoding/json" + "flag" + "fmt" "io" "log" "net" "net/http" + "os" "path/filepath" "strings" "testing" + "time" + "github.com/fsnotify/fsnotify" natsserver "github.com/nats-io/nats-server/v2/server" ) +var logging = flag.Bool("logging", false, "set to true to enable the normal logger of the package") + func newServerForTesting(t *testing.T, addressAndPort string, testFolder string) (*server, *Configuration) { if !*logging { log.SetOutput(io.Discard) @@ -81,6 +89,10 @@ func writeMsgsToSocketTest(conf *Configuration, messages []Message, t *testing.T } func TestRequest(t *testing.T) { + if !*logging { + log.SetOutput(io.Discard) + } + type containsOrEquals int const ( REQTestContains containsOrEquals = iota @@ -109,8 +121,8 @@ func TestRequest(t *testing.T) { defer ns.Shutdown() // tempdir := t.TempDir() - tempdir := "tmp" - srv, conf := newServerForTesting(t, "127.0.0.1:42222", tempdir) + tempDir := "tmp" + srv, conf := newServerForTesting(t, "127.0.0.1:42222", tempDir) srv.Start() defer srv.Stop() @@ -294,4 +306,268 @@ func TestRequest(t *testing.T) { t.Logf(" \U0001f600 [SUCCESS] : %v\n", tt.info) } } + + // --- Other REQ tests that does not fit well into the general table above. + + checkREQTailFileTest(srv, conf, t, tempDir) + checkMetricValuesTest(srv, conf, t, tempDir) + checkErrorKernelMalformedJSONtest(srv, conf, t, tempDir) +} + +// Check the tailing of files type. +func checkREQTailFileTest(stewardServer *server, conf *Configuration, t *testing.T, tmpDir string) error { + // Create a file with some content. + fp := filepath.Join(tmpDir, "test.file") + fh, err := os.OpenFile(fp, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) + if err != nil { + return fmt.Errorf(" * failed: unable to open temporary file: %v", err) + } + defer fh.Close() + + ctx, cancel := context.WithCancel(context.Background()) + + // Write content to the file with specified intervals. + go func() { + for i := 1; i <= 10; i++ { + _, err = fh.Write([]byte("some file content\n")) + if err != nil { + fmt.Printf(" * failed: writing to temporary file: %v\n", err) + } + fh.Sync() + time.Sleep(time.Millisecond * 500) + + // Check if we've received a done, else default to continuing. + select { + case <-ctx.Done(): + return + default: + // no done received, we're continuing. + } + + } + }() + + s := `[ + { + "directory": "tail-files", + "fileName": "fileName.result", + "toNode": "central", + "methodArgs": ["` + fp + `"], + "method":"REQTailFile", + "ACKTimeout":5, + "retries":3, + "methodTimeout": 10 + } + ]` + + writeToSocketTest(conf, s, t) + + // time.Sleep(time.Second * 5) + + resultFile := filepath.Join(conf.SubscribersDataFolder, "tail-files", "central", "fileName.result") + + // Wait n times for result file to be created. + n := 5 + for i := 0; i <= n; i++ { + _, err := os.Stat(resultFile) + if os.IsNotExist(err) { + time.Sleep(time.Millisecond * 1000) + continue + } + + if os.IsNotExist(err) && i >= n { + cancel() + return fmt.Errorf(" \U0001F631 [FAILED] : checkREQTailFileTest: no result file created for request within the given time") + } + } + + cancel() + + _, err = findStringInFileTest("some file content", resultFile, conf, t) + if err != nil { + return fmt.Errorf(" \U0001F631 [FAILED] : checkREQTailFileTest: %v", err) + } + + t.Logf(" \U0001f600 [SUCCESS] : checkREQTailFileTest\n") + return nil +} + +func checkMetricValuesTest(stewardServer *server, conf *Configuration, t *testing.T, tempDir string) error { + mfs, err := stewardServer.metrics.promRegistry.Gather() + if err != nil { + return fmt.Errorf("error: promRegistry.gathering: %v", mfs) + } + + if len(mfs) <= 0 { + return fmt.Errorf("error: promRegistry.gathering: did not find any metric families: %v", mfs) + } + + found := false + for _, mf := range mfs { + if mf.GetName() == "steward_processes_total" { + found = true + + m := mf.GetMetric() + + if m[0].Gauge.GetValue() <= 0 { + return fmt.Errorf("error: promRegistry.gathering: did not find any running processes in metric for processes_total : %v", m[0].Gauge.GetValue()) + } + } + } + + if !found { + return fmt.Errorf("error: promRegistry.gathering: did not find specified metric processes_total") + } + + t.Logf(" \U0001f600 [SUCCESS] : checkMetricValuesTest") + + return nil +} + +// Check errorKernel +func checkErrorKernelMalformedJSONtest(stewardServer *server, conf *Configuration, t *testing.T, tempDir string) error { + + // JSON message with error, missing brace. + m := `[ + { + "directory": "some dir", + "fileName":"someext", + "toNode": "somenode", + "data": ["some data"], + "method": "REQErrorLog" + missing brace here..... + ]` + + writeToSocketTest(conf, m, t) + + resultFile := filepath.Join(conf.SubscribersDataFolder, "errorLog", "errorCentral", "error.log") + + // Wait n times for error file to be created. + n := 5 + for i := 0; i <= n; i++ { + _, err := os.Stat(resultFile) + if os.IsNotExist(err) { + time.Sleep(time.Millisecond * 1000) + continue + } + + if os.IsNotExist(err) && i >= n { + return fmt.Errorf(" \U0001F631 [FAILED] : checkErrorKernelMalformedJSONtest: no result file created for request within the given time") + } + } + + // Start checking if the result file is being updated. + chUpdated := make(chan bool) + go checkFileUpdated(resultFile, chUpdated) + + // We wait 5 seconds for an update, or else we fail. + ticker := time.NewTicker(time.Second * 5) + + for { + select { + case <-chUpdated: + // We got an update, so we continue to check if we find the string we're + // looking for. + found, err := findStringInFileTest("error: malformed json", resultFile, conf, t) + if !found && err != nil { + return fmt.Errorf(" \U0001F631 [FAILED] : checkErrorKernelMalformedJSONtest: %v", err) + } + + if !found && err == nil { + continue + } + + if found { + t.Logf(" \U0001f600 [SUCCESS] : checkErrorKernelMalformedJSONtest") + return nil + } + case <-ticker.C: + return fmt.Errorf(" * failed: did not get an update in the errorKernel log file") + } + } +} + +// Check if file are getting updated with new content. +func checkFileUpdated(fileRealPath string, fileUpdated chan bool) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Println("Failed fsnotify.NewWatcher") + return + } + defer watcher.Close() + + done := make(chan bool) + go func() { + //Give a true value to updated so it reads the file the first time. + fileUpdated <- true + for { + select { + case event := <-watcher.Events: + log.Println("event:", event) + if event.Op&fsnotify.Write == fsnotify.Write { + log.Println("modified file:", event.Name) + //testing with an update chan to get updates + fileUpdated <- true + } + case err := <-watcher.Errors: + log.Println("error:", err) + } + } + }() + + err = watcher.Add(fileRealPath) + if err != nil { + log.Fatal(err) + } + <-done +} + +// Check if a file contains the given string. +func findStringInFileTest(want string, fileName string, conf *Configuration, t *testing.T) (bool, error) { + // Wait n seconds for the results file to be created + n := 5 + + for i := 0; i <= n; i++ { + _, err := os.Stat(fileName) + if os.IsNotExist(err) { + time.Sleep(time.Millisecond * 500) + continue + } + + if os.IsNotExist(err) && i >= n { + return false, fmt.Errorf(" * failed: no result file created for request within the given time\n") + } + } + + fh, err := os.Open(fileName) + if err != nil { + return false, fmt.Errorf(" * failed: could not open result file: %v", err) + } + + result, err := io.ReadAll(fh) + if err != nil { + return false, fmt.Errorf(" * failed: could not read result file: %v", err) + } + + found := strings.Contains(string(result), want) + if !found { + return false, nil + } + + return true, nil +} + +// Write message to socket for testing purposes. +func writeToSocketTest(conf *Configuration, messageText string, t *testing.T) { + socket, err := net.Dial("unix", filepath.Join(conf.SocketFolder, "steward.sock")) + if err != nil { + t.Fatalf(" * failed: could to open socket file for writing: %v\n", err) + } + defer socket.Close() + + _, err = socket.Write([]byte(messageText)) + if err != nil { + t.Fatalf(" * failed: could not write to socket: %v\n", err) + } + } diff --git a/steward_test.go b/steward_test.go deleted file mode 100644 index 8aa3da0..0000000 --- a/steward_test.go +++ /dev/null @@ -1,343 +0,0 @@ -// Package functionality tests for Steward. -// -// To turn on the normal logging functionality during tests, use: -// go test -logging=true -package steward - -import ( - "context" - "flag" - "fmt" - "io" - "log" - "net" - "os" - "path/filepath" - "strings" - "testing" - "time" - - "github.com/fsnotify/fsnotify" - natsserver "github.com/nats-io/nats-server/v2/server" -) - -// Set the default logging functionality of the package to false. -var logging = flag.Bool("logging", false, "set to true to enable the normal logger of the package") - -// Test the overall functionality of Steward. -// Starting up the server. -// Message passing. -// The different REQ types. -func TestStewardServer(t *testing.T) { - // if !*logging { - // log.SetOutput(io.Discard) - // } - - t.Logf("--------------- 1\n") - - ns := newNatsServerForTesting(t, 40222) - if err := natsserver.Run(ns); err != nil { - natsserver.PrintAndDie(err.Error()) - } - defer ns.Shutdown() - - t.Logf("--------------- 2\n") - - tempDir := t.TempDir() - srv, conf := newServerForTesting(t, "127.0.0.1:40222", tempDir) - srv.Start() - defer srv.Stop() - - t.Logf("--------------- 3\n") - - // Run the message tests - // - // --------------------------------------- - - type testFunc func(*server, *Configuration, *testing.T, string) error - - // Specify all the test funcs to run in the slice. - funcs := []testFunc{ - checkREQTailFileTest, - checkErrorKernelMalformedJSONtest, - checkMetricValuesTest, - } - - for _, f := range funcs { - err := f(srv, conf, t, tempDir) - if err != nil { - t.Errorf("%v\n", err) - } - } - -} - -// ---------------------------------------------------------------------------- -// Check REQ types -// ---------------------------------------------------------------------------- - -// Check the tailing of files type. -func checkREQTailFileTest(stewardServer *server, conf *Configuration, t *testing.T, tmpDir string) error { - // Create a file with some content. - fp := filepath.Join(tmpDir, "test.file") - fh, err := os.OpenFile(fp, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) - if err != nil { - return fmt.Errorf(" * failed: unable to open temporary file: %v", err) - } - defer fh.Close() - - ctx, cancel := context.WithCancel(context.Background()) - - // Write content to the file with specified intervals. - go func() { - for i := 1; i <= 10; i++ { - _, err = fh.Write([]byte("some file content\n")) - if err != nil { - fmt.Printf(" * failed: writing to temporary file: %v\n", err) - } - fh.Sync() - time.Sleep(time.Millisecond * 500) - - // Check if we've received a done, else default to continuing. - select { - case <-ctx.Done(): - return - default: - // no done received, we're continuing. - } - - } - }() - - s := `[ - { - "directory": "tail-files", - "fileName": "fileName.result", - "toNode": "central", - "methodArgs": ["` + fp + `"], - "method":"REQTailFile", - "ACKTimeout":5, - "retries":3, - "methodTimeout": 10 - } - ]` - - writeToSocketTest(conf, s, t) - - // time.Sleep(time.Second * 5) - - resultFile := filepath.Join(conf.SubscribersDataFolder, "tail-files", "central", "fileName.result") - - // Wait n times for result file to be created. - n := 5 - for i := 0; i <= n; i++ { - _, err := os.Stat(resultFile) - if os.IsNotExist(err) { - time.Sleep(time.Millisecond * 1000) - continue - } - - if os.IsNotExist(err) && i >= n { - cancel() - return fmt.Errorf(" \U0001F631 [FAILED] : checkREQTailFileTest: no result file created for request within the given time") - } - } - - cancel() - - _, err = findStringInFileTest("some file content", resultFile, conf, t) - if err != nil { - return fmt.Errorf(" \U0001F631 [FAILED] : checkREQTailFileTest: %v", err) - } - - t.Logf(" \U0001f600 [SUCCESS] : checkREQTailFileTest\n") - return nil -} - -// ---------------------------------------------------------------------------- -// Functionality checks -// ---------------------------------------------------------------------------- - -// Check errorKernel -func checkErrorKernelMalformedJSONtest(stewardServer *server, conf *Configuration, t *testing.T, tempDir string) error { - - // JSON message with error, missing brace. - m := `[ - { - "directory": "some dir", - "fileName":"someext", - "toNode": "somenode", - "data": ["some data"], - "method": "REQErrorLog" - missing brace here..... - ]` - - writeToSocketTest(conf, m, t) - - resultFile := filepath.Join(conf.SubscribersDataFolder, "errorLog", "errorCentral", "error.log") - - // Wait n times for error file to be created. - n := 5 - for i := 0; i <= n; i++ { - _, err := os.Stat(resultFile) - if os.IsNotExist(err) { - time.Sleep(time.Millisecond * 1000) - continue - } - - if os.IsNotExist(err) && i >= n { - return fmt.Errorf(" \U0001F631 [FAILED] : checkErrorKernelMalformedJSONtest: no result file created for request within the given time") - } - } - - // Start checking if the result file is being updated. - chUpdated := make(chan bool) - go checkFileUpdated(resultFile, chUpdated) - - // We wait 5 seconds for an update, or else we fail. - ticker := time.NewTicker(time.Second * 5) - - for { - select { - case <-chUpdated: - // We got an update, so we continue to check if we find the string we're - // looking for. - found, err := findStringInFileTest("error: malformed json", resultFile, conf, t) - if !found && err != nil { - return fmt.Errorf(" \U0001F631 [FAILED] : checkErrorKernelMalformedJSONtest: %v", err) - } - - if !found && err == nil { - continue - } - - if found { - t.Logf(" \U0001f600 [SUCCESS] : checkErrorKernelMalformedJSONtest") - return nil - } - case <-ticker.C: - return fmt.Errorf(" * failed: did not get an update in the errorKernel log file") - } - } -} - -func checkMetricValuesTest(stewardServer *server, conf *Configuration, t *testing.T, tempDir string) error { - mfs, err := stewardServer.metrics.promRegistry.Gather() - if err != nil { - return fmt.Errorf("error: promRegistry.gathering: %v", mfs) - } - - if len(mfs) <= 0 { - return fmt.Errorf("error: promRegistry.gathering: did not find any metric families: %v", mfs) - } - - found := false - for _, mf := range mfs { - if mf.GetName() == "steward_processes_total" { - found = true - - m := mf.GetMetric() - - if m[0].Gauge.GetValue() <= 0 { - return fmt.Errorf("error: promRegistry.gathering: did not find any running processes in metric for processes_total : %v", m[0].Gauge.GetValue()) - } - } - } - - if !found { - return fmt.Errorf("error: promRegistry.gathering: did not find specified metric processes_total") - } - - t.Logf(" \U0001f600 [SUCCESS] : checkMetricValuesTest") - - return nil -} - -// ---------------------------------------------------------------------------- -// Helper functions for tests -// ---------------------------------------------------------------------------- - -// Check if file are getting updated with new content. -func checkFileUpdated(fileRealPath string, fileUpdated chan bool) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - log.Println("Failed fsnotify.NewWatcher") - return - } - defer watcher.Close() - - done := make(chan bool) - go func() { - //Give a true value to updated so it reads the file the first time. - fileUpdated <- true - for { - select { - case event := <-watcher.Events: - log.Println("event:", event) - if event.Op&fsnotify.Write == fsnotify.Write { - log.Println("modified file:", event.Name) - //testing with an update chan to get updates - fileUpdated <- true - } - case err := <-watcher.Errors: - log.Println("error:", err) - } - } - }() - - err = watcher.Add(fileRealPath) - if err != nil { - log.Fatal(err) - } - <-done -} - -// Check if a file contains the given string. -func findStringInFileTest(want string, fileName string, conf *Configuration, t *testing.T) (bool, error) { - // Wait n seconds for the results file to be created - n := 5 - - for i := 0; i <= n; i++ { - _, err := os.Stat(fileName) - if os.IsNotExist(err) { - time.Sleep(time.Millisecond * 500) - continue - } - - if os.IsNotExist(err) && i >= n { - return false, fmt.Errorf(" * failed: no result file created for request within the given time\n") - } - } - - fh, err := os.Open(fileName) - if err != nil { - return false, fmt.Errorf(" * failed: could not open result file: %v", err) - } - - result, err := io.ReadAll(fh) - if err != nil { - return false, fmt.Errorf(" * failed: could not read result file: %v", err) - } - - found := strings.Contains(string(result), want) - if !found { - return false, nil - } - - return true, nil -} - -// Write message to socket for testing purposes. -func writeToSocketTest(conf *Configuration, messageText string, t *testing.T) { - socket, err := net.Dial("unix", filepath.Join(conf.SocketFolder, "steward.sock")) - if err != nil { - t.Fatalf(" * failed: could to open socket file for writing: %v\n", err) - } - defer socket.Close() - - _, err = socket.Write([]byte(messageText)) - if err != nil { - t.Fatalf(" * failed: could not write to socket: %v\n", err) - } - -}