mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
receiving copy data at dst
This commit is contained in:
parent
8d39c3c746
commit
df0c2ddc90
1 changed files with 43 additions and 3 deletions
|
@ -3,6 +3,7 @@ package steward
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -312,11 +313,17 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
|
||||||
|
|
||||||
switch csa.CopyStatus {
|
switch csa.CopyStatus {
|
||||||
case copyReady:
|
case copyReady:
|
||||||
|
// We set the default status to copyData. If we get an io.EOF we change it to copyDone later.
|
||||||
|
status := copyData
|
||||||
|
|
||||||
log.Printf(" * RECEIVED in copySrcSubProcFunc * copyStatus=copyReady: %v\n\n", csa.CopyStatus)
|
log.Printf(" * RECEIVED in copySrcSubProcFunc * copyStatus=copyReady: %v\n\n", csa.CopyStatus)
|
||||||
b := make([]byte, readChuncSize)
|
b := make([]byte, readChuncSize)
|
||||||
_, err := fh.Read(b)
|
_, err := fh.Read(b)
|
||||||
if err != nil {
|
if err != nil && err != io.EOF {
|
||||||
log.Fatalf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err)
|
log.Printf("error: copySrcSubHandler: failed to read chuck from file: %v\n", err)
|
||||||
|
}
|
||||||
|
if err == io.EOF {
|
||||||
|
status = copyDone
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create message and send data to dst
|
// Create message and send data to dst
|
||||||
|
@ -325,7 +332,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData) func(context.Context,
|
||||||
// TESTING HERE:
|
// TESTING HERE:
|
||||||
{
|
{
|
||||||
csa := copySubData{
|
csa := copySubData{
|
||||||
CopyStatus: copyData,
|
CopyStatus: status,
|
||||||
CopyData: b,
|
CopyData: b,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,6 +379,7 @@ type copyStatus int
|
||||||
const (
|
const (
|
||||||
copyReady copyStatus = 1
|
copyReady copyStatus = 1
|
||||||
copyData copyStatus = 2
|
copyData copyStatus = 2
|
||||||
|
copyDone copyStatus = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
// copySubData is the structure being used between the src and dst while copying data.
|
// copySubData is the structure being used between the src and dst while copying data.
|
||||||
|
@ -429,6 +437,38 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message) func
|
||||||
case copyData:
|
case copyData:
|
||||||
// Write the data chunk to disk ?
|
// Write the data chunk to disk ?
|
||||||
fmt.Printf("\n * Received data: %s\n\n", csa.CopyData)
|
fmt.Printf("\n * Received data: %s\n\n", csa.CopyData)
|
||||||
|
|
||||||
|
// Send a ready message for the next chunk.
|
||||||
|
csa := copySubData{
|
||||||
|
CopyStatus: copyReady,
|
||||||
|
}
|
||||||
|
|
||||||
|
csaSerialized, err := cbor.Marshal(csa)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("error: copyDstSubProcFunc: cbor marshal of csa failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We want to send a message back to src that we are ready to start.
|
||||||
|
fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode)
|
||||||
|
msg := Message{
|
||||||
|
ToNode: cia.SrcNode,
|
||||||
|
FromNode: cia.DstNode,
|
||||||
|
Method: cia.SrcMethod,
|
||||||
|
ReplyMethod: REQNone,
|
||||||
|
Data: csaSerialized,
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
|
||||||
|
|
||||||
|
sam, err := newSubjectAndMessage(msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("copyDstProcSubFunc: newSubjectAndMessage failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
|
case copyDone:
|
||||||
|
fmt.Printf("\n\n\n ************** DEBUG: Done \n\n\n")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue