1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

copy, src defines and sets the split chunk size

This commit is contained in:
postmannen 2022-06-15 09:48:32 +02:00
parent 036894a3fe
commit 69e0bd3696

View file

@ -13,6 +13,18 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
) )
type copyInitialData struct {
UUID string
SrcMethod Method
SrcNode Node
DstMethod Method
DstNode Node
SrcFilePath string
DstDir string
DstFile string
SplitChunkSize int
}
type methodREQCopySrc struct { type methodREQCopySrc struct {
event Event event Event
} }
@ -77,14 +89,29 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
go func() { go func() {
defer proc.processes.wg.Done() defer proc.processes.wg.Done()
// Set default split chunk size, will be replaced with value from
// methodArgs[3] if defined.
splitChunkSize := 2
// Verify and check the methodArgs
switch { switch {
case len(message.MethodArgs) < 3: case len(message.MethodArgs) < 3:
er := fmt.Errorf("error: methodREQCopySrc: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") er := fmt.Errorf("error: methodREQCopySrc: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
proc.errorKernel.errSend(proc, message, er) proc.errorKernel.errSend(proc, message, er)
return return
case len(message.MethodArgs) > 3:
// Check if split chunk size was set, if not set default.
var err error
splitChunkSize, err = strconv.Atoi(message.MethodArgs[3])
if err != nil {
er := fmt.Errorf("error: methodREQCopySrc: ch")
proc.errorKernel.errSend(proc, message, er)
}
} }
fmt.Printf("\n * DEBUG: IN THE BEGINNING: SPLITCHUNKSIZE: %v\n\n", splitChunkSize)
SrcFilePath := message.MethodArgs[0] SrcFilePath := message.MethodArgs[0]
DstNode := message.MethodArgs[1] DstNode := message.MethodArgs[1]
DstFilePath := message.MethodArgs[2] DstFilePath := message.MethodArgs[2]
@ -110,14 +137,15 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
dstM := Method(dstSubProcessName) dstM := Method(dstSubProcessName)
cia := copyInitialData{ cia := copyInitialData{
UUID: uid.String(), UUID: uid.String(),
SrcNode: proc.node, SrcNode: proc.node,
SrcMethod: m, SrcMethod: m,
DstNode: Node(DstNode), DstNode: Node(DstNode),
DstMethod: dstM, DstMethod: dstM,
SrcFilePath: SrcFilePath, SrcFilePath: SrcFilePath,
DstDir: dstDir, DstDir: dstDir,
DstFile: dstFile, DstFile: dstFile,
SplitChunkSize: splitChunkSize,
} }
sub := newSubjectNoVerifyHandler(m, node) sub := newSubjectNoVerifyHandler(m, node)
@ -173,17 +201,6 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
return ackMsg, nil return ackMsg, nil
} }
type copyInitialData struct {
UUID string
SrcMethod Method
SrcNode Node
DstMethod Method
DstNode Node
SrcFilePath string
DstDir string
DstFile string
}
// ---- // ----
type methodREQCopyDst struct { type methodREQCopyDst struct {
@ -299,7 +316,6 @@ type copySubData struct {
func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error { func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {
const readChuncSize = 2
var chunkNumber = 0 var chunkNumber = 0
// Initiate the file copier. // Initiate the file copier.
@ -333,7 +349,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
status := copyData status := copyData
log.Printf(" * RECEIVED in copySrcSubProcFunc * copyStatus=copyReady: %v\n\n", csa.CopyStatus) log.Printf(" * RECEIVED in copySrcSubProcFunc * copyStatus=copyReady: %v\n\n", csa.CopyStatus)
b := make([]byte, readChuncSize) b := make([]byte, cia.SplitChunkSize)
_, err := fh.Read(b) _, err := fh.Read(b)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
log.Printf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err) log.Printf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err)
@ -391,7 +407,6 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
} }
func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error { func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error {
const readChuncSize = 2
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {
fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia) fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia)
@ -512,14 +527,15 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func
if !info.IsDir() { if !info.IsDir() {
fmt.Println(path, info.Size()) fmt.Println(path, info.Size())
fmt.Printf(" * DEBUG: splitChunkSize: %v\n", cia.SplitChunkSize)
fh, err := os.Open(path) fh, err := os.Open(path)
if err != nil { if err != nil {
return err return err
} }
defer fh.Close() defer fh.Close()
b := make([]byte, readChuncSize) b := make([]byte, cia.SplitChunkSize)
_, err = fh.Read(b) _, err = fh.Read(b)
if err != nil { if err != nil {
return err return err