1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

write reply messages for copysub procs back to src node

This commit is contained in:
postmannen 2022-11-23 09:18:02 +01:00
parent 198ba9503e
commit 70408d24e9

View file

@ -228,7 +228,7 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel)
copySrcSubProc.procFunc = copySrcSubProcFunc(copySrcSubProc, cia, cancel, message)
// assign a handler to the sub process
copySrcSubProc.handler = copySrcSubHandler(cia)
@ -404,9 +404,19 @@ type copySubData struct {
Hash [32]byte
}
func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.CancelFunc) func(context.Context, chan Message) error {
func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.CancelFunc, initialMessage Message) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
// We want to be able to send the reply message when the copying is done,
// and also for any eventual errors within the subProcFunc. We want to
// write these to the same place as the the reply message for the initial
// request, but we append .sub and .error to be able to write them to
// individual files.
msgForSubReplies := initialMessage
msgForSubErrors := initialMessage
msgForSubReplies.FileName = msgForSubReplies.FileName + ".copyreply"
msgForSubErrors.FileName = msgForSubErrors.FileName + ".copyerror"
var chunkNumber = 0
var lastReadChunk []byte
var resendRetries int
@ -417,6 +427,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
if err != nil {
er := fmt.Errorf("error: copySrcSubProcFunc: failed to open file: %v", err)
proc.errorKernel.errSend(proc, Message{}, er)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
defer fh.Close()
@ -436,6 +447,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
if err != nil {
er := fmt.Errorf("error: copySrcSubHandler: cbor unmarshal of csa failed: %v", err)
proc.errorKernel.errSend(proc, message, er)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
@ -450,6 +462,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
if err != nil && err != io.EOF {
er := fmt.Errorf("error: copySrcSubHandler: failed to read chunk from file: %v", err)
proc.errorKernel.errSend(proc, message, er)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
if err == io.EOF {
@ -476,8 +489,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
csaSerialized, err := cbor.Marshal(csa)
if err != nil {
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
er := fmt.Errorf("error: copySrcSubProcFunc: cbor marshal of csa failed: %v", err)
proc.errorKernel.errSend(proc, message, er)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
@ -493,8 +507,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
sam, err := newSubjectAndMessage(msg)
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
er := fmt.Errorf("copySrcProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
@ -516,11 +531,11 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
if resendRetries > message.Retries {
er := fmt.Errorf("error: %v: failed to resend the chunk for the %v time, giving up", cia.DstMethod, resendRetries)
proc.errorKernel.errSend(proc, message, er)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
// NB: Should we call cancel here, or wait for the timeout ?
proc.ctxCancel()
}
// HERE!
b := lastReadChunk
status := copyData
@ -542,6 +557,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
if err != nil {
er := fmt.Errorf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v", err)
proc.errorKernel.errSend(proc, message, er)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
@ -559,6 +575,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
if err != nil {
er := fmt.Errorf("copyDstProcSubFunc: newSubjectAndMessage failed: %v", err)
proc.errorKernel.errSend(proc, message, er)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
@ -567,12 +584,15 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
resendRetries++
case copyDstDone:
newReplyMessage(proc, msgForSubReplies, []byte("copyDstDone"))
cancel()
return nil
default:
er := fmt.Errorf("error: copySrcSubProcFunc: not valid copyStatus, exiting: %v", csa.CopyStatus)
proc.errorKernel.errSend(proc, message, er)
newReplyMessage(proc, msgForSubErrors, []byte(er.Error()))
return er
}
}