2022-06-09 03:29:41 +00:00
package steward
import (
"context"
2022-06-15 08:38:53 +00:00
"crypto/sha256"
2022-06-09 03:29:41 +00:00
"fmt"
2022-06-15 03:06:44 +00:00
"io"
2022-06-15 12:47:20 +00:00
"io/fs"
2022-10-14 08:12:50 +00:00
"log"
2022-06-14 12:32:35 +00:00
"os"
2022-06-09 09:25:59 +00:00
"path/filepath"
2022-06-16 10:18:21 +00:00
"sort"
2022-06-15 05:51:58 +00:00
"strconv"
2022-06-16 10:18:21 +00:00
"strings"
2022-06-16 05:12:03 +00:00
"time"
2022-06-09 03:29:41 +00:00
2022-06-09 09:25:59 +00:00
"github.com/fxamacker/cbor/v2"
2022-06-09 03:29:41 +00:00
"github.com/google/uuid"
)
2022-06-15 07:48:32 +00:00
type copyInitialData struct {
2022-06-16 05:12:03 +00:00
UUID string
SrcMethod Method
SrcNode Node
DstMethod Method
DstNode Node
SrcFilePath string
DstDir string
DstFile string
SplitChunkSize int
MaxTotalCopyTime int
FileMode fs . FileMode
2022-10-14 08:12:50 +00:00
FolderPermission uint64
2022-06-15 07:48:32 +00:00
}
2022-06-09 03:29:41 +00:00
// methodREQCopySrc is the method type for REQCopySrc, the request that
// starts a file copy from the source node.
type methodREQCopySrc struct {
	event Event
}

// getKind returns the event stored in the method value.
func (m methodREQCopySrc) getKind() Event {
	kind := m.event
	return kind
}
2022-06-15 03:31:46 +00:00
// methodREQCopySrc are handles the initial and first part of
// the message flow for a copy to destination request.
// It's main role is to start up a sub process for the destination
// in which all the actual file copying is done.
2022-06-09 03:29:41 +00:00
//
// Initialization, Source:
// - Use the REQCopySrc method to handle the initial request from the user.
// - Spawn a REQCopySrc_uid subscriber to receive sync messages from destination.
// - Send the uid, and full-file hash to the destination in a REQCopyDst message.
//
// Initialization, Destination:
// - Spawn a REQCopyDst-uid from the uid we got from source.
// --------------------------------------------------------------------------------------
//
// All below happens in the From-uid and To-uid methods until the copying is done.
//
// - dst->src, dst sends a REQCopySrc-uid message with status "ready" file receiving to src.
// - src receives the message and start reading the file:
// - src, creates hash of the complete file.
// - src, reads the file in chunks, and create a hash of each chunk.
// - src->dst, send chunk when read.
// - dst->src, wait for status "ready" indicating the chuck was transfered.
// - Loop and read new chunc.
2022-06-15 03:31:46 +00:00
// - src->dst, when last chunch is sent send status back that we are ready for the next message.
2022-06-09 03:29:41 +00:00
// - src->dst, if failure send status "error", abort file copying and clean up on both src and dst.
//
// - dst, read and store each chunch to tmp folder and verify hash.
// - dst->src, send status "ready" to src when chunch is stored.
2022-09-06 08:43:27 +00:00
// - loop and check for status "last", if last:
// - build original file from chuncs.
// - verify hash when file is built.
// - dst->src, send status "done".
2022-06-09 03:29:41 +00:00
//
// - We should also be be able to resend a chunk, or restart the copying from where we left of if it seems to hang.
//
2022-09-06 08:43:27 +00:00
// dataStructure{
// Data []bytes
// Status copyStatus
// id int
// }
2022-06-09 03:29:41 +00:00
//
// Create a new copy sync process to handle the actual file copying.
// We use the context already created based on the time out specified
// in the requestTimeout field of the message.
//
// -----------------------------------------------------
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
func ( m methodREQCopySrc ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-09-06 08:43:27 +00:00
// If the toNode field is not the same as nodeName of the receiving node
// we should forward the message to that specified toNode. This will allow
// us to initiate a file copy from another node to this node.
if message . ToNode != proc . node {
sam , err := newSubjectAndMessage ( message )
if err != nil {
er := fmt . Errorf ( "error: newSubjectAndMessage failed: %v, message=%v" , err , message )
proc . errorKernel . errSend ( proc , message , er )
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
return nil , fmt . Errorf ( "info: the copy message was forwarded to %v message, %v" , message . ToNode , message )
}
2022-06-09 03:29:41 +00:00
var subProcessName string
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2022-06-15 07:48:32 +00:00
// Set default split chunk size, will be replaced with value from
// methodArgs[3] if defined.
2022-06-16 05:12:03 +00:00
splitChunkSize := 100000
// Set default max total copy time, will be replaced with value from
// methodArgs[4] if defined.
maxTotalCopyTime := message . MethodTimeout
2022-10-14 08:12:50 +00:00
// Default permission of destination folder if we need to create it.
// The value will be replaced
folderPermission := uint64 ( 0755 )
2022-06-15 07:48:32 +00:00
2022-10-14 08:12:50 +00:00
er := fmt . Errorf ( "info: before switch: FolderPermission defined in message for socket: %04o" , folderPermission )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-15 07:48:32 +00:00
// Verify and check the methodArgs
2022-10-14 08:12:50 +00:00
if len ( message . MethodArgs ) < 3 {
2022-06-09 03:29:41 +00:00
er := fmt . Errorf ( "error: methodREQCopySrc: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath" )
proc . errorKernel . errSend ( proc , message , er )
return
2022-10-14 08:12:50 +00:00
}
2022-06-15 07:48:32 +00:00
2022-10-14 08:12:50 +00:00
if len ( message . MethodArgs ) > 3 {
2022-06-16 21:00:27 +00:00
// Check if split chunk size was set, if not keep default.
2022-06-15 07:48:32 +00:00
var err error
splitChunkSize , err = strconv . Atoi ( message . MethodArgs [ 3 ] )
if err != nil {
2022-06-16 05:12:03 +00:00
er := fmt . Errorf ( "error: methodREQCopySrc: unble to convert splitChunkSize into int value: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
}
2022-10-14 08:12:50 +00:00
}
2022-06-16 05:12:03 +00:00
2022-10-14 08:12:50 +00:00
if len ( message . MethodArgs ) > 4 {
2022-06-16 21:00:27 +00:00
// Check if maxTotalCopyTime was set, if not keep default.
2022-06-16 05:12:03 +00:00
var err error
2022-06-16 21:00:27 +00:00
maxTotalCopyTime , err = strconv . Atoi ( message . MethodArgs [ 4 ] )
2022-06-16 05:12:03 +00:00
if err != nil {
er := fmt . Errorf ( "error: methodREQCopySrc: unble to convert maxTotalCopyTime into int value: %v" , err )
2022-06-15 07:48:32 +00:00
proc . errorKernel . errSend ( proc , message , er )
}
2022-06-09 03:29:41 +00:00
}
2022-10-14 08:12:50 +00:00
if len ( message . MethodArgs ) > 5 && message . MethodArgs [ 5 ] != "" {
// Check if file permissions were set, if not use default.
var err error
folderPermission , err = strconv . ParseUint ( message . MethodArgs [ 5 ] , 8 , 32 )
if err != nil {
log . Printf ( "%v\n" , err )
}
er := fmt . Errorf ( "info: FolderPermission defined in message for socket: %v, converted = %v" , message . MethodArgs [ 5 ] , folderPermission )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
if err != nil {
er := fmt . Errorf ( "error: methodREQCopySrc: unable to convert folderPermission into int value: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
}
}
2022-06-09 08:27:20 +00:00
SrcFilePath := message . MethodArgs [ 0 ]
DstNode := message . MethodArgs [ 1 ]
DstFilePath := message . MethodArgs [ 2 ]
2022-06-09 03:29:41 +00:00
2022-06-16 05:12:03 +00:00
// Create a child context to use with the procFunc with timeout set to the max allowed total copy time
// specified in the message.
var ctx context . Context
var cancel context . CancelFunc
func ( ) {
ctx , cancel = context . WithTimeout ( proc . ctx , time . Second * time . Duration ( maxTotalCopyTime ) )
} ( )
2022-06-09 03:29:41 +00:00
// Create a subject for one copy request
uid := uuid . New ( )
2022-06-14 05:09:20 +00:00
subProcessName = fmt . Sprintf ( "REQSUBCopySrc.%v" , uid . String ( ) )
2022-06-09 03:29:41 +00:00
2022-06-10 08:38:11 +00:00
dstDir := filepath . Dir ( DstFilePath )
dstFile := filepath . Base ( DstFilePath )
2022-06-14 05:05:38 +00:00
m := Method ( subProcessName )
2022-06-10 08:38:11 +00:00
2022-06-14 12:58:50 +00:00
// Also choosing to create the naming for the dst method here so
// we can have all the information in the cia from the beginning
// at both ends.
dstSubProcessName := fmt . Sprintf ( "REQSUBCopyDst.%v" , uid . String ( ) )
dstM := Method ( dstSubProcessName )
2022-06-15 12:47:20 +00:00
// Get the file permissions
fileInfo , err := os . Stat ( SrcFilePath )
if err != nil {
2022-06-16 21:32:44 +00:00
// errCh <- fmt.Errorf("error: methodREQCopySrc: failed to open file: %v, %v", SrcFilePath, err)
2022-06-21 05:45:36 +00:00
er := fmt . Errorf ( "error: copySrcSubProcFunc: failed to stat file: %v" , err )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-15 12:47:20 +00:00
return
}
fileMode := fileInfo . Mode ( )
2022-06-10 08:38:11 +00:00
cia := copyInitialData {
2022-06-16 05:12:03 +00:00
UUID : uid . String ( ) ,
SrcNode : proc . node ,
SrcMethod : m ,
DstNode : Node ( DstNode ) ,
DstMethod : dstM ,
SrcFilePath : SrcFilePath ,
DstDir : dstDir ,
DstFile : dstFile ,
SplitChunkSize : splitChunkSize ,
MaxTotalCopyTime : maxTotalCopyTime ,
FileMode : fileMode ,
2022-10-14 08:12:50 +00:00
FolderPermission : folderPermission ,
2022-06-10 08:38:11 +00:00
}
2022-06-09 03:29:41 +00:00
sub := newSubjectNoVerifyHandler ( m , node )
// Create a new sub process that will do the actual file copying.
2022-06-17 22:03:25 +00:00
copySrcSubProc := newSubProcess ( ctx , proc . server , sub , processKindSubscriber , nil )
2022-06-09 09:25:59 +00:00
2022-06-09 03:29:41 +00:00
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
2022-11-23 08:18:02 +00:00
copySrcSubProc . procFunc = copySrcSubProcFunc ( copySrcSubProc , cia , cancel , message )
2022-06-09 09:25:59 +00:00
2022-06-11 04:30:58 +00:00
// assign a handler to the sub process
2022-06-14 05:05:38 +00:00
copySrcSubProc . handler = copySrcSubHandler ( cia )
2022-06-11 04:30:58 +00:00
2022-06-09 03:29:41 +00:00
// The process will be killed when the context expires.
go copySrcSubProc . spawnWorker ( )
2022-06-09 09:25:59 +00:00
// Send a message over the the node where the destination file will be written,
// to also start up a sub process on the destination node.
2022-10-14 08:12:50 +00:00
// Marshal the data payload to send to the dst.
2022-06-09 09:25:59 +00:00
cb , err := cbor . Marshal ( cia )
if err != nil {
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
proc . errorKernel . errSend ( proc , message , er )
cancel ( )
}
msg := message
msg . ToNode = Node ( DstNode )
//msg.Method = REQToFile
msg . Method = REQCopyDst
msg . Data = cb
2022-12-21 08:27:52 +00:00
msg . ACKTimeout = message . ACKTimeout
msg . Retries = message . Retries
msg . ReplyACKTimeout = message . ReplyACKTimeout
msg . ReplyRetries = message . ReplyRetries
2022-06-09 09:40:53 +00:00
// msg.Directory = dstDir
// msg.FileName = dstFile
2022-06-09 09:25:59 +00:00
sam , err := newSubjectAndMessage ( msg )
if err != nil {
2022-09-06 08:43:27 +00:00
er := fmt . Errorf ( "error: newSubjectAndMessage failed: %v, message=%v" , err , message )
2022-06-09 09:25:59 +00:00
proc . errorKernel . errSend ( proc , message , er )
cancel ( )
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2022-10-14 08:22:51 +00:00
replyData := fmt . Sprintf ( "info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying" , copySrcSubProc . processName , node , SrcFilePath , DstNode , DstFilePath , subProcessName )
2022-06-09 08:27:20 +00:00
newReplyMessage ( proc , message , [ ] byte ( replyData ) )
2022-06-09 03:29:41 +00:00
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2022-06-17 22:03:25 +00:00
// newSubProcess creates a process with newProcess, and marks it as a sub
// process by setting isSubProcess before handing it back.
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process {
	subProc := newProcess(ctx, server, subject, processKind, procFunc)
	subProc.isSubProcess = true
	return subProc
}
2022-06-09 03:29:41 +00:00
// ----
// methodREQCopyDst is the method type for REQCopyDst, the request sent from
// the source node to start the destination side of a file copy.
type methodREQCopyDst struct {
	event Event
}

// getKind returns the event stored in the method value.
func (m methodREQCopyDst) getKind() Event {
	kind := m.event
	return kind
}
2022-06-15 03:31:46 +00:00
// methodREQCopyDst are handles the initial and first part of
// the message flow for a copy to destination request.
// It's main role is to start up a sub process for the destination
// in which all the actual file copying is done.
2022-06-09 03:29:41 +00:00
func ( m methodREQCopyDst ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-06-09 09:40:53 +00:00
var subProcessName string
2022-06-09 03:29:41 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2022-06-15 03:31:46 +00:00
// Get the status message sent from source.
2022-06-09 09:25:59 +00:00
var cia copyInitialData
err := cbor . Unmarshal ( message . Data , & cia )
if err != nil {
er := fmt . Errorf ( "error: methodREQCopyDst: failed to cbor Unmarshal data: %v, message=%v" , err , message )
proc . errorKernel . errSend ( proc , message , er )
return
}
2022-06-16 05:12:03 +00:00
// Create a child context to use with the procFunc with timeout set to the max allowed total copy time
// specified in the message.
var ctx context . Context
var cancel context . CancelFunc
func ( ) {
ctx , cancel = context . WithTimeout ( proc . ctx , time . Second * time . Duration ( cia . MaxTotalCopyTime ) )
} ( )
2022-06-09 09:40:53 +00:00
// Create a subject for one copy request
2022-06-14 12:58:50 +00:00
sub := newSubjectNoVerifyHandler ( cia . DstMethod , node )
2022-06-09 09:40:53 +00:00
2022-12-19 06:21:53 +00:00
// Check if we already got a sub process registered and started with
// the processName. If true, return here and don't start up another
// process for that file.
//
// NB: This check is put in here if a message for some reason are
// received more than once. The reason that this might happen can be
// that a message for the same copy request was received earlier, but
// was unable to start up within the timeout specified. The Sender of
// that request will then resend the message, but at the time that
// second message is received the subscriber process started for the
// previous message is then fully up and running, so we just discard
// that second message in those cases.
2022-12-26 09:12:45 +00:00
pn := processNameGet ( sub . name ( ) , processKindSubscriber )
// fmt.Printf("\n\n *** DEBUG: processNameGet: %v\n\n", pn)
2022-12-19 06:21:53 +00:00
proc . processes . active . mu . Lock ( )
2022-12-26 09:12:45 +00:00
_ , ok := proc . processes . active . procNames [ pn ]
2022-12-19 06:21:53 +00:00
proc . processes . active . mu . Unlock ( )
if ok {
2022-12-26 09:12:45 +00:00
log . Printf ( " * * * DEBUG: subprocesses already existed, will not start another subscriber for %v\n" , pn )
// HERE!!!
// If the process name already existed we return here before any
// new information is registered in the process map and we avoid
// having to clean that up later.
2022-12-19 06:21:53 +00:00
return
}
2022-12-26 09:12:45 +00:00
// Create a new sub process that will do the actual file copying.
copyDstSubProc := newSubProcess ( ctx , proc . server , sub , processKindSubscriber , nil )
2022-06-09 09:40:53 +00:00
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
2022-06-16 04:42:34 +00:00
copyDstSubProc . procFunc = copyDstSubProcFunc ( copyDstSubProc , cia , message , cancel )
2022-06-09 09:40:53 +00:00
2022-06-11 04:30:58 +00:00
// assign a handler to the sub process
2022-06-14 05:05:38 +00:00
copyDstSubProc . handler = copyDstSubHandler ( cia )
2022-06-11 04:30:58 +00:00
2022-06-09 09:40:53 +00:00
// The process will be killed when the context expires.
go copyDstSubProc . spawnWorker ( )
2022-06-09 09:25:59 +00:00
2022-06-10 04:17:11 +00:00
fp := filepath . Join ( cia . DstDir , cia . DstFile )
2022-10-14 08:22:51 +00:00
replyData := fmt . Sprintf ( "info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying" , copyDstSubProc . processName , node , fp , subProcessName )
2022-06-10 04:17:11 +00:00
newReplyMessage ( proc , message , [ ] byte ( replyData ) )
2022-06-09 03:29:41 +00:00
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2022-06-09 09:40:53 +00:00
2022-06-14 05:05:38 +00:00
func copySrcSubHandler ( cia copyInitialData ) func ( process , Message , string ) ( [ ] byte , error ) {
2022-06-11 04:30:58 +00:00
h := func ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-06-14 08:17:09 +00:00
2022-06-21 05:45:36 +00:00
// We should receive a ready message generated by the procFunc of Dst,
// and any messages received we directly route into the procFunc.
2022-06-14 05:05:38 +00:00
2022-06-11 04:30:58 +00:00
select {
case <- proc . ctx . Done ( ) :
2022-06-21 05:45:36 +00:00
er := fmt . Errorf ( " * copySrcHandler ended: %v" , proc . processName )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-14 09:31:19 +00:00
case proc . procFuncCh <- message :
2022-12-21 07:16:45 +00:00
er := fmt . Errorf ( "copySrcHandler: passing message over to procFunc: %v" , proc . processName )
2022-06-21 05:45:36 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-11 04:30:58 +00:00
}
return nil , nil
}
return h
}
2022-06-14 05:05:38 +00:00
func copyDstSubHandler ( cia copyInitialData ) func ( process , Message , string ) ( [ ] byte , error ) {
2022-06-11 04:30:58 +00:00
h := func ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-06-14 05:05:38 +00:00
2022-06-11 04:30:58 +00:00
select {
case <- proc . ctx . Done ( ) :
2022-06-21 05:45:36 +00:00
er := fmt . Errorf ( " * copyDstHandler ended: %v" , proc . processName )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-14 12:32:35 +00:00
case proc . procFuncCh <- message :
2022-12-21 07:16:45 +00:00
er := fmt . Errorf ( "copyDstHandler: passing message over to procFunc: %v" , proc . processName )
2022-06-21 05:45:36 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-11 04:30:58 +00:00
}
return nil , nil
}
return h
}
2022-06-15 05:51:58 +00:00
// copyStatus tells the receiving copy sub process how to handle a
// copySubData message. The numeric values are sent over the wire inside the
// cbor encoded copySubData, so they must stay stable.
type copyStatus int

const (
	// copyReady is sent by dst when it is ready for the next chunk.
	copyReady copyStatus = iota + 1 // 1
	// copyData marks a message from src carrying a chunk of file data.
	copyData // 2
	// copySrcDone is set by src on the chunk read when end of file was reached.
	copySrcDone // 3
	// copyResendLast asks src to send the last read chunk again, used by dst
	// when the hash of a received chunk did not match.
	copyResendLast // 4
	// copyDstDone tells src that dst is done; src then ends its sub process.
	copyDstDone // 5
)

// copySubData is the payload exchanged between the src and dst copy sub
// processes, cbor encoded into the Data field of their messages.
type copySubData struct {
	// CopyStatus tells the receiver how to handle the message.
	CopyStatus copyStatus
	// CopyData is the chunk of file data, if any.
	CopyData []byte
	// ChunkNumber is the sequence number of the chunk, counted by src.
	ChunkNumber int
	// Hash is the sha256 sum of CopyData.
	Hash [32]byte
}
2022-11-23 08:18:02 +00:00
func copySrcSubProcFunc ( proc process , cia copyInitialData , cancel context . CancelFunc , initialMessage Message ) func ( context . Context , chan Message ) error {
2022-06-10 04:17:11 +00:00
pf := func ( ctx context . Context , procFuncCh chan Message ) error {
2022-11-23 08:18:02 +00:00
// We want to be able to send the reply message when the copying is done,
// and also for any eventual errors within the subProcFunc. We want to
// write these to the same place as the the reply message for the initial
// request, but we append .sub and .error to be able to write them to
// individual files.
msgForSubReplies := initialMessage
msgForSubErrors := initialMessage
msgForSubReplies . FileName = msgForSubReplies . FileName + ".copyreply"
msgForSubErrors . FileName = msgForSubErrors . FileName + ".copyerror"
2022-06-15 05:51:58 +00:00
var chunkNumber = 0
2022-06-15 12:47:20 +00:00
var lastReadChunk [ ] byte
var resendRetries int
2022-06-14 12:32:35 +00:00
// Initiate the file copier.
2022-06-15 12:47:20 +00:00
2022-06-14 12:32:35 +00:00
fh , err := os . Open ( cia . SrcFilePath )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copySrcSubProcFunc: failed to open file: %v" , err )
proc . errorKernel . errSend ( proc , Message { } , er )
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubErrors , [ ] byte ( er . Error ( ) ) )
2022-06-17 22:43:42 +00:00
return er
2022-06-14 12:32:35 +00:00
}
defer fh . Close ( )
// Do action based on copyStatus received.
2022-06-14 09:31:19 +00:00
for {
select {
case <- ctx . Done ( ) :
2022-06-21 05:45:36 +00:00
er := fmt . Errorf ( " info: canceling copySrcProcFunc : %v" , proc . processName )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-14 09:31:19 +00:00
return nil
// Pick up the message recived by the copySrcSubHandler.
case message := <- procFuncCh :
var csa copySubData
err := cbor . Unmarshal ( message . Data , & csa )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copySrcSubHandler: cbor unmarshal of csa failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubErrors , [ ] byte ( er . Error ( ) ) )
2022-06-17 22:43:42 +00:00
return er
2022-06-14 09:31:19 +00:00
}
switch csa . CopyStatus {
case copyReady :
2022-06-18 05:35:59 +00:00
err := func ( ) error {
// We set the default status to copyData. If we get an io.EOF we change it to copyDone later.
status := copyData
b := make ( [ ] byte , cia . SplitChunkSize )
n , err := fh . Read ( b )
if err != nil && err != io . EOF {
er := fmt . Errorf ( "error: copySrcSubHandler: failed to read chunk from file: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubErrors , [ ] byte ( er . Error ( ) ) )
2022-06-18 05:35:59 +00:00
return er
}
if err == io . EOF {
status = copySrcDone
}
2022-06-22 03:03:11 +00:00
lastReadChunk = make ( [ ] byte , len ( b [ : n ] ) )
copy ( lastReadChunk , b [ : n ] )
//lastReadChunk = b[:n]
2022-06-18 05:35:59 +00:00
2022-06-21 05:45:36 +00:00
// Create a hash of the bytes.
2022-06-18 05:35:59 +00:00
hash := sha256 . Sum256 ( b [ : n ] )
chunkNumber ++
2022-06-21 05:45:36 +00:00
// Create message and send data to dst.
2022-06-18 05:35:59 +00:00
csa := copySubData {
CopyStatus : status ,
CopyData : b [ : n ] ,
ChunkNumber : chunkNumber ,
Hash : hash ,
}
csaSerialized , err := cbor . Marshal ( csa )
if err != nil {
2022-11-23 08:18:02 +00:00
er := fmt . Errorf ( "error: copySrcSubProcFunc: cbor marshal of csa failed: %v" , err )
2022-06-18 05:35:59 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubErrors , [ ] byte ( er . Error ( ) ) )
2022-06-18 05:35:59 +00:00
return er
}
// We want to send a message back to src that we are ready to start.
msg := Message {
ToNode : cia . DstNode ,
FromNode : cia . SrcNode ,
Method : cia . DstMethod ,
ReplyMethod : REQNone ,
Data : csaSerialized ,
IsSubPublishedMsg : true ,
2022-12-02 09:04:20 +00:00
ACKTimeout : initialMessage . ACKTimeout ,
Retries : initialMessage . Retries ,
2022-12-21 08:27:52 +00:00
ReplyACKTimeout : initialMessage . ReplyACKTimeout ,
ReplyRetries : initialMessage . ReplyRetries ,
2022-06-18 05:35:59 +00:00
}
sam , err := newSubjectAndMessage ( msg )
if err != nil {
2022-11-23 08:18:02 +00:00
er := fmt . Errorf ( "copySrcProcSubFunc: newSubjectAndMessage failed: %v" , err )
2022-06-18 05:35:59 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubErrors , [ ] byte ( er . Error ( ) ) )
2022-06-18 05:35:59 +00:00
return er
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
resendRetries = 0
// Testing with contect canceling here.
// proc.ctxCancel()
return nil
} ( )
2022-06-15 03:31:46 +00:00
if err != nil {
2022-06-18 05:35:59 +00:00
return err
2022-06-14 13:05:38 +00:00
}
2022-06-15 12:47:20 +00:00
case copyResendLast :
if resendRetries > message . Retries {
er := fmt . Errorf ( "error: %v: failed to resend the chunk for the %v time, giving up" , cia . DstMethod , resendRetries )
proc . errorKernel . errSend ( proc , message , er )
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubErrors , [ ] byte ( er . Error ( ) ) )
2022-06-15 12:47:20 +00:00
// NB: Should we call cancel here, or wait for the timeout ?
proc . ctxCancel ( )
}
b := lastReadChunk
status := copyData
// Create a hash of the bytes
hash := sha256 . Sum256 ( b )
chunkNumber ++
// Create message and send data to dst
csa := copySubData {
CopyStatus : status ,
CopyData : b ,
ChunkNumber : chunkNumber ,
Hash : hash ,
}
csaSerialized , err := cbor . Marshal ( csa )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: cbor marshal of csa failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubErrors , [ ] byte ( er . Error ( ) ) )
2022-06-17 22:43:42 +00:00
return er
2022-06-15 12:47:20 +00:00
}
// We want to send a message back to src that we are ready to start.
msg := Message {
2022-06-17 22:03:25 +00:00
ToNode : cia . DstNode ,
FromNode : cia . SrcNode ,
Method : cia . DstMethod ,
ReplyMethod : REQNone ,
Data : csaSerialized ,
IsSubPublishedMsg : true ,
2022-12-02 09:04:20 +00:00
ACKTimeout : initialMessage . ACKTimeout ,
Retries : initialMessage . Retries ,
2022-12-21 08:27:52 +00:00
ReplyACKTimeout : initialMessage . ReplyACKTimeout ,
ReplyRetries : initialMessage . ReplyRetries ,
2022-06-15 12:47:20 +00:00
}
sam , err := newSubjectAndMessage ( msg )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "copyDstProcSubFunc: newSubjectAndMessage failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubErrors , [ ] byte ( er . Error ( ) ) )
2022-06-17 22:43:42 +00:00
return er
2022-06-15 12:47:20 +00:00
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
resendRetries ++
2022-06-16 04:42:34 +00:00
case copyDstDone :
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubReplies , [ ] byte ( "copyDstDone" ) )
2022-06-16 04:42:34 +00:00
cancel ( )
2022-06-21 05:45:36 +00:00
return nil
2022-06-16 04:42:34 +00:00
2022-06-14 09:31:19 +00:00
default :
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copySrcSubProcFunc: not valid copyStatus, exiting: %v" , csa . CopyStatus )
proc . errorKernel . errSend ( proc , message , er )
2022-11-23 08:18:02 +00:00
newReplyMessage ( proc , msgForSubErrors , [ ] byte ( er . Error ( ) ) )
2022-06-17 22:43:42 +00:00
return er
2022-06-14 09:31:19 +00:00
}
}
2022-06-10 04:17:11 +00:00
}
2022-06-14 09:31:19 +00:00
//return nil
2022-06-10 04:17:11 +00:00
}
return pf
}
2022-06-16 04:42:34 +00:00
func copyDstSubProcFunc ( proc process , cia copyInitialData , message Message , cancel context . CancelFunc ) func ( context . Context , chan Message ) error {
2022-06-15 05:51:58 +00:00
2022-06-09 09:40:53 +00:00
pf := func ( ctx context . Context , procFuncCh chan Message ) error {
2022-06-14 08:17:09 +00:00
csa := copySubData {
CopyStatus : copyReady ,
}
csaSerialized , err := cbor . Marshal ( csa )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: cbor marshal of csa failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-14 08:17:09 +00:00
}
2022-06-09 09:40:53 +00:00
2022-06-14 05:05:38 +00:00
// We want to send a message back to src that we are ready to start.
2022-06-15 05:51:58 +00:00
{
msg := Message {
2022-06-17 22:03:25 +00:00
ToNode : cia . SrcNode ,
FromNode : cia . DstNode ,
Method : cia . SrcMethod ,
ReplyMethod : REQNone ,
Data : csaSerialized ,
IsSubPublishedMsg : true ,
2022-12-02 09:04:20 +00:00
ACKTimeout : message . ACKTimeout ,
Retries : message . Retries ,
2022-12-21 08:27:52 +00:00
ReplyACKTimeout : message . ReplyACKTimeout ,
ReplyRetries : message . ReplyRetries ,
2022-06-15 05:51:58 +00:00
}
2022-06-14 05:05:38 +00:00
2022-06-15 05:51:58 +00:00
sam , err := newSubjectAndMessage ( msg )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "copyDstProcSubFunc: newSubjectAndMessage failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-15 05:51:58 +00:00
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2022-06-14 05:05:38 +00:00
}
2022-06-15 05:51:58 +00:00
// Open a tmp folder for where to write the received chunks
tmpFolder := filepath . Join ( proc . configuration . SocketFolder , cia . DstFile + "-" + cia . UUID )
2022-10-14 08:12:50 +00:00
err = os . Mkdir ( tmpFolder , 0700 )
if err != nil {
er := fmt . Errorf ( "copyDstProcSubFunc: create tmp folder for copying failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
}
2022-06-14 05:05:38 +00:00
2022-12-14 04:48:44 +00:00
defer func ( ) {
err = os . RemoveAll ( tmpFolder )
if err != nil {
er := fmt . Errorf ( "error: copyDstSubProcFunc: remove temp dir failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
}
} ( )
2022-06-14 12:32:35 +00:00
for {
select {
case <- ctx . Done ( ) :
2022-06-21 05:45:36 +00:00
er := fmt . Errorf ( " * copyDstProcFunc ended: %v" , proc . processName )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-14 12:32:35 +00:00
return nil
case message := <- procFuncCh :
var csa copySubData
err := cbor . Unmarshal ( message . Data , & csa )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copySrcSubHandler: cbor unmarshal of csa failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-14 12:32:35 +00:00
}
2022-06-15 12:47:20 +00:00
// Check if the hash matches. If it fails we set the status so we can
// trigger the resend of the last message in the switch below.
2022-06-15 08:38:53 +00:00
hash := sha256 . Sum256 ( csa . CopyData )
if hash != csa . Hash {
2022-06-21 05:45:36 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: hash of received message is not correct for: %v" , cia . DstMethod )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-15 12:47:20 +00:00
csa . CopyStatus = copyResendLast
2022-06-15 08:38:53 +00:00
}
2022-06-14 12:32:35 +00:00
switch csa . CopyStatus {
case copyData :
2022-06-17 22:43:42 +00:00
err := func ( ) error {
2022-06-15 05:51:58 +00:00
filePath := filepath . Join ( tmpFolder , strconv . Itoa ( csa . ChunkNumber ) + "." + cia . UUID )
fh , err := os . OpenFile ( filePath , os . O_TRUNC | os . O_RDWR | os . O_CREATE | os . O_SYNC , 0600 )
if err != nil {
2022-10-14 08:12:50 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: open destination chunk file for writing failed: %v" , err )
2022-06-17 22:43:42 +00:00
return er
2022-06-15 05:51:58 +00:00
}
defer fh . Close ( )
_ , err = fh . Write ( csa . CopyData )
if err != nil {
2022-10-14 08:12:50 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: writing to chunk file failed: %v" , err )
2022-06-17 22:43:42 +00:00
return er
2022-06-15 05:51:58 +00:00
}
2022-06-17 22:43:42 +00:00
return nil
2022-06-15 05:51:58 +00:00
} ( )
2022-06-17 22:43:42 +00:00
if err != nil {
proc . errorKernel . errSend ( proc , message , err )
return err
}
2022-06-15 05:51:58 +00:00
// Prepare and send a ready message to src for the next chunk.
2022-06-15 03:06:44 +00:00
csa := copySubData {
CopyStatus : copyReady ,
}
2022-06-15 12:47:20 +00:00
csaSer , err := cbor . Marshal ( csa )
2022-06-15 03:06:44 +00:00
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: cbor marshal of csa failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
2022-06-21 05:45:36 +00:00
return er
2022-06-15 03:06:44 +00:00
}
msg := Message {
2022-06-17 22:03:25 +00:00
ToNode : cia . SrcNode ,
FromNode : cia . DstNode ,
Method : cia . SrcMethod ,
ReplyMethod : REQNone ,
Data : csaSer ,
IsSubPublishedMsg : true ,
2022-12-02 09:04:20 +00:00
ACKTimeout : message . ACKTimeout ,
Retries : message . Retries ,
2022-12-21 08:27:52 +00:00
ReplyACKTimeout : message . ReplyACKTimeout ,
ReplyRetries : message . ReplyRetries ,
2022-06-15 12:47:20 +00:00
}
sam , err := newSubjectAndMessage ( msg )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "copyDstProcSubFunc: newSubjectAndMessage failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-15 12:47:20 +00:00
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
case copyResendLast :
// The csa already contains copyStatus copyResendLast when reached here,
// so we can just serialize csa, and send a message back to sourcde for
// resend of the last message.
csaSer , err := cbor . Marshal ( csa )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: cbor marshal of csa failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-15 12:47:20 +00:00
}
msg := Message {
2022-06-17 22:03:25 +00:00
ToNode : cia . SrcNode ,
FromNode : cia . DstNode ,
Method : cia . SrcMethod ,
ReplyMethod : REQNone ,
Data : csaSer ,
IsSubPublishedMsg : true ,
2022-12-02 09:04:20 +00:00
ACKTimeout : message . ACKTimeout ,
Retries : message . Retries ,
2022-12-21 08:27:52 +00:00
ReplyACKTimeout : message . ReplyACKTimeout ,
ReplyRetries : message . ReplyRetries ,
2022-06-15 03:06:44 +00:00
}
sam , err := newSubjectAndMessage ( msg )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "copyDstProcSubFunc: newSubjectAndMessage failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-15 03:06:44 +00:00
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2022-06-16 04:42:34 +00:00
case copySrcDone :
2022-06-17 22:43:42 +00:00
err := func ( ) error {
2022-06-15 12:47:20 +00:00
2022-06-15 05:51:58 +00:00
// Open the main file that chunks files will be written into.
filePath := filepath . Join ( cia . DstDir , cia . DstFile )
2022-06-15 12:47:20 +00:00
2022-10-14 08:12:50 +00:00
// HERE:
er := fmt . Errorf ( "info: Before creating folder: cia.FolderPermission: %04o" , cia . FolderPermission )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
if _ , err := os . Stat ( cia . DstDir ) ; os . IsNotExist ( err ) {
// TODO: Add option to set permission here ???
err := os . MkdirAll ( cia . DstDir , fs . FileMode ( cia . FolderPermission ) )
if err != nil {
return fmt . Errorf ( "error: failed to create destination directory for file copying %v: %v" , cia . DstDir , err )
}
er := fmt . Errorf ( "info: Created folder: with cia.FolderPermission: %04o" , cia . FolderPermission )
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
}
2022-06-15 12:47:20 +00:00
// Rename the file so we got a backup.
backupOriginalFileName := filePath + ".bck"
os . Rename ( filePath , backupOriginalFileName )
mainfh , err := os . OpenFile ( filePath , os . O_TRUNC | os . O_RDWR | os . O_CREATE | os . O_SYNC , cia . FileMode )
2022-06-15 05:51:58 +00:00
if err != nil {
2022-10-14 08:12:50 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: open final destination file failed: %v" , err )
2022-06-17 22:43:42 +00:00
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-15 05:51:58 +00:00
}
defer mainfh . Close ( )
2022-06-16 10:18:21 +00:00
type fInfo struct {
name string
dir string
nr int
}
files := [ ] fInfo { }
2022-06-15 05:51:58 +00:00
// Walk the tmp transfer directory and combine all the chunks into one file.
err = filepath . Walk ( tmpFolder , func ( path string , info os . FileInfo , err error ) error {
if err != nil {
return err
}
if ! info . IsDir ( ) {
2022-06-16 10:18:21 +00:00
fi := fInfo { }
fi . name = filepath . Base ( path )
fi . dir = filepath . Dir ( path )
sp := strings . Split ( fi . name , "." )
nr , err := strconv . Atoi ( sp [ 0 ] )
if err != nil {
return err
}
fi . nr = nr
files = append ( files , fi )
}
return nil
} )
// Sort all the source nodes.
sort . SliceStable ( files , func ( i , j int ) bool {
return files [ i ] . nr < files [ j ] . nr
} )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: creation of slice of chunk paths failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-16 10:18:21 +00:00
}
err = func ( ) error {
for _ , fInfo := range files {
fp := filepath . Join ( fInfo . dir , fInfo . name )
fh , err := os . Open ( fp )
2022-06-15 05:51:58 +00:00
if err != nil {
return err
}
defer fh . Close ( )
2022-06-15 07:48:32 +00:00
b := make ( [ ] byte , cia . SplitChunkSize )
2022-06-15 18:55:20 +00:00
n , err := fh . Read ( b )
2022-06-15 05:51:58 +00:00
if err != nil {
return err
}
2022-06-16 04:15:18 +00:00
_ , err = mainfh . Write ( b [ : n ] )
2022-06-15 05:51:58 +00:00
if err != nil {
return err
}
}
return nil
2022-06-16 10:18:21 +00:00
} ( )
2022-06-15 12:47:20 +00:00
2022-06-16 10:18:21 +00:00
if err != nil {
2022-06-21 05:45:36 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: write to final destination file failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
2022-06-15 05:51:58 +00:00
}
2022-12-14 04:48:44 +00:00
// Remove the backup file.
err = os . Remove ( backupOriginalFileName )
2022-12-31 05:02:26 +00:00
if err != nil && ! os . IsNotExist ( err ) {
2022-12-14 04:48:44 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: remove of backup of original file failed: %v" , err )
2022-06-17 22:43:42 +00:00
proc . errorKernel . errSend ( proc , message , er )
2022-06-15 18:12:48 +00:00
}
2022-06-15 12:47:20 +00:00
2022-10-14 08:12:50 +00:00
er = fmt . Errorf ( "info: copy: successfully wrote all split chunk files into file=%v" , filePath )
2022-06-21 05:45:36 +00:00
proc . errorKernel . logConsoleOnlyIfDebug ( er , proc . configuration )
2022-06-16 04:15:18 +00:00
2022-06-16 04:42:34 +00:00
// Signal back to src that we are done, so it can cancel the process.
{
csa := copySubData {
CopyStatus : copyDstDone ,
}
csaSerialized , err := cbor . Marshal ( csa )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "error: copyDstSubProcFunc: cbor marshal of csa failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-16 04:42:34 +00:00
}
// We want to send a message back to src that we are ready to start.
msg := Message {
2022-06-17 22:03:25 +00:00
ToNode : cia . SrcNode ,
FromNode : cia . DstNode ,
Method : cia . SrcMethod ,
ReplyMethod : REQNone ,
Data : csaSerialized ,
IsSubPublishedMsg : true ,
2022-12-02 09:04:20 +00:00
ACKTimeout : message . ACKTimeout ,
Retries : message . Retries ,
2022-12-21 08:27:52 +00:00
ReplyACKTimeout : message . ReplyACKTimeout ,
ReplyRetries : message . ReplyRetries ,
2022-06-16 04:42:34 +00:00
}
sam , err := newSubjectAndMessage ( msg )
if err != nil {
2022-06-17 22:43:42 +00:00
er := fmt . Errorf ( "copyDstProcSubFunc: newSubjectAndMessage failed: %v" , err )
proc . errorKernel . errSend ( proc , message , er )
return er
2022-06-16 04:42:34 +00:00
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
}
cancel ( )
2022-06-17 22:43:42 +00:00
return nil
2022-06-15 05:51:58 +00:00
} ( )
2022-06-17 22:43:42 +00:00
if err != nil {
return err
}
2022-06-14 12:32:35 +00:00
}
}
2022-06-09 09:40:53 +00:00
}
}
return pf
}