mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
added seprata timwe for MaxAllowedCopy time which can be specified in the original message
This commit is contained in:
parent
1bdee3872b
commit
f3f5e64c45
1 changed files with 51 additions and 32 deletions
|
@ -10,22 +10,24 @@ import (
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/fxamacker/cbor/v2"
|
"github.com/fxamacker/cbor/v2"
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
type copyInitialData struct {
|
type copyInitialData struct {
|
||||||
UUID string
|
UUID string
|
||||||
SrcMethod Method
|
SrcMethod Method
|
||||||
SrcNode Node
|
SrcNode Node
|
||||||
DstMethod Method
|
DstMethod Method
|
||||||
DstNode Node
|
DstNode Node
|
||||||
SrcFilePath string
|
SrcFilePath string
|
||||||
DstDir string
|
DstDir string
|
||||||
DstFile string
|
DstFile string
|
||||||
SplitChunkSize int
|
SplitChunkSize int
|
||||||
FileMode fs.FileMode
|
MaxTotalCopyTime int
|
||||||
|
FileMode fs.FileMode
|
||||||
}
|
}
|
||||||
|
|
||||||
type methodREQCopySrc struct {
|
type methodREQCopySrc struct {
|
||||||
|
@ -94,7 +96,10 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
||||||
|
|
||||||
// Set default split chunk size, will be replaced with value from
|
// Set default split chunk size, will be replaced with value from
|
||||||
// methodArgs[3] if defined.
|
// methodArgs[3] if defined.
|
||||||
splitChunkSize := 2
|
splitChunkSize := 100000
|
||||||
|
// Set default max total copy time, will be replaced with value from
|
||||||
|
// methodArgs[4] if defined.
|
||||||
|
maxTotalCopyTime := message.MethodTimeout
|
||||||
|
|
||||||
// Verify and check the methodArgs
|
// Verify and check the methodArgs
|
||||||
switch {
|
switch {
|
||||||
|
@ -108,7 +113,16 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
||||||
var err error
|
var err error
|
||||||
splitChunkSize, err = strconv.Atoi(message.MethodArgs[3])
|
splitChunkSize, err = strconv.Atoi(message.MethodArgs[3])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
er := fmt.Errorf("error: methodREQCopySrc: ch")
|
er := fmt.Errorf("error: methodREQCopySrc: unble to convert splitChunkSize into int value: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
}
|
||||||
|
|
||||||
|
case len(message.MethodArgs) > 4:
|
||||||
|
// Check if split chunk size was set, if not set default.
|
||||||
|
var err error
|
||||||
|
maxTotalCopyTime, err = strconv.Atoi(message.MethodArgs[3])
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: methodREQCopySrc: unble to convert maxTotalCopyTime into int value: %v", err)
|
||||||
proc.errorKernel.errSend(proc, message, er)
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -119,11 +133,13 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
||||||
DstNode := message.MethodArgs[1]
|
DstNode := message.MethodArgs[1]
|
||||||
DstFilePath := message.MethodArgs[2]
|
DstFilePath := message.MethodArgs[2]
|
||||||
|
|
||||||
// Get a context with the timeout specified in message.MethodTimeout.
|
// Create a child context to use with the procFunc with timeout set to the max allowed total copy time
|
||||||
// Since the subProc spawned will outlive this method here we do not
|
// specified in the message.
|
||||||
// want to cancel this method. We care about the methodTimeout, but
|
var ctx context.Context
|
||||||
// we ignore the CancelFunc.
|
var cancel context.CancelFunc
|
||||||
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
func() {
|
||||||
|
ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(maxTotalCopyTime))
|
||||||
|
}()
|
||||||
|
|
||||||
// Create a subject for one copy request
|
// Create a subject for one copy request
|
||||||
uid := uuid.New()
|
uid := uuid.New()
|
||||||
|
@ -150,16 +166,17 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
|
||||||
fileMode := fileInfo.Mode()
|
fileMode := fileInfo.Mode()
|
||||||
|
|
||||||
cia := copyInitialData{
|
cia := copyInitialData{
|
||||||
UUID: uid.String(),
|
UUID: uid.String(),
|
||||||
SrcNode: proc.node,
|
SrcNode: proc.node,
|
||||||
SrcMethod: m,
|
SrcMethod: m,
|
||||||
DstNode: Node(DstNode),
|
DstNode: Node(DstNode),
|
||||||
DstMethod: dstM,
|
DstMethod: dstM,
|
||||||
SrcFilePath: SrcFilePath,
|
SrcFilePath: SrcFilePath,
|
||||||
DstDir: dstDir,
|
DstDir: dstDir,
|
||||||
DstFile: dstFile,
|
DstFile: dstFile,
|
||||||
SplitChunkSize: splitChunkSize,
|
SplitChunkSize: splitChunkSize,
|
||||||
FileMode: fileMode,
|
MaxTotalCopyTime: maxTotalCopyTime,
|
||||||
|
FileMode: fileMode,
|
||||||
}
|
}
|
||||||
|
|
||||||
sub := newSubjectNoVerifyHandler(m, node)
|
sub := newSubjectNoVerifyHandler(m, node)
|
||||||
|
@ -245,11 +262,13 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a context with the timeout specified in message.MethodTimeout.
|
// Create a child context to use with the procFunc with timeout set to the max allowed total copy time
|
||||||
// Since the subProc spawned will outlive this method here we do not
|
// specified in the message.
|
||||||
// want to cancel this method. We care about the methodTimeout, but
|
var ctx context.Context
|
||||||
// we ignore the CancelFunc.
|
var cancel context.CancelFunc
|
||||||
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
func() {
|
||||||
|
ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(cia.MaxTotalCopyTime))
|
||||||
|
}()
|
||||||
|
|
||||||
// Create a subject for one copy request
|
// Create a subject for one copy request
|
||||||
sub := newSubjectNoVerifyHandler(cia.DstMethod, node)
|
sub := newSubjectNoVerifyHandler(cia.DstMethod, node)
|
||||||
|
|
Loading…
Add table
Reference in a new issue