1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

fixed returns and error msgs for copy handlers

This commit is contained in:
postmannen 2022-06-21 07:45:36 +02:00
parent 2688f2ba2c
commit 8ec6372565

View file

@ -6,7 +6,6 @@ import (
"fmt"
"io"
"io/fs"
"log"
"os"
"path/filepath"
"sort"
@ -160,7 +159,8 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
fileInfo, err := os.Stat(SrcFilePath)
if err != nil {
// errCh <- fmt.Errorf("error: methodREQCopySrc: failed to open file: %v, %v", SrcFilePath, err)
log.Printf("error: copySrcSubProcFunc: failed to stat file: %v\n", err)
er := fmt.Errorf("error: copySrcSubProcFunc: failed to stat file: %v", err)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
return
}
@ -310,13 +310,16 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
func copySrcSubHandler(cia copyInitialData) func(process, Message, string) ([]byte, error) {
h := func(proc process, message Message, node string) ([]byte, error) {
// We should receive a ready message generated by the procFunc of Dst.
// We should receive a ready message generated by the procFunc of Dst,
// and any messages received we directly route into the procFunc.
select {
case <-proc.ctx.Done():
log.Printf(" * copySrcHandler ended: %v\n", proc.processName)
er := fmt.Errorf(" * copySrcHandler ended: %v", proc.processName)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
case proc.procFuncCh <- message:
log.Printf(" * copySrcHandler passing message over to procFunc: %v\n", proc.processName)
er := fmt.Errorf(" * copySrcHandler passing message over to procFunc: %v", proc.processName)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
}
return nil, nil
@ -330,9 +333,11 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by
select {
case <-proc.ctx.Done():
log.Printf(" * copyDstHandler ended: %v\n", proc.processName)
er := fmt.Errorf(" * copyDstHandler ended: %v", proc.processName)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
case proc.procFuncCh <- message:
log.Printf(" * copySrcHandler passing message over to procFunc: %v\n", proc.processName)
er := fmt.Errorf(" * copySrcHandler passing message over to procFunc: %v", proc.processName)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
}
return nil, nil
@ -378,10 +383,10 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
// Do action based on copyStatus received.
for {
fmt.Printf("\n * DEBUG: copySrcSubProcFunc: cia contains: %+v\n\n", cia)
select {
case <-ctx.Done():
log.Printf(" info: canceling copySrcProcFunc : %v\n", proc.processName)
er := fmt.Errorf(" info: canceling copySrcProcFunc : %v", proc.processName)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
return nil
// Pick up the message recived by the copySrcSubHandler.
@ -400,7 +405,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
// We set the default status to copyData. If we get an io.EOF we change it to copyDone later.
status := copyData
log.Printf(" * RECEIVED in copySrcSubProcFunc from dst * copyStatus=copyReady: %v\n\n", csa.CopyStatus)
b := make([]byte, cia.SplitChunkSize)
n, err := fh.Read(b)
if err != nil && err != io.EOF {
@ -414,13 +418,12 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
lastReadChunk = b[:n]
// Create a hash of the bytes
// Create a hash of the bytes.
hash := sha256.Sum256(b[:n])
chunkNumber++
// Create message and send data to dst
// fmt.Printf("**** DATA READ: %v\n", b)
// Create message and send data to dst.
csa := copySubData{
CopyStatus: status,
@ -437,7 +440,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
}
// We want to send a message back to src that we are ready to start.
// fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.DstNode,
FromNode: cia.SrcNode,
@ -447,8 +449,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
IsSubPublishedMsg: true,
}
// fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
@ -488,7 +488,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
chunkNumber++
// Create message and send data to dst
// fmt.Printf("**** DATA READ: %v\n", b)
csa := copySubData{
CopyStatus: status,
@ -505,7 +504,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
}
// We want to send a message back to src that we are ready to start.
// fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.DstNode,
FromNode: cia.SrcNode,
@ -515,8 +513,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
IsSubPublishedMsg: true,
}
// fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
@ -530,9 +526,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
case copyDstDone:
cancel()
return nil
default:
// TODO: Any error logic here ?
er := fmt.Errorf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v", csa.CopyStatus)
proc.errorKernel.errSend(proc, message, er)
return er
@ -549,7 +545,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, cancel context.CancelFunc) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia)
csa := copySubData{
CopyStatus: copyReady,
@ -564,7 +559,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
// We want to send a message back to src that we are ready to start.
{
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.SrcNode,
FromNode: cia.DstNode,
@ -574,8 +568,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
IsSubPublishedMsg: true,
}
fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
@ -591,10 +583,10 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
os.Mkdir(tmpFolder, 0700)
for {
fmt.Printf("\n * DEBUG: copyDstSubProcFunc: cia contains: %+v\n\n", cia)
select {
case <-ctx.Done():
log.Printf(" * copyDstProcFunc ended: %v\n", proc.processName)
er := fmt.Errorf(" * copyDstProcFunc ended: %v", proc.processName)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
return nil
case message := <-procFuncCh:
var csa copySubData
@ -609,18 +601,14 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
// trigger the resend of the last message in the switch below.
hash := sha256.Sum256(csa.CopyData)
if hash != csa.Hash {
log.Printf("error: copyDstSubProcFunc: hash of received message is not correct for: %v\n", cia.DstMethod)
er := fmt.Errorf("error: copyDstSubProcFunc: hash of received message is not correct for: %v", cia.DstMethod)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
csa.CopyStatus = copyResendLast
}
// fmt.Printf(" * DEBUG: Hash was verified OK\n")
switch csa.CopyStatus {
case copyData:
// Write the data chunk to disk ?
// fmt.Printf("\n * Received data: %s\n\n", csa.CopyData)
err := func() error {
filePath := filepath.Join(tmpFolder, strconv.Itoa(csa.ChunkNumber)+"."+cia.UUID)
fh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
@ -653,9 +641,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
if err != nil {
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
proc.errorKernel.errSend(proc, message, er)
return er
}
// fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.SrcNode,
FromNode: cia.DstNode,
@ -665,8 +653,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
IsSubPublishedMsg: true,
}
// fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
@ -751,7 +737,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
fi.nr = nr
files = append(files, fi)
log.Printf("info: copy: appending path for chunk file=%v into=%v, size=%v\n", path, filePath, info.Size())
}
@ -760,12 +745,9 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
// Sort all the source nodes.
sort.SliceStable(files, func(i, j int) bool {
fmt.Printf("files[i].nr=%v < files[j].nr=%v\n", files[i].nr, files[j].nr)
return files[i].nr < files[j].nr
})
fmt.Printf(" * sorted slice: %v\n", files)
if err != nil {
er := fmt.Errorf("error: copyDstSubProcFunc: creation of slice of chunk paths failed: %v", err)
proc.errorKernel.errSend(proc, message, er)
@ -788,7 +770,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
return err
}
log.Printf("info: copy: writing content of split chunk file=%v into=%v\n", fp, filePath)
_, err = mainfh.Write(b[:n])
if err != nil {
return err
@ -800,7 +781,8 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
}()
if err != nil {
log.Printf("error: copyDstSubProcFunc: remove temp dir failed: %v\n", err)
er := fmt.Errorf("error: copyDstSubProcFunc: write to final destination file failed: %v", err)
proc.errorKernel.errSend(proc, message, er)
}
// Remove the backup file, and tmp folder.
@ -812,7 +794,8 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
return er
}
log.Printf("info: copy: successfully wrote all split chunk files into file=%v\n", filePath)
er := fmt.Errorf("info: copy: successfully wrote all split chunk files into file=%v", filePath)
proc.errorKernel.logConsoleOnlyIfDebug(er, proc.configuration)
// Signal back to src that we are done, so it can cancel the process.
{
@ -828,7 +811,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
}
// We want to send a message back to src that we are ready to start.
// fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.SrcNode,
FromNode: cia.DstNode,
@ -838,8 +820,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
IsSubPublishedMsg: true,
}
// fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)