From b4e60ca3f537acf3ec3d8614c21cef6fc3d6fbae Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 20 Sep 2021 11:53:17 +0200 Subject: [PATCH] own req method for REQStopProcess --- processes.go | 7 +++ requests.go | 128 +++++++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 131 insertions(+), 4 deletions(-) diff --git a/processes.go b/processes.go index 31c8fc1..a30662e 100644 --- a/processes.go +++ b/processes.go @@ -77,6 +77,13 @@ func (p *processes) Start(proc process) { go proc.spawnWorker(proc.processes, proc.natsConn) } + { + log.Printf("Starting REQOpProcessStop subscriber: %#v\n", proc.node) + sub := newSubject(REQOpProcessStop, string(proc.node)) + proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) + go proc.spawnWorker(proc.processes, proc.natsConn) + } + // Start a subscriber for textLogging messages if proc.configuration.StartSubREQToFileAppend { proc.startup.subREQToFileAppend(proc) diff --git a/requests.go b/requests.go index 86b4ce1..4dba049 100644 --- a/requests.go +++ b/requests.go @@ -44,6 +44,7 @@ import ( "os" "os/exec" "path/filepath" + "strconv" "time" "github.com/hpcloud/tail" @@ -68,6 +69,8 @@ const ( REQOpProcessList Method = "REQOpProcessList" // Start up a process. REQOpProcessStart Method = "REQOpProcessStart" + // Stop up a process. + REQOpProcessStop Method = "REQOpProcessStop" // Execute a CLI command in for example bash or cmd. // This is an event type, where a message will be sent to a // node with the command to execute and an ACK will be replied @@ -144,6 +147,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQOpProcessStart: methodREQOpProcessStart{ commandOrEvent: CommandACK, }, + REQOpProcessStop: methodREQOpProcessStop{ + commandOrEvent: CommandACK, + }, REQCliCommand: methodREQCliCommand{ commandOrEvent: CommandACK, }, @@ -553,7 +559,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str proc.processes.wg.Add(1) go func() { defer proc.processes.wg.Done() - out := []byte{} + var out []byte // We need to create a tempory method type to look up the kind for the // real method for the message. @@ -573,11 +579,12 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) go procNew.spawnWorker(proc.processes, proc.natsConn) - er := fmt.Errorf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) + txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) + er := fmt.Errorf(txt) sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) - // TODO: How should this look like ? - out = []byte(er.Error()) + // TODO: What should this look like ? + out = []byte(txt + "\n") newReplyMessage(proc, message, out) }() @@ -586,6 +593,119 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str } +// --- OpProcessStop + +type methodREQOpProcessStop struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQOpProcessStop) getKind() CommandOrEvent { + return m.commandOrEvent +} + +// RecevingNode Node `json:"receivingNode"` +// Method Method `json:"method"` +// Kind processKind `json:"kind"` +// ID int `json:"id"` + +// Handle Op Process Start +func (m methodREQOpProcessStop) handler(proc process, message Message, node string) ([]byte, error) { + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + var out []byte + + // We need to create a tempory method type to use to look up the kind for the + // real method for the message. + var mt Method + + if v := len(message.MethodArgs); v != 4 { + er := fmt.Errorf("error: OpProcessStop: methodArgs should contain 4 elements, found %v", v) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + return + } + + // --- Parse and check the method arguments given. + fmt.Printf(" * DEBUG : %v\n", message.MethodArgs) + methodString := message.MethodArgs[0] + node := message.MethodArgs[1] + kind := message.MethodArgs[2] + idString := message.MethodArgs[3] + + method := Method(methodString) + tmpH := mt.getHandler(Method(method)) + if tmpH == nil { + er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + return + } + + // Check if id is a valid number. + id, err := strconv.Atoi(idString) + if err != nil { + er := fmt.Errorf("error: OpProcessStop: id: %v, is not a number, check that the methodArgs are correct: %v", idString, err) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + return + } + + // --- Find, and stop process if found + + // Based on the arg values received in the message we create a + // processName structure as used in naming the real processes. + // We can then use this processName to get the real values for the + // actual process we want to stop. + sub := newSubject(method, string(node)) + fmt.Printf(" * DEBUG : sub: %v\n", sub) + processName := processNameGet(sub.name(), processKind(kind)) + fmt.Printf(" * DEBUG : processName: %v\n", processName) + + proc.processes.mu.Lock() + + // Remove the process from the processes active map if found. + toStopProc, ok := proc.processes.active[processName][id] + if ok { + // Delete the process from the processes map + delete(proc.processes.active, processName) + // Stop started go routines that belong to the process. + toStopProc.ctxCancel() + // Stop subscribing for messages on the process's subject. + err := toStopProc.natsSubscription.Unsubscribe() + if err != nil { + er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, message: %v", err, message) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + } + + // Remove the prometheus label + proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)}) + + txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", id, sub, message.ToNode) + er := fmt.Errorf(txt) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + + out = []byte(txt + "\n") + newReplyMessage(proc, message, out) + + } else { + txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode) + er := fmt.Errorf(txt) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + + out = []byte(txt + "\n") + newReplyMessage(proc, message, out) + } + + proc.processes.mu.Unlock() + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil + +} + // ---- type methodREQToFileAppend struct {