2022-06-09 03:29:41 +00:00
package steward
import (
"context"
"fmt"
2022-06-09 06:42:49 +00:00
"log"
2022-06-14 12:32:35 +00:00
"os"
2022-06-09 09:25:59 +00:00
"path/filepath"
2022-06-09 03:29:41 +00:00
2022-06-09 09:25:59 +00:00
"github.com/fxamacker/cbor/v2"
2022-06-09 03:29:41 +00:00
"github.com/google/uuid"
)
type methodREQCopySrc struct {
event Event
}
func ( m methodREQCopySrc ) getKind ( ) Event {
return m . event
}
// Idea:
//
// Initialization, Source:
// - Use the REQCopySrc method to handle the initial request from the user.
// - Spawn a REQCopySrc_uid subscriber to receive sync messages from destination.
// - Send the uid, and full-file hash to the destination in a REQCopyDst message.
//
// Initialization, Destination:
// - Spawn a REQCopyDst-uid from the uid we got from source.
// --------------------------------------------------------------------------------------
//
// All below happens in the From-uid and To-uid methods until the copying is done.
//
// - dst->src, dst sends a REQCopySrc-uid message with status "ready" file receiving to src.
// - src receives the message and start reading the file:
// - src, creates hash of the complete file.
// - src, reads the file in chunks, and create a hash of each chunk.
// - src->dst, send chunk when read.
// - dst->src, wait for status "ready" indicating the chuck was transfered.
// - Loop and read new chunc.
// - src->dst, when last chunch is sent send status "last"
// - src->dst, if failure send status "error", abort file copying and clean up on both src and dst.
//
// - dst, read and store each chunch to tmp folder and verify hash.
// - dst->src, send status "ready" to src when chunch is stored.
// - loop and check for status "last", if last:
// - build original file from chuncs.
// - verify hash when file is built.
// - dst->src, send status "done".
//
// - We should also be be able to resend a chunk, or restart the copying from where we left of if it seems to hang.
//
// dataStructure{
// Data []bytes
// Status copyStatus
// id int
// }
//
// Create a new copy sync process to handle the actual file copying.
// We use the context already created based on the time out specified
// in the requestTimeout field of the message.
//
// -----------------------------------------------------
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
func ( m methodREQCopySrc ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
var subProcessName string
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
switch {
case len ( message . MethodArgs ) < 3 :
er := fmt . Errorf ( "error: methodREQCopySrc: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath" )
proc . errorKernel . errSend ( proc , message , er )
return
}
2022-06-09 08:27:20 +00:00
SrcFilePath := message . MethodArgs [ 0 ]
DstNode := message . MethodArgs [ 1 ]
DstFilePath := message . MethodArgs [ 2 ]
2022-06-09 03:29:41 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
2022-06-09 03:59:37 +00:00
// Since the subProc spawned will outlive this method here we do not
// want to cancel this method. We care about the methodTimeout, but
// we ignore the CancelFunc.
2022-06-09 09:25:59 +00:00
ctx , cancel := getContextForMethodTimeout ( proc . ctx , message )
2022-06-09 03:29:41 +00:00
// Create a subject for one copy request
uid := uuid . New ( )
2022-06-14 05:09:20 +00:00
subProcessName = fmt . Sprintf ( "REQSUBCopySrc.%v" , uid . String ( ) )
2022-06-09 03:29:41 +00:00
2022-06-10 08:38:11 +00:00
dstDir := filepath . Dir ( DstFilePath )
dstFile := filepath . Base ( DstFilePath )
2022-06-14 05:05:38 +00:00
m := Method ( subProcessName )
2022-06-10 08:38:11 +00:00
2022-06-14 12:58:50 +00:00
// Also choosing to create the naming for the dst method here so
// we can have all the information in the cia from the beginning
// at both ends.
dstSubProcessName := fmt . Sprintf ( "REQSUBCopyDst.%v" , uid . String ( ) )
dstM := Method ( dstSubProcessName )
2022-06-10 08:38:11 +00:00
cia := copyInitialData {
2022-06-14 12:32:35 +00:00
UUID : uid . String ( ) ,
2022-06-14 12:58:50 +00:00
SrcNode : proc . node ,
2022-06-14 12:32:35 +00:00
SrcMethod : m ,
2022-06-14 12:58:50 +00:00
DstNode : Node ( DstNode ) ,
DstMethod : dstM ,
2022-06-14 12:32:35 +00:00
SrcFilePath : SrcFilePath ,
DstDir : dstDir ,
DstFile : dstFile ,
2022-06-10 08:38:11 +00:00
}
2022-06-09 03:29:41 +00:00
sub := newSubjectNoVerifyHandler ( m , node )
// Create a new sub process that will do the actual file copying.
copySrcSubProc := newProcess ( ctx , proc . server , sub , processKindSubscriber , nil )
2022-06-09 09:25:59 +00:00
2022-06-09 03:29:41 +00:00
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
2022-06-14 05:05:38 +00:00
copySrcSubProc . procFunc = copySrcSubProcFunc ( copySrcSubProc , cia )
2022-06-09 09:25:59 +00:00
2022-06-11 04:30:58 +00:00
// assign a handler to the sub process
2022-06-14 05:05:38 +00:00
copySrcSubProc . handler = copySrcSubHandler ( cia )
2022-06-11 04:30:58 +00:00
2022-06-09 03:29:41 +00:00
// The process will be killed when the context expires.
go copySrcSubProc . spawnWorker ( )
2022-06-09 09:25:59 +00:00
// Send a message over the the node where the destination file will be written,
// to also start up a sub process on the destination node.
// Marshal the data payload to send the the dst.
cb , err := cbor . Marshal ( cia )
if err != nil {
er := fmt . Errorf ( "error: newSubjectAndMessage : %v, message: %v" , err , message )
proc . errorKernel . errSend ( proc , message , er )
cancel ( )
}
msg := message
msg . ToNode = Node ( DstNode )
//msg.Method = REQToFile
msg . Method = REQCopyDst
msg . Data = cb
2022-06-09 09:40:53 +00:00
// msg.Directory = dstDir
// msg.FileName = dstFile
2022-06-09 09:25:59 +00:00
sam , err := newSubjectAndMessage ( msg )
if err != nil {
er := fmt . Errorf ( "error: methodREQCopySrc failed to cbor Marshal data: %v, message=%v" , err , message )
proc . errorKernel . errSend ( proc , message , er )
cancel ( )
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2022-06-10 04:17:11 +00:00
replyData := fmt . Sprintf ( "info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying\n" , copySrcSubProc . processName , node , SrcFilePath , DstNode , DstFilePath , subProcessName )
2022-06-09 08:27:20 +00:00
newReplyMessage ( proc , message , [ ] byte ( replyData ) )
2022-06-09 03:29:41 +00:00
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2022-06-09 09:25:59 +00:00
type copyInitialData struct {
2022-06-14 12:32:35 +00:00
UUID string
SrcMethod Method
SrcNode Node
DstMethod Method
DstNode Node
SrcFilePath string
DstDir string
DstFile string
2022-06-09 09:25:59 +00:00
}
2022-06-09 03:29:41 +00:00
// ----
type methodREQCopyDst struct {
event Event
}
func ( m methodREQCopyDst ) getKind ( ) Event {
return m . event
}
// Handle writing to a file. Will truncate any existing data if the file did already
// exist.
// Same as the REQToFile, but this requst type don't use the default data folder path
// for where to store files or add information about node names.
// This method also sends a msgReply back to the publisher if the method was done
// successfully, where REQToFile do not.
// This method will truncate and overwrite any existing files.
func ( m methodREQCopyDst ) handler ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-06-09 09:40:53 +00:00
var subProcessName string
2022-06-09 03:29:41 +00:00
proc . processes . wg . Add ( 1 )
go func ( ) {
defer proc . processes . wg . Done ( )
2022-06-09 09:25:59 +00:00
var cia copyInitialData
err := cbor . Unmarshal ( message . Data , & cia )
if err != nil {
er := fmt . Errorf ( "error: methodREQCopyDst: failed to cbor Unmarshal data: %v, message=%v" , err , message )
proc . errorKernel . errSend ( proc , message , er )
return
}
2022-06-09 09:40:53 +00:00
// Get a context with the timeout specified in message.MethodTimeout.
// Since the subProc spawned will outlive this method here we do not
// want to cancel this method. We care about the methodTimeout, but
// we ignore the CancelFunc.
ctx , _ := getContextForMethodTimeout ( proc . ctx , message )
// Create a subject for one copy request
2022-06-14 12:58:50 +00:00
// subProcessName = fmt.Sprintf("REQSUBCopyDst.%v", cia.UUID)
// m := Method(subProcessName)
sub := newSubjectNoVerifyHandler ( cia . DstMethod , node )
2022-06-09 09:40:53 +00:00
// Create a new sub process that will do the actual file copying.
copyDstSubProc := newProcess ( ctx , proc . server , sub , processKindSubscriber , nil )
// Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler.
2022-06-14 08:17:09 +00:00
copyDstSubProc . procFunc = copyDstSubProcFunc ( copyDstSubProc , cia , message )
2022-06-09 09:40:53 +00:00
2022-06-11 04:30:58 +00:00
// assign a handler to the sub process
2022-06-14 05:05:38 +00:00
copyDstSubProc . handler = copyDstSubHandler ( cia )
2022-06-11 04:30:58 +00:00
2022-06-09 09:40:53 +00:00
// The process will be killed when the context expires.
go copyDstSubProc . spawnWorker ( )
2022-06-09 09:25:59 +00:00
2022-06-10 04:17:11 +00:00
fp := filepath . Join ( cia . DstDir , cia . DstFile )
replyData := fmt . Sprintf ( "info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying\n" , copyDstSubProc . processName , node , fp , subProcessName )
newReplyMessage ( proc , message , [ ] byte ( replyData ) )
2022-06-09 03:29:41 +00:00
} ( )
ackMsg := [ ] byte ( "confirmed from: " + node + ": " + fmt . Sprint ( message . ID ) )
return ackMsg , nil
}
2022-06-09 09:40:53 +00:00
2022-06-14 05:05:38 +00:00
func copySrcSubHandler ( cia copyInitialData ) func ( process , Message , string ) ( [ ] byte , error ) {
2022-06-11 04:30:58 +00:00
h := func ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-06-14 08:17:09 +00:00
2022-06-11 04:30:58 +00:00
// We should receive a ready message generated by the procFunc of Dst.
2022-06-14 05:05:38 +00:00
2022-06-11 04:30:58 +00:00
select {
case <- proc . ctx . Done ( ) :
log . Printf ( " * copySrcHandler ended: %v\n" , proc . processName )
2022-06-14 09:31:19 +00:00
case proc . procFuncCh <- message :
log . Printf ( " * copySrcHandler passing message over to procFunc: %v\n" , proc . processName )
2022-06-11 04:30:58 +00:00
}
return nil , nil
}
return h
}
2022-06-14 05:05:38 +00:00
func copyDstSubHandler ( cia copyInitialData ) func ( process , Message , string ) ( [ ] byte , error ) {
2022-06-11 04:30:58 +00:00
h := func ( proc process , message Message , node string ) ( [ ] byte , error ) {
2022-06-14 05:05:38 +00:00
2022-06-11 04:30:58 +00:00
select {
case <- proc . ctx . Done ( ) :
log . Printf ( " * copyDstHandler ended: %v\n" , proc . processName )
2022-06-14 12:32:35 +00:00
case proc . procFuncCh <- message :
log . Printf ( " * copySrcHandler passing message over to procFunc: %v\n" , proc . processName )
2022-06-11 04:30:58 +00:00
}
return nil , nil
}
return h
}
2022-06-14 05:05:38 +00:00
func copySrcSubProcFunc ( proc process , cia copyInitialData ) func ( context . Context , chan Message ) error {
2022-06-10 04:17:11 +00:00
pf := func ( ctx context . Context , procFuncCh chan Message ) error {
2022-06-14 12:32:35 +00:00
const readChuncSize = 2
// Initiate the file copier.
fh , err := os . Open ( cia . SrcFilePath )
if err != nil {
// errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err)
log . Fatalf ( "error: copySrcSubProcFunc: failed to open file: %v\n" , err )
return nil
}
defer fh . Close ( )
// Do action based on copyStatus received.
2022-06-14 09:31:19 +00:00
for {
2022-06-14 12:58:50 +00:00
fmt . Printf ( "\n * DEBUG: copySrcSubProcFunc: cia contains: %+v\n\n" , cia )
2022-06-14 09:31:19 +00:00
select {
case <- ctx . Done ( ) :
log . Printf ( " * copySrcProcFunc ENDED: %v\n" , proc . processName )
return nil
// Pick up the message recived by the copySrcSubHandler.
case message := <- procFuncCh :
var csa copySubData
err := cbor . Unmarshal ( message . Data , & csa )
if err != nil {
log . Fatalf ( "error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n" , err )
}
switch csa . CopyStatus {
case copyReady :
log . Printf ( " * RECEIVED in copySrcSubProcFunc * copyStatus=copyReady: %v\n\n" , csa . CopyStatus )
2022-06-14 12:32:35 +00:00
b := make ( [ ] byte , readChuncSize )
_ , err := fh . Read ( b )
if err != nil {
log . Fatalf ( "error: copySrcSubHandler: failed to read chuck from file: %v\n" , err )
}
// Create message and send data to dst
fmt . Printf ( "**** DATA READ: %v\n" , b )
2022-06-14 09:31:19 +00:00
default :
// TODO: Any error logic here ?
2022-06-14 12:32:35 +00:00
log . Fatalf ( "error: copySrcSubProcFunc: not valid copyStatus, exiting: %v\n" , csa . CopyStatus )
2022-06-14 09:31:19 +00:00
}
}
2022-06-10 04:17:11 +00:00
}
2022-06-14 09:31:19 +00:00
//return nil
2022-06-10 04:17:11 +00:00
}
return pf
}
2022-06-14 05:05:38 +00:00
type copyStatus int
const (
2022-06-14 08:17:09 +00:00
copyReady copyStatus = 1
2022-06-14 12:32:35 +00:00
copyData copyStatus = 2
2022-06-14 05:05:38 +00:00
)
2022-06-14 08:17:09 +00:00
// copySubData is the structure being used between the src and dst while copying data.
type copySubData struct {
CopyStatus copyStatus
CopyData [ ] byte
}
func copyDstSubProcFunc ( proc process , cia copyInitialData , message Message ) func ( context . Context , chan Message ) error {
2022-06-09 09:40:53 +00:00
pf := func ( ctx context . Context , procFuncCh chan Message ) error {
2022-06-14 08:17:09 +00:00
fmt . Printf ( "\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n" , cia )
csa := copySubData {
CopyStatus : copyReady ,
}
csaSerialized , err := cbor . Marshal ( csa )
if err != nil {
log . Fatalf ( "error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n" , err )
}
2022-06-09 09:40:53 +00:00
2022-06-14 05:05:38 +00:00
// We want to send a message back to src that we are ready to start.
2022-06-14 08:17:09 +00:00
fmt . Printf ( "\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n " , message . FromNode )
2022-06-14 05:05:38 +00:00
msg := Message {
ToNode : message . FromNode ,
2022-06-14 12:32:35 +00:00
FromNode : message . ToNode ,
2022-06-14 05:05:38 +00:00
Method : cia . SrcMethod ,
ReplyMethod : REQNone ,
2022-06-14 08:17:09 +00:00
Data : csaSerialized ,
2022-06-14 05:05:38 +00:00
}
2022-06-14 12:58:50 +00:00
fmt . Printf ( "\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n " , cia . SrcMethod )
2022-06-14 06:45:34 +00:00
sam , err := newSubjectAndMessage ( msg )
if err != nil {
log . Fatalf ( "copyDstProcSubFunc: newSubjectAndMessage failed: %v\n" , err )
2022-06-14 05:05:38 +00:00
}
proc . toRingbufferCh <- [ ] subjectAndMessage { sam }
2022-06-14 12:32:35 +00:00
for {
2022-06-14 12:58:50 +00:00
fmt . Printf ( "\n * DEBUG: copyDstSubProcFunc: cia contains: %+v\n\n" , cia )
2022-06-14 12:32:35 +00:00
select {
case <- ctx . Done ( ) :
log . Printf ( " * copyDstProcFunc ended: %v\n" , proc . processName )
return nil
case message := <- procFuncCh :
var csa copySubData
err := cbor . Unmarshal ( message . Data , & csa )
if err != nil {
log . Fatalf ( "error: copySrcSubHandler: cbor unmarshal of csa failed: %v\n" , err )
}
switch csa . CopyStatus {
case copyData :
// Write the data chunk to disk ?
fmt . Printf ( "\n * Received data: %s\n\n" , csa . CopyData )
}
}
2022-06-09 09:40:53 +00:00
}
}
return pf
}