mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
added startup and test for REQnCliCommandCont
This commit is contained in:
parent
e729ea3d19
commit
c219a9e7c7
4 changed files with 79 additions and 35 deletions
|
@ -130,6 +130,8 @@ type Configuration struct {
|
||||||
StartSubREQHttpGet flagNodeSlice
|
StartSubREQHttpGet flagNodeSlice
|
||||||
// Subscriber for tailing log files
|
// Subscriber for tailing log files
|
||||||
StartSubREQTailFile flagNodeSlice
|
StartSubREQTailFile flagNodeSlice
|
||||||
|
// Subscriber for continously delivery of output from cli commands.
|
||||||
|
StartSubREQnCliCommandCont flagNodeSlice
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfiguration will set a default Configuration,
|
// NewConfiguration will set a default Configuration,
|
||||||
|
@ -142,30 +144,31 @@ func NewConfiguration() *Configuration {
|
||||||
// Default configuration
|
// Default configuration
|
||||||
func newConfigurationDefaults() Configuration {
|
func newConfigurationDefaults() Configuration {
|
||||||
c := Configuration{
|
c := Configuration{
|
||||||
ConfigFolder: "/usr/local/steward/etc/",
|
ConfigFolder: "/usr/local/steward/etc/",
|
||||||
SocketFolder: "./tmp",
|
SocketFolder: "./tmp",
|
||||||
DatabaseFolder: "./var/lib",
|
DatabaseFolder: "./var/lib",
|
||||||
BrokerAddress: "127.0.0.1:4222",
|
BrokerAddress: "127.0.0.1:4222",
|
||||||
ProfilingPort: "",
|
ProfilingPort: "",
|
||||||
PromHostAndPort: "",
|
PromHostAndPort: "",
|
||||||
DefaultMessageTimeout: 10,
|
DefaultMessageTimeout: 10,
|
||||||
DefaultMessageRetries: 1,
|
DefaultMessageRetries: 1,
|
||||||
StartPubREQHello: 30,
|
StartPubREQHello: 30,
|
||||||
SubscribersDataFolder: "./var",
|
SubscribersDataFolder: "./var",
|
||||||
CentralNodeName: "",
|
CentralNodeName: "",
|
||||||
RootCAPath: "",
|
RootCAPath: "",
|
||||||
NkeySeedFile: "",
|
NkeySeedFile: "",
|
||||||
StartSubREQErrorLog: flagNodeSlice{Values: []Node{}},
|
StartSubREQErrorLog: flagNodeSlice{Values: []Node{}},
|
||||||
StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQPing: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQPing: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQPong: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQPong: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
|
StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
}
|
}
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
@ -213,6 +216,7 @@ func (c *Configuration) CheckFlags() error {
|
||||||
flag.Var(&c.StartSubREQToConsole, "startSubREQToConsole", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQToConsole, "startSubREQToConsole", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
flag.Var(&c.StartSubREQHttpGet, "startSubREQHttpGet", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQHttpGet, "startSubREQHttpGet", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
flag.Var(&c.StartSubREQTailFile, "startSubREQTailFile", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
flag.Var(&c.StartSubREQTailFile, "startSubREQTailFile", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
|
flag.Var(&c.StartSubREQnCliCommandCont, "startSubREQnCliCommandCont", "Specify comma separated list for nodes to allow messages from. Use \"*\" for from all. Value RST will turn off subscriber.")
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
|
|
@ -80,6 +80,10 @@ func (p process) ProcessesStart(ctx context.Context) {
|
||||||
p.startup.subREQTailFile(p)
|
p.startup.subREQTailFile(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if p.configuration.StartSubREQnCliCommandCont.OK {
|
||||||
|
p.startup.subREQnCliCommandCont(p)
|
||||||
|
}
|
||||||
|
|
||||||
p.startup.subREQToSocket(p)
|
p.startup.subREQToSocket(p)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,6 +259,14 @@ func (s startup) subREQTailFile(p process) {
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s startup) subREQnCliCommandCont(p process) {
|
||||||
|
log.Printf("Starting cli command with continous delivery: %#v\n", p.node)
|
||||||
|
sub := newSubject(REQnCliCommandCont, string(p.node))
|
||||||
|
proc := newProcess(p.ctx, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, p.configuration.StartSubREQTailFile.Values, nil)
|
||||||
|
// fmt.Printf("*** %#v\n", proc)
|
||||||
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
|
}
|
||||||
|
|
||||||
func (s startup) subREQToSocket(p process) {
|
func (s startup) subREQToSocket(p process) {
|
||||||
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
|
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToSocket, string(p.node))
|
sub := newSubject(REQToSocket, string(p.node))
|
||||||
|
|
|
@ -47,13 +47,17 @@ func TestStewardServer(t *testing.T) {
|
||||||
DefaultMessageRetries: 1,
|
DefaultMessageRetries: 1,
|
||||||
DefaultMessageTimeout: 3,
|
DefaultMessageTimeout: 3,
|
||||||
|
|
||||||
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQnCliCommand: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
// StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQnCliCommandCont: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQErrorLog: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQToConsole: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQToFileAppend: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQToFile: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
StartSubREQHello: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
|
StartSubREQErrorLog: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
|
StartSubREQHttpGet: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
|
StartSubREQTailFile: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
|
// StartSubREQToSocket: flagNodeSlice{OK: true, Values: []Node{"*"}},
|
||||||
}
|
}
|
||||||
s, err := NewServer(conf)
|
s, err := NewServer(conf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -68,7 +72,7 @@ func TestStewardServer(t *testing.T) {
|
||||||
checkREQOpCommandTest(conf, t)
|
checkREQOpCommandTest(conf, t)
|
||||||
checkREQCliCommandTest(conf, t)
|
checkREQCliCommandTest(conf, t)
|
||||||
checkREQnCliCommandTest(conf, t)
|
checkREQnCliCommandTest(conf, t)
|
||||||
// checkREQnCliCommandContTest(conf, t)
|
checkREQnCliCommandContTest(conf, t)
|
||||||
// checkREQToConsoleTest(conf, t), NB: No tests will be made for console ouput.
|
// checkREQToConsoleTest(conf, t), NB: No tests will be made for console ouput.
|
||||||
// checkREQToFileAppendTest(conf, t), NB: Already tested via others
|
// checkREQToFileAppendTest(conf, t), NB: Already tested via others
|
||||||
// checkREQToFileTest(conf, t), NB: Already tested via others
|
// checkREQToFileTest(conf, t), NB: Already tested via others
|
||||||
|
@ -160,6 +164,29 @@ func checkREQnCliCommandTest(conf *Configuration, t *testing.T) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The non-sequential sending of CLI Commands.
|
||||||
|
func checkREQnCliCommandContTest(conf *Configuration, t *testing.T) {
|
||||||
|
m := `[
|
||||||
|
{
|
||||||
|
"directory":"commands-executed",
|
||||||
|
"fileExtension":".result",
|
||||||
|
"toNode": "central",
|
||||||
|
"data": ["bash","-c","echo apekatt && sleep 5 && echo gris"],
|
||||||
|
"replyMethod":"REQToFileAppend",
|
||||||
|
"method":"REQnCliCommandCont",
|
||||||
|
"ACKTimeout":3,
|
||||||
|
"retries":3,
|
||||||
|
"methodTimeout": 5
|
||||||
|
}
|
||||||
|
]`
|
||||||
|
|
||||||
|
writeToSocketTest(conf, m, t)
|
||||||
|
|
||||||
|
resultFile := filepath.Join(conf.SubscribersDataFolder, "commands-executed", "central", "central.REQnCliCommandCont.result")
|
||||||
|
findStringInFileTest("apekatt", resultFile, conf, t)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
// Sending of Hello.
|
// Sending of Hello.
|
||||||
func checkREQHelloTest(conf *Configuration, t *testing.T) {
|
func checkREQHelloTest(conf *Configuration, t *testing.T) {
|
||||||
m := `[
|
m := `[
|
||||||
|
|
|
@ -1067,9 +1067,10 @@ func (m methodREQnCliCommandCont) getKind() CommandOrEvent {
|
||||||
return m.commandOrEvent
|
return m.commandOrEvent
|
||||||
}
|
}
|
||||||
|
|
||||||
// handler to run a tailing of files with timeout context. The handler will
|
// Handler to run REQnCliCommandCont, which is the same as normal
|
||||||
// return the output of the command run back to the calling publisher
|
// Cli command, but can be used when running a command that will take
|
||||||
// as a new message.
|
// longer time and you want to send the output of the command continually
|
||||||
|
// back as it is generated, and not just when the command is finished.
|
||||||
func (m methodREQnCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) {
|
func (m methodREQnCliCommandCont) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
log.Printf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
|
log.Printf("<--- CLInCommandCont REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue