diff --git a/requests_copy.go b/requests_copy.go index e2c0ac3..1b68808 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -10,22 +10,24 @@ import ( "os" "path/filepath" "strconv" + "time" "github.com/fxamacker/cbor/v2" "github.com/google/uuid" ) type copyInitialData struct { - UUID string - SrcMethod Method - SrcNode Node - DstMethod Method - DstNode Node - SrcFilePath string - DstDir string - DstFile string - SplitChunkSize int - FileMode fs.FileMode + UUID string + SrcMethod Method + SrcNode Node + DstMethod Method + DstNode Node + SrcFilePath string + DstDir string + DstFile string + SplitChunkSize int + MaxTotalCopyTime int + FileMode fs.FileMode } type methodREQCopySrc struct { @@ -94,7 +96,10 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ // Set default split chunk size, will be replaced with value from // methodArgs[3] if defined. - splitChunkSize := 2 + splitChunkSize := 100000 + // Set default max total copy time, will be replaced with value from + // methodArgs[4] if defined. + maxTotalCopyTime := message.MethodTimeout // Verify and check the methodArgs switch { @@ -108,7 +113,16 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ var err error splitChunkSize, err = strconv.Atoi(message.MethodArgs[3]) if err != nil { - er := fmt.Errorf("error: methodREQCopySrc: ch") + er := fmt.Errorf("error: methodREQCopySrc: unble to convert splitChunkSize into int value: %v", err) + proc.errorKernel.errSend(proc, message, er) + } + + case len(message.MethodArgs) > 4: + // Check if split chunk size was set, if not set default. + var err error + maxTotalCopyTime, err = strconv.Atoi(message.MethodArgs[3]) + if err != nil { + er := fmt.Errorf("error: methodREQCopySrc: unble to convert maxTotalCopyTime into int value: %v", err) proc.errorKernel.errSend(proc, message, er) } } @@ -119,11 +133,13 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ DstNode := message.MethodArgs[1] DstFilePath := message.MethodArgs[2] - // Get a context with the timeout specified in message.MethodTimeout. - // Since the subProc spawned will outlive this method here we do not - // want to cancel this method. We care about the methodTimeout, but - // we ignore the CancelFunc. - ctx, cancel := getContextForMethodTimeout(proc.ctx, message) + // Create a child context to use with the procFunc with timeout set to the max allowed total copy time + // specified in the message. + var ctx context.Context + var cancel context.CancelFunc + func() { + ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(maxTotalCopyTime)) + }() // Create a subject for one copy request uid := uuid.New() @@ -150,16 +166,17 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ fileMode := fileInfo.Mode() cia := copyInitialData{ - UUID: uid.String(), - SrcNode: proc.node, - SrcMethod: m, - DstNode: Node(DstNode), - DstMethod: dstM, - SrcFilePath: SrcFilePath, - DstDir: dstDir, - DstFile: dstFile, - SplitChunkSize: splitChunkSize, - FileMode: fileMode, + UUID: uid.String(), + SrcNode: proc.node, + SrcMethod: m, + DstNode: Node(DstNode), + DstMethod: dstM, + SrcFilePath: SrcFilePath, + DstDir: dstDir, + DstFile: dstFile, + SplitChunkSize: splitChunkSize, + MaxTotalCopyTime: maxTotalCopyTime, + FileMode: fileMode, } sub := newSubjectNoVerifyHandler(m, node) @@ -245,11 +262,13 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ return } - // Get a context with the timeout specified in message.MethodTimeout. - // Since the subProc spawned will outlive this method here we do not - // want to cancel this method. We care about the methodTimeout, but - // we ignore the CancelFunc. - ctx, cancel := getContextForMethodTimeout(proc.ctx, message) + // Create a child context to use with the procFunc with timeout set to the max allowed total copy time + // specified in the message. + var ctx context.Context + var cancel context.CancelFunc + func() { + ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(cia.MaxTotalCopyTime)) + }() // Create a subject for one copy request sub := newSubjectNoVerifyHandler(cia.DstMethod, node)