mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-07 04:49:17 +00:00
fixed logging with copy handlers
This commit is contained in:
parent
efc5df5b16
commit
0d4bfd4471
1 changed files with 78 additions and 28 deletions
106
requests_copy.go
106
requests_copy.go
|
@ -370,9 +370,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
|
|
||||||
fh, err := os.Open(cia.SrcFilePath)
|
fh, err := os.Open(cia.SrcFilePath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// errCh <- fmt.Errorf("error: copySrcSubProcFunc: failed to open file: %v, %v", SrcFilePath, err)
|
er := fmt.Errorf("error: copySrcSubProcFunc: failed to open file: %v", err)
|
||||||
log.Fatalf("error: copySrcSubProcFunc: failed to open file: %v\n", err)
|
proc.errorKernel.errSend(proc, Message{}, er)
|
||||||
return nil
|
return er
|
||||||
}
|
}
|
||||||
defer fh.Close()
|
defer fh.Close()
|
||||||
|
|
||||||
|
@ -389,7 +389,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
var csa copySubData
|
var csa copySubData
|
||||||
err := cbor.Unmarshal(message.Data, &csa)
|
err := cbor.Unmarshal(message.Data, &csa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err)
|
er := fmt.Errorf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
switch csa.CopyStatus {
|
switch csa.CopyStatus {
|
||||||
|
@ -401,7 +403,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
b := make([]byte, cia.SplitChunkSize)
|
b := make([]byte, cia.SplitChunkSize)
|
||||||
n, err := fh.Read(b)
|
n, err := fh.Read(b)
|
||||||
if err != nil && err != io.EOF {
|
if err != nil && err != io.EOF {
|
||||||
log.Printf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err)
|
er := fmt.Errorf("error: copySrcSubHandler: failed to read chunk from file: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
status = copySrcDone
|
status = copySrcDone
|
||||||
|
@ -426,7 +430,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
|
|
||||||
csaSerialized, err := cbor.Marshal(csa)
|
csaSerialized, err := cbor.Marshal(csa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
// We want to send a message back to src that we are ready to start.
|
// We want to send a message back to src that we are ready to start.
|
||||||
|
@ -444,7 +450,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
|
|
||||||
sam, err := newSubjectAndMessage(msg)
|
sam, err := newSubjectAndMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
|
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
@ -483,7 +491,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
|
|
||||||
csaSerialized, err := cbor.Marshal(csa)
|
csaSerialized, err := cbor.Marshal(csa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
// We want to send a message back to src that we are ready to start.
|
// We want to send a message back to src that we are ready to start.
|
||||||
|
@ -501,7 +511,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
|
|
||||||
sam, err := newSubjectAndMessage(msg)
|
sam, err := newSubjectAndMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
|
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
@ -513,7 +525,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// TODO: Any error logic here ?
|
// TODO: Any error logic here ?
|
||||||
log.Fatalf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v\n", csa.CopyStatus)
|
er := fmt.Errorf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v", csa.CopyStatus)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -535,7 +549,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
|
|
||||||
csaSerialized, err := cbor.Marshal(csa)
|
csaSerialized, err := cbor.Marshal(csa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
// We want to send a message back to src that we are ready to start.
|
// We want to send a message back to src that we are ready to start.
|
||||||
|
@ -554,7 +570,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
|
|
||||||
sam, err := newSubjectAndMessage(msg)
|
sam, err := newSubjectAndMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
|
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
@ -574,7 +592,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
var csa copySubData
|
var csa copySubData
|
||||||
err := cbor.Unmarshal(message.Data, &csa)
|
err := cbor.Unmarshal(message.Data, &csa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n", err)
|
er := fmt.Errorf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if the hash matches. If it fails we set the status so we can
|
// Check if the hash matches. If it fails we set the status so we can
|
||||||
|
@ -593,20 +613,29 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
// Write the data chunk to disk ?
|
// Write the data chunk to disk ?
|
||||||
// fmt.Printf("\n * Received data: %s\n\n", csa.CopyData)
|
// fmt.Printf("\n * Received data: %s\n\n", csa.CopyData)
|
||||||
|
|
||||||
func() {
|
err := func() error {
|
||||||
filePath := filepath.Join(tmpFolder, strconv.Itoa(csa.ChunkNumber)+"."+cia.UUID)
|
filePath := filepath.Join(tmpFolder, strconv.Itoa(csa.ChunkNumber)+"."+cia.UUID)
|
||||||
fh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
|
fh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: open file failed: %v", err)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
defer fh.Close()
|
defer fh.Close()
|
||||||
|
|
||||||
_, err = fh.Write(csa.CopyData)
|
_, err = fh.Write(csa.CopyData)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: open file failed: %v", err)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
proc.errorKernel.errSend(proc, message, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
// Prepare and send a ready message to src for the next chunk.
|
// Prepare and send a ready message to src for the next chunk.
|
||||||
csa := copySubData{
|
csa := copySubData{
|
||||||
CopyStatus: copyReady,
|
CopyStatus: copyReady,
|
||||||
|
@ -614,7 +643,8 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
|
|
||||||
csaSer, err := cbor.Marshal(csa)
|
csaSer, err := cbor.Marshal(csa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
}
|
}
|
||||||
|
|
||||||
// fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
|
// fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
|
||||||
|
@ -631,7 +661,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
|
|
||||||
sam, err := newSubjectAndMessage(msg)
|
sam, err := newSubjectAndMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
|
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
@ -642,7 +674,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
// resend of the last message.
|
// resend of the last message.
|
||||||
csaSer, err := cbor.Marshal(csa)
|
csaSer, err := cbor.Marshal(csa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
msg := Message{
|
msg := Message{
|
||||||
|
@ -656,13 +690,15 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
|
|
||||||
sam, err := newSubjectAndMessage(msg)
|
sam, err := newSubjectAndMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
|
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
case copySrcDone:
|
case copySrcDone:
|
||||||
func() {
|
err := func() error {
|
||||||
|
|
||||||
// Open the main file that chunks files will be written into.
|
// Open the main file that chunks files will be written into.
|
||||||
filePath := filepath.Join(cia.DstDir, cia.DstFile)
|
filePath := filepath.Join(cia.DstDir, cia.DstFile)
|
||||||
|
@ -673,7 +709,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
|
|
||||||
mainfh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, cia.FileMode)
|
mainfh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, cia.FileMode)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: open file failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
defer mainfh.Close()
|
defer mainfh.Close()
|
||||||
|
|
||||||
|
@ -721,9 +759,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
fmt.Printf(" * sorted slice: %v\n", files)
|
fmt.Printf(" * sorted slice: %v\n", files)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: copyDstSubProcFunc: creation of slice of chunk paths failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: creation of slice of chunk paths failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
return
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
err = func() error {
|
err = func() error {
|
||||||
|
@ -761,7 +799,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
os.Remove(backupOriginalFileName)
|
os.Remove(backupOriginalFileName)
|
||||||
err = os.RemoveAll(tmpFolder)
|
err = os.RemoveAll(tmpFolder)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: remove temp dir failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: remove temp dir failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("info: copy: successfully wrote all split chunk files into file=%v\n", filePath)
|
log.Printf("info: copy: successfully wrote all split chunk files into file=%v\n", filePath)
|
||||||
|
@ -774,7 +814,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
|
|
||||||
csaSerialized, err := cbor.Marshal(csa)
|
csaSerialized, err := cbor.Marshal(csa)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
|
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
// We want to send a message back to src that we are ready to start.
|
// We want to send a message back to src that we are ready to start.
|
||||||
|
@ -792,14 +834,22 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
|
|
||||||
sam, err := newSubjectAndMessage(msg)
|
sam, err := newSubjectAndMessage(msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
|
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
}
|
}
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
||||||
|
return nil
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue