diff --git a/processes.go b/processes.go index c901936..42adcb3 100644 --- a/processes.go +++ b/processes.go @@ -343,7 +343,8 @@ func (s startup) pubREQKeysRequestUpdate(p process) { // and update with new keys back. proc.nodeAuth.publicKeys.mu.Lock() - fmt.Printf(" ----> publisher REQKeysRequestUpdate: sending our current hash: %v\n", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:])) + er := fmt.Errorf(" ----> publisher REQKeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:])) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) m := Message{ FileName: "publickeysget.log", @@ -398,7 +399,8 @@ func (s startup) pubREQAclRequestUpdate(p process) { // and update with new keys back. proc.nodeAuth.nodeAcl.mu.Lock() - fmt.Printf(" ----> publisher REQAclRequestUpdate: sending our current hash: %v\n", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:])) + er := fmt.Errorf(" ----> publisher REQAclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:])) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) m := Message{ FileName: "aclRequestUpdate.log", diff --git a/requests_test.go b/requests_test.go index e7efc26..3286dc1 100644 --- a/requests_test.go +++ b/requests_test.go @@ -336,6 +336,7 @@ func TestRequest(t *testing.T) { checkREQTailFileTest(tstSrv, tstConf, t, tstTempDir) checkMetricValuesTest(tstSrv, tstConf, t, tstTempDir) checkErrorKernelMalformedJSONtest(tstSrv, tstConf, t, tstTempDir) + checkREQCopySrc(tstSrv, tstConf, t, tstTempDir) } // Check the tailing of files type. @@ -386,16 +387,14 @@ func checkREQTailFileTest(stewardServer *server, conf *Configuration, t *testing writeToSocketTest(conf, s, t) - // time.Sleep(time.Second * 5) - resultFile := filepath.Join(conf.SubscribersDataFolder, "tail-files", "central", "fileName.result") // Wait n times for result file to be created. - n := 5 + n := 50 for i := 0; i <= n; i++ { _, err := os.Stat(resultFile) if os.IsNotExist(err) { - time.Sleep(time.Millisecond * 1000) + time.Sleep(time.Millisecond * 100) continue } @@ -416,6 +415,64 @@ func checkREQTailFileTest(stewardServer *server, conf *Configuration, t *testing return nil } +// Check the file copier. +func checkREQCopySrc(stewardServer *server, conf *Configuration, t *testing.T, tmpDir string) error { + testFiles := 5 + + for i := 1; i <= testFiles; i++ { + + // Create a file with some content. + srcFileName := fmt.Sprintf("copysrc%v.file", i) + srcfp := filepath.Join(tmpDir, srcFileName) + fh, err := os.OpenFile(srcfp, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600) + if err != nil { + t.Fatalf(" \U0001F631 [FAILED] : checkREQCopySrc: unable to open temporary file: %v", err) + } + defer fh.Close() + + // Write content to the file. + + _, err = fh.Write([]byte("some file content\n")) + if err != nil { + t.Fatalf(" \U0001F631 [FAILED] : checkREQCopySrc: writing to temporary file: %v\n", err) + } + + dstFileName := fmt.Sprintf("copydst%v.file", i) + dstfp := filepath.Join(tmpDir, dstFileName) + + s := `[ + { + "toNode": "central", + "method":"REQCopySrc", + "methodArgs": ["` + srcfp + `","central","` + dstfp + `","20","10"], + "ACKTimeout":5, + "retries":3, + "methodTimeout": 10 + } + ]` + + writeToSocketTest(conf, s, t) + + // Wait n times for result file to be created. + n := 50 + for i := 0; i <= n; i++ { + _, err := os.Stat(dstfp) + if os.IsNotExist(err) { + time.Sleep(time.Millisecond * 100) + continue + } + + if os.IsNotExist(err) && i >= n { + t.Fatalf(" \U0001F631 [FAILED] : checkREQCopySrc: no result file created for request within the given time") + } + } + + t.Logf(" \U0001f600 [SUCCESS] : src=%v, dst=%v", srcfp, dstfp) + } + + return nil +} + func checkMetricValuesTest(stewardServer *server, conf *Configuration, t *testing.T, tempDir string) error { mfs, err := stewardServer.metrics.promRegistry.Gather() if err != nil { @@ -467,11 +524,11 @@ func checkErrorKernelMalformedJSONtest(stewardServer *server, conf *Configuratio resultFile := filepath.Join(conf.SubscribersDataFolder, "errorLog", "errorCentral", "error.log") // Wait n times for error file to be created. - n := 5 + n := 50 for i := 0; i <= n; i++ { _, err := os.Stat(resultFile) if os.IsNotExist(err) { - time.Sleep(time.Millisecond * 1000) + time.Sleep(time.Millisecond * 100) continue } @@ -549,12 +606,12 @@ func checkFileUpdated(fileRealPath string, fileUpdated chan bool) { // Check if a file contains the given string. func findStringInFileTest(want string, fileName string, conf *Configuration, t *testing.T) (bool, error) { // Wait n seconds for the results file to be created - n := 5 + n := 50 for i := 0; i <= n; i++ { _, err := os.Stat(fileName) if os.IsNotExist(err) { - time.Sleep(time.Millisecond * 500) + time.Sleep(time.Millisecond * 100) continue } diff --git a/ringbuffer.go b/ringbuffer.go index 35b2a8c..aa82ce2 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -61,7 +61,6 @@ type ringBuffer struct { // newringBuffer returns a push/pop storage for values. func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, ringBufferBulkInCh chan []subjectAndMessage, samValueBucket string, indexValueBucket string, errorKernel *errorKernel, processInitial process) *ringBuffer { - fmt.Printf(" * DEBUG: configuration: %+v\n", configuration) r := ringBuffer{} // Check if socket folder exists, if not create it @@ -77,7 +76,6 @@ func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configu // --- var db *bolt.DB - fmt.Printf(" * DEBUG: configuration: %+v\n", configuration) if configuration.RingBufferPersistStore { var err error db, err = bolt.Open(DatabaseFilepath, 0600, nil)