1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

writing tmp and final destinaton files

This commit is contained in:
postmannen 2022-06-15 07:51:58 +02:00
parent 23c22210a5
commit 036894a3fe

View file

@ -7,6 +7,7 @@ import (
"log"
"os"
"path/filepath"
"strconv"
"github.com/fxamacker/cbor/v2"
"github.com/google/uuid"
@ -280,10 +281,26 @@ func copyDstSubHandler(cia copyInitialData) func(process, Message, string) ([]by
return h
}
type copyStatus int
const (
copyReady copyStatus = 1
copyData copyStatus = 2
copyDone copyStatus = 3
)
// copySubData is the structure being used between the src and dst while copying data.
type copySubData struct {
CopyStatus copyStatus
CopyData []byte
ChunkNumber int
}
func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error {
const readChuncSize = 2
var chunkNumber = 0
// Initiate the file copier.
fh, err := os.Open(cia.SrcFilePath)
@ -325,12 +342,15 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
status = copyDone
}
chunkNumber++
// Create message and send data to dst
fmt.Printf("**** DATA READ: %v\n", b)
csa := copySubData{
CopyStatus: status,
CopyData: b,
CopyStatus: status,
CopyData: b,
ChunkNumber: chunkNumber,
}
csaSerialized, err := cbor.Marshal(csa)
@ -370,21 +390,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
return pf
}
type copyStatus int
const (
copyReady copyStatus = 1
copyData copyStatus = 2
copyDone copyStatus = 3
)
// copySubData is the structure being used between the src and dst while copying data.
type copySubData struct {
CopyStatus copyStatus
CopyData []byte
}
func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func(context.Context, chan Message) error {
const readChuncSize = 2
pf := func(ctx context.Context, procFuncCh chan Message) error {
fmt.Printf("\n ******* WORKING IN copyDstSubProcFunc: %+v\n\n", cia)
@ -398,23 +406,29 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func
}
// We want to send a message back to src that we are ready to start.
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.SrcNode,
FromNode: cia.DstNode,
Method: cia.SrcMethod,
ReplyMethod: REQNone,
Data: csaSerialized,
{
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.SrcNode,
FromNode: cia.DstNode,
Method: cia.SrcMethod,
ReplyMethod: REQNone,
Data: csaSerialized,
}
fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
}
fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
sam, err := newSubjectAndMessage(msg)
if err != nil {
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
}
proc.toRingbufferCh <- []subjectAndMessage{sam}
// Open a tmp folder for where to write the received chunks
tmpFolder := filepath.Join(proc.configuration.SocketFolder, cia.DstFile+"-"+cia.UUID)
os.Mkdir(tmpFolder, 0700)
for {
fmt.Printf("\n * DEBUG: copyDstSubProcFunc: cia contains: %+v\n\n", cia)
@ -434,7 +448,21 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func
// Write the data chunk to disk ?
fmt.Printf("\n * Received data: %s\n\n", csa.CopyData)
// Send a ready message for the next chunk.
func() {
filePath := filepath.Join(tmpFolder, strconv.Itoa(csa.ChunkNumber)+"."+cia.UUID)
fh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil {
log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err)
}
defer fh.Close()
_, err = fh.Write(csa.CopyData)
if err != nil {
log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err)
}
}()
// Prepare and send a ready message to src for the next chunk.
csa := copySubData{
CopyStatus: copyReady,
}
@ -444,7 +472,6 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
}
// We want to send a message back to src that we are ready to start.
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
msg := Message{
ToNode: cia.SrcNode,
@ -464,7 +491,58 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func
proc.toRingbufferCh <- []subjectAndMessage{sam}
case copyDone:
fmt.Printf("\n\n\n ************** DEBUG: Done \n\n\n")
fmt.Printf("\n\n\n ************** DEBUG: copyDone \n\n\n")
// var mainFileData []byte
func() {
// Open the main file that chunks files will be written into.
filePath := filepath.Join(cia.DstDir, cia.DstFile)
mainfh, err := os.OpenFile(filePath, os.O_TRUNC|os.O_RDWR|os.O_CREATE|os.O_SYNC, 0600)
if err != nil {
log.Fatalf("error: copyDstSubProcFunc: open file failed: %v\n", err)
}
defer mainfh.Close()
// Walk the tmp transfer directory and combine all the chunks into one file.
err = filepath.Walk(tmpFolder, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if !info.IsDir() {
fmt.Println(path, info.Size())
fh, err := os.Open(path)
if err != nil {
return err
}
defer fh.Close()
b := make([]byte, readChuncSize)
_, err = fh.Read(b)
if err != nil {
return err
}
fmt.Printf(" * DEBUG: read: %v\n", b)
_, err = mainfh.Write(b)
if err != nil {
return err
}
// mainFileData = append(mainFileData, b...)
}
return nil
})
if err != nil {
log.Fatalf("error: copyDstSubProcFunc: filewalk failed: %v\n", err)
}
// fmt.Printf("main file contains: %v\n", mainFileData)
}()
}
}
}