From 389e263c59516ad4c9ec32c610b318898000765b Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 20 Sep 2021 06:40:34 +0200 Subject: [PATCH] new req types for proccesslist and processstart --- configuration_flags.go | 4 ++ processes.go | 16 ++++++- requests.go | 100 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 118 insertions(+), 2 deletions(-) diff --git a/configuration_flags.go b/configuration_flags.go index b6c8408..432db83 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -55,6 +55,10 @@ type Configuration struct { ErrorMessageTimeout int // Retries for error messages. ErrorMessageRetries int + + // NOTE: + // Op commands will not be specified as a flag since they can't be turned off. + // Make the current node send hello messages to central at given interval in seconds StartPubREQHello int // Start the central error logger. diff --git a/processes.go b/processes.go index 0ef7104..31c8fc1 100644 --- a/processes.go +++ b/processes.go @@ -55,7 +55,7 @@ func (p *processes) Start(proc process) { // --- Subscriber services that can be started via flags - // Allways start an REQOpCommand subscriber + // Allways start the listeners for Op commands { log.Printf("Starting REQOpCommand subscriber: %#v\n", proc.node) sub := newSubject(REQOpCommand, string(proc.node)) @@ -63,6 +63,20 @@ func (p *processes) Start(proc process) { go proc.spawnWorker(proc.processes, proc.natsConn) } + { + log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node) + sub := newSubject(REQOpProcessList, string(proc.node)) + proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) + go proc.spawnWorker(proc.processes, proc.natsConn) + } + + { + log.Printf("Starting REQOpProcessStart subscriber: %#v\n", proc.node) + sub := newSubject(REQOpProcessStart, string(proc.node)) + proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) + go proc.spawnWorker(proc.processes, proc.natsConn) + } + // Start a subscriber for textLogging messages if proc.configuration.StartSubREQToFileAppend { proc.startup.subREQToFileAppend(proc) diff --git a/requests.go b/requests.go index c6ffc57..86b4ce1 100644 --- a/requests.go +++ b/requests.go @@ -64,6 +64,10 @@ const ( // command to execute shall be given in the data field of the // message as string value. For example "ps". REQOpCommand Method = "REQOpCommand" + // Get a list of all the running processes. + REQOpProcessList Method = "REQOpProcessList" + // Start up a process. + REQOpProcessStart Method = "REQOpProcessStart" // Execute a CLI command in for example bash or cmd. // This is an event type, where a message will be sent to a // node with the command to execute and an ACK will be replied @@ -134,6 +138,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQOpCommand: methodREQOpCommand{ commandOrEvent: CommandACK, }, + REQOpProcessList: methodREQOpProcessList{ + commandOrEvent: CommandACK, + }, + REQOpProcessStart: methodREQOpProcessStart{ + commandOrEvent: CommandACK, + }, REQCliCommand: methodREQCliCommand{ commandOrEvent: CommandACK, }, @@ -488,7 +498,95 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri return ackMsg, nil } -//---- +// ---- New operations + +// --- OpProcessList +type methodREQOpProcessList struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQOpProcessList) getKind() CommandOrEvent { + return m.commandOrEvent +} + +// Handle Op Process List +func (m methodREQOpProcessList) handler(proc process, message Message, node string) ([]byte, error) { + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + + out := []byte{} + + proc.processes.mu.Lock() + // Loop the the processes map, and find all that is active to + // be returned in the reply message. + for _, idMap := range proc.processes.active { + for _, v := range idMap { + s := fmt.Sprintf("%v, process: %v, id: %v, name: %v, allowed from: %s\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v.processKind, v.processID, v.subject.name(), v.allowedReceivers) + sb := []byte(s) + out = append(out, sb...) + } + + } + proc.processes.mu.Unlock() + + newReplyMessage(proc, message, out) + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// --- OpProcessStart + +type methodREQOpProcessStart struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQOpProcessStart) getKind() CommandOrEvent { + return m.commandOrEvent +} + +// Handle Op Process Start +func (m methodREQOpProcessStart) handler(proc process, message Message, node string) ([]byte, error) { + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + out := []byte{} + + // We need to create a tempory method type to look up the kind for the + // real method for the message. + var mt Method + + m := message.MethodArgs[0] + method := Method(m) + tmpH := mt.getHandler(Method(method)) + if tmpH == nil { + er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + return + } + + // Create the process and start it. + sub := newSubject(method, proc.configuration.NodeName) + procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) + go procNew.spawnWorker(proc.processes, proc.natsConn) + + er := fmt.Errorf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + + // TODO: How should this look like ? + out = []byte(er.Error()) + newReplyMessage(proc, message, out) + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil + +} + +// ---- type methodREQToFileAppend struct { commandOrEvent CommandOrEvent