mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-31 01:24:31 +00:00
initial new copy methods
This commit is contained in:
parent
f50e521743
commit
0b29cbc6a0
5 changed files with 159 additions and 57 deletions
2
go.mod
2
go.mod
|
@ -7,6 +7,7 @@ require (
|
|||
github.com/fxamacker/cbor/v2 v2.3.1
|
||||
github.com/gdamore/tcell/v2 v2.4.1-0.20210905002822-f057f0a857a1
|
||||
github.com/go-playground/validator/v10 v10.10.1
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/hpcloud/tail v1.0.0
|
||||
github.com/jinzhu/copier v0.3.5
|
||||
github.com/klauspost/compress v1.14.2
|
||||
|
@ -26,7 +27,6 @@ require (
|
|||
github.com/go-playground/locales v0.14.0 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.0 // indirect
|
||||
github.com/golang/protobuf v1.4.3 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/leodido/go-urn v1.2.1 // indirect
|
||||
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.13 // indirect
|
||||
|
|
1
go.sum
1
go.sum
|
@ -245,7 +245,6 @@ gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|||
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
|
|
11
requests.go
11
requests.go
|
@ -96,6 +96,11 @@ const (
|
|||
// Write the destination copied to some node.
|
||||
REQCopyFileTo Method = "REQCopyFileTo"
|
||||
// Send Hello I'm here message.
|
||||
// Read the source file to be copied to some node.
|
||||
REQCopySrc Method = "REQCopySrc"
|
||||
// Write the destination copied to some node.
|
||||
REQCopyDst Method = "REQCopyDst"
|
||||
// Send Hello I'm here message.
|
||||
REQHello Method = "REQHello"
|
||||
// Error log methods to centralError node.
|
||||
REQErrorLog Method = "REQErrorLog"
|
||||
|
@ -216,6 +221,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
|||
REQCopyFileTo: methodREQCopyFileTo{
|
||||
event: EventACK,
|
||||
},
|
||||
REQCopySrc: methodREQCopySrc{
|
||||
event: EventACK,
|
||||
},
|
||||
REQCopyDst: methodREQCopyDst{
|
||||
event: EventACK,
|
||||
},
|
||||
REQHello: methodREQHello{
|
||||
event: EventNACK,
|
||||
},
|
||||
|
|
147
requests_copy.go
Normal file
147
requests_copy.go
Normal file
|
@ -0,0 +1,147 @@
|
|||
package steward
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type methodREQCopySrc struct {
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQCopySrc) getKind() Event {
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Idea:
|
||||
//
|
||||
// Initialization, Source:
|
||||
// - Use the REQCopySrc method to handle the initial request from the user.
|
||||
// - Spawn a REQCopySrc_uid subscriber to receive sync messages from destination.
|
||||
// - Send the uid, and full-file hash to the destination in a REQCopyDst message.
|
||||
//
|
||||
// Initialization, Destination:
|
||||
// - Spawn a REQCopyDst-uid from the uid we got from source.
|
||||
// --------------------------------------------------------------------------------------
|
||||
//
|
||||
// All below happens in the From-uid and To-uid methods until the copying is done.
|
||||
//
|
||||
// - dst->src, dst sends a REQCopySrc-uid message with status "ready" file receiving to src.
|
||||
// - src receives the message and start reading the file:
|
||||
// - src, creates hash of the complete file.
|
||||
// - src, reads the file in chunks, and create a hash of each chunk.
|
||||
// - src->dst, send chunk when read.
|
||||
// - dst->src, wait for status "ready" indicating the chuck was transfered.
|
||||
// - Loop and read new chunc.
|
||||
// - src->dst, when last chunch is sent send status "last"
|
||||
// - src->dst, if failure send status "error", abort file copying and clean up on both src and dst.
|
||||
//
|
||||
// - dst, read and store each chunch to tmp folder and verify hash.
|
||||
// - dst->src, send status "ready" to src when chunch is stored.
|
||||
// - loop and check for status "last", if last:
|
||||
// - build original file from chuncs.
|
||||
// - verify hash when file is built.
|
||||
// - dst->src, send status "done".
|
||||
//
|
||||
// - We should also be be able to resend a chunk, or restart the copying from where we left of if it seems to hang.
|
||||
//
|
||||
// dataStructure{
|
||||
// Data []bytes
|
||||
// Status copyStatus
|
||||
// id int
|
||||
// }
|
||||
//
|
||||
// Create a new copy sync process to handle the actual file copying.
|
||||
// We use the context already created based on the time out specified
|
||||
// in the requestTimeout field of the message.
|
||||
//
|
||||
// -----------------------------------------------------
|
||||
// Handle writing to a file. Will truncate any existing data if the file did already
|
||||
// exist.
|
||||
func (m methodREQCopySrc) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
|
||||
var subProcessName string
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
defer proc.processes.wg.Done()
|
||||
|
||||
switch {
|
||||
case len(message.MethodArgs) < 3:
|
||||
er := fmt.Errorf("error: methodREQCopySrc: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath")
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
SrcFilePath := message.MethodArgs[0]
|
||||
DstNode := message.MethodArgs[1]
|
||||
DstFilePath := message.MethodArgs[2]
|
||||
|
||||
// Get a context with the timeout specified in message.MethodTimeout.
|
||||
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
||||
defer cancel()
|
||||
|
||||
// Create a subject for one copy request
|
||||
uid := uuid.New()
|
||||
subProcessName = fmt.Sprintf("copySrc_%v", uid.String())
|
||||
|
||||
m := Method(subProcessName)
|
||||
sub := newSubjectNoVerifyHandler(m, node)
|
||||
|
||||
// Create a new sub process that will do the actual file copying.
|
||||
copySrcSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil)
|
||||
// Give the sub process a procFunc so we do the actual copying within a procFunc,
|
||||
// and not directly within the handler.
|
||||
copySrcSubProc.procFunc = copySrcProcFunc()
|
||||
// The process will be killed when the context expires.
|
||||
go copySrcSubProc.spawnWorker()
|
||||
|
||||
replyData := fmt.Sprintf("info: succesfully initiated copy: srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying\n", node, SrcFilePath, DstNode, DstFilePath, subProcessName)
|
||||
newReplyMessage(proc, message, []byte(replyData))
|
||||
|
||||
}()
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
||||
func copySrcProcFunc() func(context.Context, chan Message) error {
|
||||
pf := func(ctx context.Context, procFuncCh chan Message) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return pf
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
type methodREQCopyDst struct {
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQCopyDst) getKind() Event {
|
||||
return m.event
|
||||
}
|
||||
|
||||
// Handle writing to a file. Will truncate any existing data if the file did already
|
||||
// exist.
|
||||
// Same as the REQToFile, but this requst type don't use the default data folder path
|
||||
// for where to store files or add information about node names.
|
||||
// This method also sends a msgReply back to the publisher if the method was done
|
||||
// successfully, where REQToFile do not.
|
||||
// This method will truncate and overwrite any existing files.
|
||||
func (m methodREQCopyDst) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
defer proc.processes.wg.Done()
|
||||
|
||||
}()
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
|
@ -9,7 +9,6 @@ import (
|
|||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hpcloud/tail"
|
||||
)
|
||||
|
||||
|
@ -149,60 +148,6 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
|
|||
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
||||
defer cancel()
|
||||
|
||||
{
|
||||
// Idea:
|
||||
//
|
||||
// Initialization, Source:
|
||||
// - Use the original REQCopyFileFrom method to handle the initial request from the user.
|
||||
// - Spawn a REQCopyFileFrom_uid subscriber to receive sync messages from destination.
|
||||
// - Send the uid, and full-file hash to the destination in a REQCopyFileTo message.
|
||||
//
|
||||
// Initialization, Destination:
|
||||
// - Spawn a REQCopyFileTo-uid from the uid we got from source.
|
||||
// --------------------------------------------------------------------------------------
|
||||
//
|
||||
// All below happens in the From-uid and To-uid methods until the copying is done.
|
||||
//
|
||||
// - dst->src, dst sends a REQCopyFileFrom-uid message with status "ready" file receiving to src.
|
||||
// - src receives the message and start reading the file:
|
||||
// - src, creates hash of the complete file.
|
||||
// - src, reads the file in chunks, and create a hash of each chunk.
|
||||
// - src->dst, send chunk when read.
|
||||
// - dst->src, wait for status "ready" indicating the chuck was transfered.
|
||||
// - Loop and read new chunc.
|
||||
// - src->dst, when last chunch is sent send status "last"
|
||||
// - src->dst, if failure send status "error", abort file copying and clean up on both src and dst.
|
||||
//
|
||||
// - dst, read and store each chunch to tmp folder and verify hash.
|
||||
// - dst->src, send status "ready" to src when chunch is stored.
|
||||
// - loop and check for status "last", if last:
|
||||
// - build original file from chuncs.
|
||||
// - verify hash when file is built.
|
||||
// - dst->src, send status "done".
|
||||
//
|
||||
// dataStructure{
|
||||
// Data []bytes
|
||||
// Status copyStatus
|
||||
// id int
|
||||
// }
|
||||
|
||||
// Create a new copy sync process to handle the actual file copying.
|
||||
// We use the context already created based on the time out specified
|
||||
// in the requestTimeout field of the message.
|
||||
|
||||
// Create a subject for one copy request
|
||||
uid := uuid.New()
|
||||
sm := fmt.Sprintf("copyFileFrom_%v", uid.String())
|
||||
m := Method(sm)
|
||||
|
||||
sub := newSubjectNoVerifyHandler(m, node)
|
||||
|
||||
//
|
||||
copySyncProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil)
|
||||
// The process will be killed when the context expires.
|
||||
go copySyncProc.spawnWorker()
|
||||
}
|
||||
|
||||
outCh := make(chan []byte)
|
||||
errCh := make(chan error)
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue