mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-31 01:24:31 +00:00
initial idea for large file copy
This commit is contained in:
parent
cb974a1a28
commit
f50e521743
4 changed files with 77 additions and 1 deletions
1
go.mod
1
go.mod
|
@ -26,6 +26,7 @@ require (
|
|||
github.com/go-playground/locales v0.14.0 // indirect
|
||||
github.com/go-playground/universal-translator v0.18.0 // indirect
|
||||
github.com/golang/protobuf v1.4.3 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/leodido/go-urn v1.2.1 // indirect
|
||||
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
|
||||
github.com/mattn/go-runewidth v0.0.13 // indirect
|
||||
|
|
2
go.sum
2
go.sum
|
@ -57,6 +57,8 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
|
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
|
||||
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
|
||||
|
|
|
@ -117,8 +117,10 @@ type Subject struct {
|
|||
}
|
||||
|
||||
// newSubject will return a new variable of the type subject, and insert
|
||||
// all the values given as arguments. It will also create the channel
|
||||
// all the values given as arguments. It will create the channel
|
||||
// to receive new messages on the specific subject.
|
||||
// The function will also verify that there is a methodHandler defined
|
||||
// for the Request type.
|
||||
func newSubject(method Method, node string) Subject {
|
||||
// Get the Event type for the Method.
|
||||
ma := method.GetMethodsAvailable()
|
||||
|
@ -136,6 +138,22 @@ func newSubject(method Method, node string) Subject {
|
|||
}
|
||||
}
|
||||
|
||||
// newSubjectNoVerifyHandler will return a new variable of the type subject, and insert
|
||||
// all the values given as arguments. It will create the channel
|
||||
// to receive new messages on the specific subject.
|
||||
// The function will not verify that there is a methodHandler defined
|
||||
// for the Request type.
|
||||
func newSubjectNoVerifyHandler(method Method, node string) Subject {
|
||||
// Get the Event type for the Method.
|
||||
|
||||
return Subject{
|
||||
ToNode: node,
|
||||
Event: EventACK,
|
||||
Method: method,
|
||||
messageCh: make(chan Message),
|
||||
}
|
||||
}
|
||||
|
||||
// subjectName is the complete representation of a subject
|
||||
type subjectName string
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/hpcloud/tail"
|
||||
)
|
||||
|
||||
|
@ -148,6 +149,60 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
|
|||
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
||||
defer cancel()
|
||||
|
||||
{
|
||||
// Idea:
|
||||
//
|
||||
// Initialization, Source:
|
||||
// - Use the original REQCopyFileFrom method to handle the initial request from the user.
|
||||
// - Spawn a REQCopyFileFrom_uid subscriber to receive sync messages from destination.
|
||||
// - Send the uid, and full-file hash to the destination in a REQCopyFileTo message.
|
||||
//
|
||||
// Initialization, Destination:
|
||||
// - Spawn a REQCopyFileTo-uid from the uid we got from source.
|
||||
// --------------------------------------------------------------------------------------
|
||||
//
|
||||
// All below happens in the From-uid and To-uid methods until the copying is done.
|
||||
//
|
||||
// - dst->src, dst sends a REQCopyFileFrom-uid message with status "ready" file receiving to src.
|
||||
// - src receives the message and start reading the file:
|
||||
// - src, creates hash of the complete file.
|
||||
// - src, reads the file in chunks, and create a hash of each chunk.
|
||||
// - src->dst, send chunk when read.
|
||||
// - dst->src, wait for status "ready" indicating the chuck was transfered.
|
||||
// - Loop and read new chunc.
|
||||
// - src->dst, when last chunch is sent send status "last"
|
||||
// - src->dst, if failure send status "error", abort file copying and clean up on both src and dst.
|
||||
//
|
||||
// - dst, read and store each chunch to tmp folder and verify hash.
|
||||
// - dst->src, send status "ready" to src when chunch is stored.
|
||||
// - loop and check for status "last", if last:
|
||||
// - build original file from chuncs.
|
||||
// - verify hash when file is built.
|
||||
// - dst->src, send status "done".
|
||||
//
|
||||
// dataStructure{
|
||||
// Data []bytes
|
||||
// Status copyStatus
|
||||
// id int
|
||||
// }
|
||||
|
||||
// Create a new copy sync process to handle the actual file copying.
|
||||
// We use the context already created based on the time out specified
|
||||
// in the requestTimeout field of the message.
|
||||
|
||||
// Create a subject for one copy request
|
||||
uid := uuid.New()
|
||||
sm := fmt.Sprintf("copyFileFrom_%v", uid.String())
|
||||
m := Method(sm)
|
||||
|
||||
sub := newSubjectNoVerifyHandler(m, node)
|
||||
|
||||
//
|
||||
copySyncProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil)
|
||||
// The process will be killed when the context expires.
|
||||
go copySyncProc.spawnWorker()
|
||||
}
|
||||
|
||||
outCh := make(chan []byte)
|
||||
errCh := make(chan error)
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue