mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
added test for file copy
This commit is contained in:
parent
bcc5eb35b6
commit
edccbea1b3
3 changed files with 69 additions and 12 deletions
|
@ -343,7 +343,8 @@ func (s startup) pubREQKeysRequestUpdate(p process) {
|
||||||
// and update with new keys back.
|
// and update with new keys back.
|
||||||
|
|
||||||
proc.nodeAuth.publicKeys.mu.Lock()
|
proc.nodeAuth.publicKeys.mu.Lock()
|
||||||
fmt.Printf(" ----> publisher REQKeysRequestUpdate: sending our current hash: %v\n", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
|
er := fmt.Errorf(" ----> publisher REQKeysRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
|
||||||
|
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||||
|
|
||||||
m := Message{
|
m := Message{
|
||||||
FileName: "publickeysget.log",
|
FileName: "publickeysget.log",
|
||||||
|
@ -398,7 +399,8 @@ func (s startup) pubREQAclRequestUpdate(p process) {
|
||||||
// and update with new keys back.
|
// and update with new keys back.
|
||||||
|
|
||||||
proc.nodeAuth.nodeAcl.mu.Lock()
|
proc.nodeAuth.nodeAcl.mu.Lock()
|
||||||
fmt.Printf(" ----> publisher REQAclRequestUpdate: sending our current hash: %v\n", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]))
|
er := fmt.Errorf(" ----> publisher REQAclRequestUpdate: sending our current hash: %v", []byte(proc.nodeAuth.nodeAcl.aclAndHash.Hash[:]))
|
||||||
|
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||||
|
|
||||||
m := Message{
|
m := Message{
|
||||||
FileName: "aclRequestUpdate.log",
|
FileName: "aclRequestUpdate.log",
|
||||||
|
|
|
@ -336,6 +336,7 @@ func TestRequest(t *testing.T) {
|
||||||
checkREQTailFileTest(tstSrv, tstConf, t, tstTempDir)
|
checkREQTailFileTest(tstSrv, tstConf, t, tstTempDir)
|
||||||
checkMetricValuesTest(tstSrv, tstConf, t, tstTempDir)
|
checkMetricValuesTest(tstSrv, tstConf, t, tstTempDir)
|
||||||
checkErrorKernelMalformedJSONtest(tstSrv, tstConf, t, tstTempDir)
|
checkErrorKernelMalformedJSONtest(tstSrv, tstConf, t, tstTempDir)
|
||||||
|
checkREQCopySrc(tstSrv, tstConf, t, tstTempDir)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check the tailing of files type.
|
// Check the tailing of files type.
|
||||||
|
@ -386,16 +387,14 @@ func checkREQTailFileTest(stewardServer *server, conf *Configuration, t *testing
|
||||||
|
|
||||||
writeToSocketTest(conf, s, t)
|
writeToSocketTest(conf, s, t)
|
||||||
|
|
||||||
// time.Sleep(time.Second * 5)
|
|
||||||
|
|
||||||
resultFile := filepath.Join(conf.SubscribersDataFolder, "tail-files", "central", "fileName.result")
|
resultFile := filepath.Join(conf.SubscribersDataFolder, "tail-files", "central", "fileName.result")
|
||||||
|
|
||||||
// Wait n times for result file to be created.
|
// Wait n times for result file to be created.
|
||||||
n := 5
|
n := 50
|
||||||
for i := 0; i <= n; i++ {
|
for i := 0; i <= n; i++ {
|
||||||
_, err := os.Stat(resultFile)
|
_, err := os.Stat(resultFile)
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
time.Sleep(time.Millisecond * 1000)
|
time.Sleep(time.Millisecond * 100)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -416,6 +415,64 @@ func checkREQTailFileTest(stewardServer *server, conf *Configuration, t *testing
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check the file copier.
|
||||||
|
func checkREQCopySrc(stewardServer *server, conf *Configuration, t *testing.T, tmpDir string) error {
|
||||||
|
testFiles := 5
|
||||||
|
|
||||||
|
for i := 1; i <= testFiles; i++ {
|
||||||
|
|
||||||
|
// Create a file with some content.
|
||||||
|
srcFileName := fmt.Sprintf("copysrc%v.file", i)
|
||||||
|
srcfp := filepath.Join(tmpDir, srcFileName)
|
||||||
|
fh, err := os.OpenFile(srcfp, os.O_APPEND|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf(" \U0001F631 [FAILED] : checkREQCopySrc: unable to open temporary file: %v", err)
|
||||||
|
}
|
||||||
|
defer fh.Close()
|
||||||
|
|
||||||
|
// Write content to the file.
|
||||||
|
|
||||||
|
_, err = fh.Write([]byte("some file content\n"))
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf(" \U0001F631 [FAILED] : checkREQCopySrc: writing to temporary file: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dstFileName := fmt.Sprintf("copydst%v.file", i)
|
||||||
|
dstfp := filepath.Join(tmpDir, dstFileName)
|
||||||
|
|
||||||
|
s := `[
|
||||||
|
{
|
||||||
|
"toNode": "central",
|
||||||
|
"method":"REQCopySrc",
|
||||||
|
"methodArgs": ["` + srcfp + `","central","` + dstfp + `","20","10"],
|
||||||
|
"ACKTimeout":5,
|
||||||
|
"retries":3,
|
||||||
|
"methodTimeout": 10
|
||||||
|
}
|
||||||
|
]`
|
||||||
|
|
||||||
|
writeToSocketTest(conf, s, t)
|
||||||
|
|
||||||
|
// Wait n times for result file to be created.
|
||||||
|
n := 50
|
||||||
|
for i := 0; i <= n; i++ {
|
||||||
|
_, err := os.Stat(dstfp)
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
time.Sleep(time.Millisecond * 100)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if os.IsNotExist(err) && i >= n {
|
||||||
|
t.Fatalf(" \U0001F631 [FAILED] : checkREQCopySrc: no result file created for request within the given time")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf(" \U0001f600 [SUCCESS] : src=%v, dst=%v", srcfp, dstfp)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func checkMetricValuesTest(stewardServer *server, conf *Configuration, t *testing.T, tempDir string) error {
|
func checkMetricValuesTest(stewardServer *server, conf *Configuration, t *testing.T, tempDir string) error {
|
||||||
mfs, err := stewardServer.metrics.promRegistry.Gather()
|
mfs, err := stewardServer.metrics.promRegistry.Gather()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -467,11 +524,11 @@ func checkErrorKernelMalformedJSONtest(stewardServer *server, conf *Configuratio
|
||||||
resultFile := filepath.Join(conf.SubscribersDataFolder, "errorLog", "errorCentral", "error.log")
|
resultFile := filepath.Join(conf.SubscribersDataFolder, "errorLog", "errorCentral", "error.log")
|
||||||
|
|
||||||
// Wait n times for error file to be created.
|
// Wait n times for error file to be created.
|
||||||
n := 5
|
n := 50
|
||||||
for i := 0; i <= n; i++ {
|
for i := 0; i <= n; i++ {
|
||||||
_, err := os.Stat(resultFile)
|
_, err := os.Stat(resultFile)
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
time.Sleep(time.Millisecond * 1000)
|
time.Sleep(time.Millisecond * 100)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -549,12 +606,12 @@ func checkFileUpdated(fileRealPath string, fileUpdated chan bool) {
|
||||||
// Check if a file contains the given string.
|
// Check if a file contains the given string.
|
||||||
func findStringInFileTest(want string, fileName string, conf *Configuration, t *testing.T) (bool, error) {
|
func findStringInFileTest(want string, fileName string, conf *Configuration, t *testing.T) (bool, error) {
|
||||||
// Wait n seconds for the results file to be created
|
// Wait n seconds for the results file to be created
|
||||||
n := 5
|
n := 50
|
||||||
|
|
||||||
for i := 0; i <= n; i++ {
|
for i := 0; i <= n; i++ {
|
||||||
_, err := os.Stat(fileName)
|
_, err := os.Stat(fileName)
|
||||||
if os.IsNotExist(err) {
|
if os.IsNotExist(err) {
|
||||||
time.Sleep(time.Millisecond * 500)
|
time.Sleep(time.Millisecond * 100)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,7 +61,6 @@ type ringBuffer struct {
|
||||||
// newringBuffer returns a push/pop storage for values.
|
// newringBuffer returns a push/pop storage for values.
|
||||||
func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, ringBufferBulkInCh chan []subjectAndMessage, samValueBucket string, indexValueBucket string, errorKernel *errorKernel, processInitial process) *ringBuffer {
|
func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, ringBufferBulkInCh chan []subjectAndMessage, samValueBucket string, indexValueBucket string, errorKernel *errorKernel, processInitial process) *ringBuffer {
|
||||||
|
|
||||||
fmt.Printf(" * DEBUG: configuration: %+v\n", configuration)
|
|
||||||
r := ringBuffer{}
|
r := ringBuffer{}
|
||||||
|
|
||||||
// Check if socket folder exists, if not create it
|
// Check if socket folder exists, if not create it
|
||||||
|
@ -77,7 +76,6 @@ func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configu
|
||||||
|
|
||||||
// ---
|
// ---
|
||||||
var db *bolt.DB
|
var db *bolt.DB
|
||||||
fmt.Printf(" * DEBUG: configuration: %+v\n", configuration)
|
|
||||||
if configuration.RingBufferPersistStore {
|
if configuration.RingBufferPersistStore {
|
||||||
var err error
|
var err error
|
||||||
db, err = bolt.Open(DatabaseFilepath, 0600, nil)
|
db, err = bolt.Open(DatabaseFilepath, 0600, nil)
|
||||||
|
|
Loading…
Reference in a new issue