diff --git a/configuration_flags.go b/configuration_flags.go index 2195eef..fe23b42 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -94,6 +94,8 @@ type Configuration struct { StartSubREQToFileAppend bool // Subscriber for writing to file StartSubREQToFile bool + // Subscriber for writing to file without ACK + StartSubREQToFileNACK bool // Subscriber for reading files to copy StartSubREQCopyFileFrom bool // Subscriber for writing copied files to disk @@ -161,6 +163,7 @@ type ConfigurationFromFile struct { StartSubREQHello *bool StartSubREQToFileAppend *bool StartSubREQToFile *bool + StartSubREQToFileNACK *bool StartSubREQCopyFileFrom *bool StartSubREQCopyFileTo *bool StartSubREQPing *bool @@ -221,6 +224,7 @@ func newConfigurationDefaults() Configuration { StartSubREQHello: true, StartSubREQToFileAppend: true, StartSubREQToFile: true, + StartSubREQToFileNACK: true, StartSubREQCopyFileFrom: true, StartSubREQCopyFileTo: true, StartSubREQPing: true, @@ -430,6 +434,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.StartSubREQToFile = *cf.StartSubREQToFile } + if cf.StartSubREQToFileNACK == nil { + conf.StartSubREQToFileNACK = cd.StartSubREQToFileNACK + } else { + conf.StartSubREQToFileNACK = *cf.StartSubREQToFileNACK + } if cf.StartSubREQCopyFileFrom == nil { conf.StartSubREQCopyFileFrom = cd.StartSubREQCopyFileFrom } else { @@ -556,6 +565,7 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false") flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false") flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false") + flag.BoolVar(&c.StartSubREQToFileNACK, "startSubREQToFileNACK", fc.StartSubREQToFileNACK, "true/false") flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false") flag.BoolVar(&c.StartSubREQCopyFileTo, "startSubREQCopyFileTo", fc.StartSubREQCopyFileTo, "true/false") flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false") diff --git a/processes.go b/processes.go index a6b26c4..caf48c8 100644 --- a/processes.go +++ b/processes.go @@ -122,6 +122,11 @@ func (p *processes) Start(proc process) { proc.startup.subREQToFile(proc) } + // Start a subscriber for text to file messages + if proc.configuration.StartSubREQToFileNACK { + proc.startup.subREQToFileNACK(proc) + } + // Start a subscriber for reading file to copy if proc.configuration.StartSubREQCopyFileFrom { proc.startup.subREQCopyFileFrom(proc) @@ -382,6 +387,14 @@ func (s startup) subREQToFile(p process) { go proc.spawnWorker(p.processes, p.natsConn) } +func (s startup) subREQToFileNACK(p process) { + log.Printf("Starting text to file subscriber: %#v\n", p.node) + sub := newSubject(REQToFileNACK, string(p.node)) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + + go proc.spawnWorker(p.processes, p.natsConn) +} + func (s startup) subREQCopyFileFrom(p process) { log.Printf("Starting copy file from subscriber: %#v\n", p.node) sub := newSubject(REQCopyFileFrom, string(p.node)) diff --git a/requests.go b/requests.go index 147a5cd..fdbce99 100644 --- a/requests.go +++ b/requests.go @@ -102,6 +102,8 @@ const ( // The data field is a slice of strings where the values of the // slice will be written to the file. REQToFile Method = "REQToFile" + // REQToFileNACK same as REQToFile but NACK. + REQToFileNACK Method = "REQToFileNACK" // Read the source file to be copied to some node. REQCopyFileFrom Method = "REQCopyFileFrom" // Write the destination copied to some node. @@ -175,6 +177,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQToFile: methodREQToFile{ event: EventACK, }, + REQToFileNACK: methodREQToFile{ + event: EventNACK, + }, REQCopyFileFrom: methodREQCopyFileFrom{ event: EventACK, },