From 6f10ed9ecd15517a1069fbfebeaa914cf072c44c Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 9 Jun 2022 05:59:37 +0200 Subject: [PATCH] Added startup flags for new copy src and dst methods --- configuration_flags.go | 20 ++++++++++++++++++++ processes.go | 24 ++++++++++++++++++++++++ requests_copy.go | 6 ++++-- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/configuration_flags.go b/configuration_flags.go index 0556c56..2317f48 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -113,6 +113,10 @@ type Configuration struct { StartSubREQCopyFileFrom bool // Subscriber for writing copied files to disk StartSubREQCopyFileTo bool + // Subscriber for reading files to copy + StartSubREQCopySrc bool + // Subscriber for writing copied files to disk + StartSubREQCopyDst bool // Subscriber for Echo Request StartSubREQPing bool // Subscriber for Echo Reply @@ -184,6 +188,8 @@ type ConfigurationFromFile struct { StartSubREQToFileNACK *bool StartSubREQCopyFileFrom *bool StartSubREQCopyFileTo *bool + StartSubREQCopySrc *bool + StartSubREQCopyDst *bool StartSubREQPing *bool StartSubREQPong *bool StartSubREQCliCommand *bool @@ -250,6 +256,8 @@ func newConfigurationDefaults() Configuration { StartSubREQToFileNACK: true, StartSubREQCopyFileFrom: true, StartSubREQCopyFileTo: true, + StartSubREQCopySrc: true, + StartSubREQCopyDst: true, StartSubREQPing: true, StartSubREQPong: true, StartSubREQCliCommand: true, @@ -499,6 +507,16 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.StartSubREQCopyFileTo = *cf.StartSubREQCopyFileTo } + if cf.StartSubREQCopySrc == nil { + conf.StartSubREQCopySrc = cd.StartSubREQCopySrc + } else { + conf.StartSubREQCopySrc = *cf.StartSubREQCopySrc + } + if cf.StartSubREQCopyDst == nil { + conf.StartSubREQCopyDst = cd.StartSubREQCopyDst + } else { + conf.StartSubREQCopyDst = *cf.StartSubREQCopyDst + } if cf.StartSubREQPing == nil { conf.StartSubREQPing = cd.StartSubREQPing } else { @@ -627,6 +645,8 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.StartSubREQToFileNACK, "startSubREQToFileNACK", fc.StartSubREQToFileNACK, "true/false") flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false") flag.BoolVar(&c.StartSubREQCopyFileTo, "startSubREQCopyFileTo", fc.StartSubREQCopyFileTo, "true/false") + flag.BoolVar(&c.StartSubREQCopySrc, "startSubREQCopySrc", fc.StartSubREQCopySrc, "true/false") + flag.BoolVar(&c.StartSubREQCopyDst, "startSubREQCopyDst", fc.StartSubREQCopyDst, "true/false") flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false") flag.BoolVar(&c.StartSubREQPong, "startSubREQPong", fc.StartSubREQPong, "true/false") flag.BoolVar(&c.StartSubREQCliCommand, "startSubREQCliCommand", fc.StartSubREQCliCommand, "true/false") diff --git a/processes.go b/processes.go index f54bf59..8df5581 100644 --- a/processes.go +++ b/processes.go @@ -141,6 +141,14 @@ func (p *processes) Start(proc process) { proc.startup.subREQCopyFileTo(proc) } + if proc.configuration.StartSubREQCopySrc { + proc.startup.subREQCopySrc(proc) + } + + if proc.configuration.StartSubREQCopyDst { + proc.startup.subREQCopyDst(proc) + } + if proc.configuration.StartSubREQHello { proc.startup.subREQHello(proc) } @@ -680,6 +688,22 @@ func (s startup) subREQCopyFileTo(p process) { go proc.spawnWorker() } +func (s startup) subREQCopySrc(p process) { + log.Printf("Starting copy src subscriber: %#v\n", p.node) + sub := newSubject(REQCopySrc, string(p.node)) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + + go proc.spawnWorker() +} + +func (s startup) subREQCopyDst(p process) { + log.Printf("Starting copy dst subscriber: %#v\n", p.node) + sub := newSubject(REQCopyDst, string(p.node)) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + + go proc.spawnWorker() +} + func (s startup) subREQToFileAppend(p process) { log.Printf("Starting text logging subscriber: %#v\n", p.node) sub := newSubject(REQToFileAppend, string(p.node)) diff --git a/requests_copy.go b/requests_copy.go index 0f30c22..be4ee57 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -81,8 +81,10 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ DstFilePath := message.MethodArgs[2] // Get a context with the timeout specified in message.MethodTimeout. - ctx, cancel := getContextForMethodTimeout(proc.ctx, message) - defer cancel() + // Since the subProc spawned will outlive this method here we do not + // want to cancel this method. We care about the methodTimeout, but + // we ignore the CancelFunc. + ctx, _ := getContextForMethodTimeout(proc.ctx, message) // Create a subject for one copy request uid := uuid.New()