mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-15 10:57:42 +00:00
added REQToFileNACK
This commit is contained in:
parent
abf612f5e5
commit
8585086cda
3 changed files with 28 additions and 0 deletions
|
@ -94,6 +94,8 @@ type Configuration struct {
|
||||||
StartSubREQToFileAppend bool
|
StartSubREQToFileAppend bool
|
||||||
// Subscriber for writing to file
|
// Subscriber for writing to file
|
||||||
StartSubREQToFile bool
|
StartSubREQToFile bool
|
||||||
|
// Subscriber for writing to file without ACK
|
||||||
|
StartSubREQToFileNACK bool
|
||||||
// Subscriber for reading files to copy
|
// Subscriber for reading files to copy
|
||||||
StartSubREQCopyFileFrom bool
|
StartSubREQCopyFileFrom bool
|
||||||
// Subscriber for writing copied files to disk
|
// Subscriber for writing copied files to disk
|
||||||
|
@ -161,6 +163,7 @@ type ConfigurationFromFile struct {
|
||||||
StartSubREQHello *bool
|
StartSubREQHello *bool
|
||||||
StartSubREQToFileAppend *bool
|
StartSubREQToFileAppend *bool
|
||||||
StartSubREQToFile *bool
|
StartSubREQToFile *bool
|
||||||
|
StartSubREQToFileNACK *bool
|
||||||
StartSubREQCopyFileFrom *bool
|
StartSubREQCopyFileFrom *bool
|
||||||
StartSubREQCopyFileTo *bool
|
StartSubREQCopyFileTo *bool
|
||||||
StartSubREQPing *bool
|
StartSubREQPing *bool
|
||||||
|
@ -221,6 +224,7 @@ func newConfigurationDefaults() Configuration {
|
||||||
StartSubREQHello: true,
|
StartSubREQHello: true,
|
||||||
StartSubREQToFileAppend: true,
|
StartSubREQToFileAppend: true,
|
||||||
StartSubREQToFile: true,
|
StartSubREQToFile: true,
|
||||||
|
StartSubREQToFileNACK: true,
|
||||||
StartSubREQCopyFileFrom: true,
|
StartSubREQCopyFileFrom: true,
|
||||||
StartSubREQCopyFileTo: true,
|
StartSubREQCopyFileTo: true,
|
||||||
StartSubREQPing: true,
|
StartSubREQPing: true,
|
||||||
|
@ -430,6 +434,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
|
||||||
} else {
|
} else {
|
||||||
conf.StartSubREQToFile = *cf.StartSubREQToFile
|
conf.StartSubREQToFile = *cf.StartSubREQToFile
|
||||||
}
|
}
|
||||||
|
if cf.StartSubREQToFileNACK == nil {
|
||||||
|
conf.StartSubREQToFileNACK = cd.StartSubREQToFileNACK
|
||||||
|
} else {
|
||||||
|
conf.StartSubREQToFileNACK = *cf.StartSubREQToFileNACK
|
||||||
|
}
|
||||||
if cf.StartSubREQCopyFileFrom == nil {
|
if cf.StartSubREQCopyFileFrom == nil {
|
||||||
conf.StartSubREQCopyFileFrom = cd.StartSubREQCopyFileFrom
|
conf.StartSubREQCopyFileFrom = cd.StartSubREQCopyFileFrom
|
||||||
} else {
|
} else {
|
||||||
|
@ -556,6 +565,7 @@ func (c *Configuration) CheckFlags() error {
|
||||||
flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false")
|
flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false")
|
||||||
flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false")
|
flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false")
|
||||||
flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false")
|
flag.BoolVar(&c.StartSubREQToFile, "startSubREQToFile", fc.StartSubREQToFile, "true/false")
|
||||||
|
flag.BoolVar(&c.StartSubREQToFileNACK, "startSubREQToFileNACK", fc.StartSubREQToFileNACK, "true/false")
|
||||||
flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false")
|
flag.BoolVar(&c.StartSubREQCopyFileFrom, "startSubREQCopyFileFrom", fc.StartSubREQCopyFileFrom, "true/false")
|
||||||
flag.BoolVar(&c.StartSubREQCopyFileTo, "startSubREQCopyFileTo", fc.StartSubREQCopyFileTo, "true/false")
|
flag.BoolVar(&c.StartSubREQCopyFileTo, "startSubREQCopyFileTo", fc.StartSubREQCopyFileTo, "true/false")
|
||||||
flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false")
|
flag.BoolVar(&c.StartSubREQPing, "startSubREQPing", fc.StartSubREQPing, "true/false")
|
||||||
|
|
13
processes.go
13
processes.go
|
@ -122,6 +122,11 @@ func (p *processes) Start(proc process) {
|
||||||
proc.startup.subREQToFile(proc)
|
proc.startup.subREQToFile(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Start a subscriber for text to file messages
|
||||||
|
if proc.configuration.StartSubREQToFileNACK {
|
||||||
|
proc.startup.subREQToFileNACK(proc)
|
||||||
|
}
|
||||||
|
|
||||||
// Start a subscriber for reading file to copy
|
// Start a subscriber for reading file to copy
|
||||||
if proc.configuration.StartSubREQCopyFileFrom {
|
if proc.configuration.StartSubREQCopyFileFrom {
|
||||||
proc.startup.subREQCopyFileFrom(proc)
|
proc.startup.subREQCopyFileFrom(proc)
|
||||||
|
@ -382,6 +387,14 @@ func (s startup) subREQToFile(p process) {
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s startup) subREQToFileNACK(p process) {
|
||||||
|
log.Printf("Starting text to file subscriber: %#v\n", p.node)
|
||||||
|
sub := newSubject(REQToFileNACK, string(p.node))
|
||||||
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
|
}
|
||||||
|
|
||||||
func (s startup) subREQCopyFileFrom(p process) {
|
func (s startup) subREQCopyFileFrom(p process) {
|
||||||
log.Printf("Starting copy file from subscriber: %#v\n", p.node)
|
log.Printf("Starting copy file from subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQCopyFileFrom, string(p.node))
|
sub := newSubject(REQCopyFileFrom, string(p.node))
|
||||||
|
|
|
@ -102,6 +102,8 @@ const (
|
||||||
// The data field is a slice of strings where the values of the
|
// The data field is a slice of strings where the values of the
|
||||||
// slice will be written to the file.
|
// slice will be written to the file.
|
||||||
REQToFile Method = "REQToFile"
|
REQToFile Method = "REQToFile"
|
||||||
|
// REQToFileNACK same as REQToFile but NACK.
|
||||||
|
REQToFileNACK Method = "REQToFileNACK"
|
||||||
// Read the source file to be copied to some node.
|
// Read the source file to be copied to some node.
|
||||||
REQCopyFileFrom Method = "REQCopyFileFrom"
|
REQCopyFileFrom Method = "REQCopyFileFrom"
|
||||||
// Write the destination copied to some node.
|
// Write the destination copied to some node.
|
||||||
|
@ -175,6 +177,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
||||||
REQToFile: methodREQToFile{
|
REQToFile: methodREQToFile{
|
||||||
event: EventACK,
|
event: EventACK,
|
||||||
},
|
},
|
||||||
|
REQToFileNACK: methodREQToFile{
|
||||||
|
event: EventNACK,
|
||||||
|
},
|
||||||
REQCopyFileFrom: methodREQCopyFileFrom{
|
REQCopyFileFrom: methodREQCopyFileFrom{
|
||||||
event: EventACK,
|
event: EventACK,
|
||||||
},
|
},
|
||||||
|
|
Loading…
Add table
Reference in a new issue