diff --git a/configuration_flags.go b/configuration_flags.go index 8cb6cbe..6085f55 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -130,6 +130,8 @@ type Configuration struct { StartSubREQHttpGet flagNodeSlice // Subscriber for tailing log files StartSubREQTailFile flagNodeSlice + // Subscriber for continously delivery of output from cli commands. + StartSubREQnCliCommandCont flagNodeSlice } // NewConfiguration will set a default Configuration, @@ -142,30 +144,31 @@ func NewConfiguration() *Configuration { // Default configuration func newConfigurationDefaults() Configuration { c := Configuration{ - ConfigFolder: "/usr/local/steward/etc/", - SocketFolder: "./tmp", - DatabaseFolder: "./var/lib", - BrokerAddress: "127.0.0.1:4222", - ProfilingPort: "", - PromHostAndPort: "", - DefaultMessageTimeout: 10, - DefaultMessageRetries: 1, - StartPubREQHello: 30, - SubscribersDataFolder: "./var", - CentralNodeName: "", - RootCAPath: "", - NkeySeedFile: "", - StartSubREQErrorLog: flagNodeSlice{Values: []Node{}}, - StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQPing: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQPong: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, + ConfigFolder: "/usr/local/steward/etc/", + SocketFolder: "./tmp", + DatabaseFolder: "./var/lib", + BrokerAddress: "127.0.0.1:4222", + ProfilingPort: "", + PromHostAndPort: "", + DefaultMessageTimeout: 10, + DefaultMessageRetries: 1, + StartPubREQHello: 30, + SubscribersDataFolder: "./var", + CentralNodeName: "", + RootCAPath: "", + NkeySeedFile: "", + StartSubREQErrorLog: flagNodeSlice{Values: []Node{}}, + StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQPing: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQPong: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}}, } return c } @@ -213,6 +216,7 @@ func (c *Configuration) CheckFlags() error { flag.Var(&c.StartSubREQToConsole, "startSubREQToConsole", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Var(&c.StartSubREQHttpGet, "startSubREQHttpGet", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Var(&c.StartSubREQTailFile, "startSubREQTailFile", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") + flag.Var(&c.StartSubREQnCliCommandCont, "startSubREQnCliCommandCont", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.") flag.Parse() diff --git a/startup_processes.go b/startup_processes.go index b662b6c..d5d6a00 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -80,6 +80,10 @@ func (p process) ProcessesStart(ctx context.Context) { p.startup.subREQTailFile(p) } + if p.configuration.StartSubREQnCliCommandCont.OK { + p.startup.subREQnCliCommandCont(p) + } + p.startup.subREQToSocket(p) } @@ -255,6 +259,14 @@ func (s startup) subREQTailFile(p process) { go proc.spawnWorker(p.processes, p.natsConn) } +func (s startup) subREQnCliCommandCont(p process) { + log.Printf("Starting cli command with continous delivery: %#v\n", p.node) + sub := newSubject(REQnCliCommandCont, string(p.node)) + proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil) + // fmt.Printf("*** %#v\n", proc) + go proc.spawnWorker(p.processes, p.natsConn) +} + func (s startup) subREQToSocket(p process) { log.Printf("Starting write to socket subscriber: %#v\n", p.node) sub := newSubject(REQToSocket, string(p.node)) diff --git a/steward_test.go b/steward_test.go index 2b2c462..082be8b 100644 --- a/steward_test.go +++ b/steward_test.go @@ -47,13 +47,17 @@ func TestStewardServer(t *testing.T) { DefaultMessageRetries: 1, DefaultMessageTimeout: 3, - StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, - // StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQErrorLog: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, - StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQErrorLog: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}}, + StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}}, + // StartSubREQToSocket: flagNodeSlice{OK: true, Values: []Node{"*"}}, } s, err := NewServer(conf) if err != nil { @@ -68,7 +72,7 @@ func TestStewardServer(t *testing.T) { checkREQOpCommandTest(conf, t) checkREQCliCommandTest(conf, t) checkREQnCliCommandTest(conf, t) - // checkREQnCliCommandContTest(conf, t) + checkREQnCliCommandContTest(conf, t) // checkREQToConsoleTest(conf, t), NB: No tests will be made for console ouput. // checkREQToFileAppendTest(conf, t), NB: Already tested via others // checkREQToFileTest(conf, t), NB: Already tested via others @@ -160,6 +164,29 @@ func checkREQnCliCommandTest(conf *Configuration, t *testing.T) { } +// The non-sequential sending of CLI Commands. +func checkREQnCliCommandContTest(conf *Configuration, t *testing.T) { + m := `[ + { + "directory":"commands-executed", + "fileExtension":".result", + "toNode": "central", + "data": ["bash","-c","echo apekatt && sleep 5 && echo gris"], + "replyMethod":"REQToFileAppend", + "method":"REQnCliCommandCont", + "ACKTimeout":3, + "retries":3, + "methodTimeout": 5 + } + ]` + + writeToSocketTest(conf, m, t) + + resultFile := filepath.Join(conf.SubscribersDataFolder, "commands-executed", "central", "central.REQnCliCommandCont.result") + findStringInFileTest("apekatt", resultFile, conf, t) + +} + // Sending of Hello. func checkREQHelloTest(conf *Configuration, t *testing.T) { m := `[ diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 93805b4..62cbd64 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -1067,9 +1067,10 @@ func (m methodREQnCliCommandCont) getKind() CommandOrEvent { return m.commandOrEvent } -// handler to run a tailing of files with timeout context. The handler will -// return the output of the command run back to the calling publisher -// as a new message. +// Handler to run REQnCliCommandCont, which is the same as normal +// Cli command, but can be used when running a command that will take +// longer time and you want to send the output of the command continually +// back as it is generated, and not just when the command is finished. func (m methodREQnCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) { log.Printf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)