From f50e52174362e37ed65f00f8b37b9b50f3a4b987 Mon Sep 17 00:00:00 2001 From: postmannen Date: Tue, 7 Jun 2022 11:52:30 +0200 Subject: [PATCH] initial idea for large file copy --- go.mod | 1 + go.sum | 2 ++ message_and_subject.go | 20 +++++++++++++- requests_file_handling.go | 55 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 77 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index df40169..46da030 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( github.com/go-playground/locales v0.14.0 // indirect github.com/go-playground/universal-translator v0.18.0 // indirect github.com/golang/protobuf v1.4.3 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect diff --git a/go.sum b/go.sum index 93f41b3..faff886 100644 --- a/go.sum +++ b/go.sum @@ -57,6 +57,8 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= diff --git a/message_and_subject.go b/message_and_subject.go index 9290cd1..ed27026 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -117,8 +117,10 @@ type Subject struct { } // newSubject will return a new variable of the type subject, and insert -// all the values given as arguments. 
It will also create the channel +// all the values given as arguments. It will create the channel // to receive new messages on the specific subject. +// The function will also verify that there is a methodHandler defined +// for the Request type. func newSubject(method Method, node string) Subject { // Get the Event type for the Method. ma := method.GetMethodsAvailable() @@ -136,6 +138,22 @@ func newSubject(method Method, node string) Subject { } } +// newSubjectNoVerifyHandler will return a new variable of the type subject, and insert +// all the values given as arguments. It will create the channel +// to receive new messages on the specific subject. +// The function will not verify that there is a methodHandler defined +// for the Request type. +func newSubjectNoVerifyHandler(method Method, node string) Subject { + // The Event type is set to EventACK without looking up the Method. + + return Subject{ + ToNode: node, + Event: EventACK, + Method: method, + messageCh: make(chan Message), + } +} + // subjectName is the complete representation of a subject type subjectName string diff --git a/requests_file_handling.go b/requests_file_handling.go index 699229e..066b29a 100644 --- a/requests_file_handling.go +++ b/requests_file_handling.go @@ -9,6 +9,7 @@ import ( "path/filepath" "sync" + "github.com/google/uuid" "github.com/hpcloud/tail" ) @@ -148,6 +149,60 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin ctx, cancel := getContextForMethodTimeout(proc.ctx, message) defer cancel() + { + // Idea: + // + // Initialization, Source: + // - Use the original REQCopyFileFrom method to handle the initial request from the user. + // - Spawn a REQCopyFileFrom_uid subscriber to receive sync messages from destination. + // - Send the uid, and full-file hash to the destination in a REQCopyFileTo message. + // + // Initialization, Destination: + // - Spawn a REQCopyFileTo-uid from the uid we got from source. 
+ // -------------------------------------------------------------------------------------- + // + // All below happens in the From-uid and To-uid methods until the copying is done. + // + // - dst->src, dst sends a REQCopyFileFrom-uid message with status "ready" (ready to receive the file) to src. + // - src receives the message and starts reading the file: + // - src, creates hash of the complete file. + // - src, reads the file in chunks, and creates a hash of each chunk. + // - src->dst, send chunk when read. + // - dst->src, wait for status "ready" indicating the chunk was transferred. + // - Loop and read new chunk. + // - src->dst, when last chunk is sent send status "last" + // - src->dst, if failure send status "error", abort file copying and clean up on both src and dst. + // + // - dst, read and store each chunk to tmp folder and verify hash. + // - dst->src, send status "ready" to src when chunk is stored. + // - loop and check for status "last", if last: + // - build original file from chunks. + // - verify hash when file is built. + // - dst->src, send status "done". + // + // dataStructure{ + // Data []byte + // Status copyStatus + // id int + // } + + // Create a new copy sync process to handle the actual file copying. + // We use the context already created based on the time out specified + // in the requestTimeout field of the message. + + // Create a subject for one copy request + uid := uuid.New() + sm := fmt.Sprintf("copyFileFrom_%v", uid.String()) + m := Method(sm) + + sub := newSubjectNoVerifyHandler(m, node) + + // + copySyncProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil) + // The process will be killed when the context expires. + go copySyncProc.spawnWorker() + } + outCh := make(chan []byte) errCh := make(chan error)