1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-20 22:52:13 +00:00

add copyfilefrom func

This commit is contained in:
postmannen 2021-11-22 09:26:56 +01:00
parent 9b07eaa44b
commit 2b3fb56dc4

View file

@ -45,6 +45,7 @@ import (
"path"
"path/filepath"
"strings"
"sync"
"time"
"github.com/hpcloud/tail"
@ -711,43 +712,7 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
// Read the file, and put the result on the out channel to be sent when done reading.
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
const natsMaxMsgSize = 1000000
fi, err := os.Stat(SrcFilePath)
// Check if the src file exists, and that it is not bigger than
// the default limit used by nats which is 1MB.
switch {
case os.IsNotExist(err):
errCh <- fmt.Errorf("error: methodREQCopyFile: src file not found: %v", SrcFilePath)
return
case fi.Size() > natsMaxMsgSize:
errCh <- fmt.Errorf("error: methodREQCopyFile: src file to big. max size: %v", natsMaxMsgSize)
return
}
fh, err := os.Open(SrcFilePath)
if err != nil {
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err)
return
}
b, err := io.ReadAll(fh)
if err != nil {
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to read file: %v, %v", SrcFilePath, err)
return
}
select {
case outCh <- b:
fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n")
case <-ctx.Done():
return
}
}()
go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh)
// Wait here until we got the data to send, then create a new message
// and send it.
@ -805,6 +770,44 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
return ackMsg, nil
}
func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, errCh chan error, outCh chan []byte) {
defer wg.Done()
const natsMaxMsgSize = 1000000
fi, err := os.Stat(SrcFilePath)
// Check if the src file exists, and that it is not bigger than
// the default limit used by nats which is 1MB.
switch {
case os.IsNotExist(err):
errCh <- fmt.Errorf("error: methodREQCopyFile: src file not found: %v", SrcFilePath)
return
case fi.Size() > natsMaxMsgSize:
errCh <- fmt.Errorf("error: methodREQCopyFile: src file to big. max size: %v", natsMaxMsgSize)
return
}
fh, err := os.Open(SrcFilePath)
if err != nil {
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to open file: %v, %v", SrcFilePath, err)
return
}
b, err := io.ReadAll(fh)
if err != nil {
errCh <- fmt.Errorf("error: methodREQCopyFile: failed to read file: %v, %v", SrcFilePath, err)
return
}
select {
case outCh <- b:
fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n")
case <-ctx.Done():
return
}
}
// ----
type methodREQCopyFileTo struct {