1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-05 20:09:16 +00:00

own req method for REQStopProcess

This commit is contained in:
postmannen 2021-09-20 11:53:17 +02:00
parent 389e263c59
commit b4e60ca3f5
2 changed files with 131 additions and 4 deletions

View file

@ -77,6 +77,13 @@ func (p *processes) Start(proc process) {
go proc.spawnWorker(proc.processes, proc.natsConn)
}
{
log.Printf("Starting REQOpProcessStop subscriber: %#v\n", proc.node)
sub := newSubject(REQOpProcessStop, string(proc.node))
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(proc.processes, proc.natsConn)
}
// Start a subscriber for textLogging messages
if proc.configuration.StartSubREQToFileAppend {
proc.startup.subREQToFileAppend(proc)

View file

@ -44,6 +44,7 @@ import (
"os"
"os/exec"
"path/filepath"
"strconv"
"time"
"github.com/hpcloud/tail"
@ -68,6 +69,8 @@ const (
REQOpProcessList Method = "REQOpProcessList"
// Start up a process.
REQOpProcessStart Method = "REQOpProcessStart"
// Stop up a process.
REQOpProcessStop Method = "REQOpProcessStop"
// Execute a CLI command in for example bash or cmd.
// This is an event type, where a message will be sent to a
// node with the command to execute and an ACK will be replied
@ -144,6 +147,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
REQOpProcessStart: methodREQOpProcessStart{
commandOrEvent: CommandACK,
},
REQOpProcessStop: methodREQOpProcessStop{
commandOrEvent: CommandACK,
},
REQCliCommand: methodREQCliCommand{
commandOrEvent: CommandACK,
},
@ -553,7 +559,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
out := []byte{}
var out []byte
// We need to create a tempory method type to look up the kind for the
// real method for the message.
@ -573,11 +579,12 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
go procNew.spawnWorker(proc.processes, proc.natsConn)
er := fmt.Errorf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
er := fmt.Errorf(txt)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
// TODO: How should this look like ?
out = []byte(er.Error())
// TODO: What should this look like ?
out = []byte(txt + "\n")
newReplyMessage(proc, message, out)
}()
@ -586,6 +593,119 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
}
// --- OpProcessStop
type methodREQOpProcessStop struct {
commandOrEvent CommandOrEvent
}
func (m methodREQOpProcessStop) getKind() CommandOrEvent {
return m.commandOrEvent
}
// RecevingNode Node `json:"receivingNode"`
// Method Method `json:"method"`
// Kind processKind `json:"kind"`
// ID int `json:"id"`
// Handle Op Process Start
func (m methodREQOpProcessStop) handler(proc process, message Message, node string) ([]byte, error) {
proc.processes.wg.Add(1)
go func() {
defer proc.processes.wg.Done()
var out []byte
// We need to create a tempory method type to use to look up the kind for the
// real method for the message.
var mt Method
if v := len(message.MethodArgs); v != 4 {
er := fmt.Errorf("error: OpProcessStop: methodArgs should contain 4 elements, found %v", v)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
// --- Parse and check the method arguments given.
fmt.Printf(" * DEBUG : %v\n", message.MethodArgs)
methodString := message.MethodArgs[0]
node := message.MethodArgs[1]
kind := message.MethodArgs[2]
idString := message.MethodArgs[3]
method := Method(methodString)
tmpH := mt.getHandler(Method(method))
if tmpH == nil {
er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
return
}
// Check if id is a valid number.
id, err := strconv.Atoi(idString)
if err != nil {
er := fmt.Errorf("error: OpProcessStop: id: %v, is not a number, check that the methodArgs are correct: %v", idString, err)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return
}
// --- Find, and stop process if found
// Based on the arg values received in the message we create a
// processName structure as used in naming the real processes.
// We can then use this processName to get the real values for the
// actual process we want to stop.
sub := newSubject(method, string(node))
fmt.Printf(" * DEBUG : sub: %v\n", sub)
processName := processNameGet(sub.name(), processKind(kind))
fmt.Printf(" * DEBUG : processName: %v\n", processName)
proc.processes.mu.Lock()
// Remove the process from the processes active map if found.
toStopProc, ok := proc.processes.active[processName][id]
if ok {
// Delete the process from the processes map
delete(proc.processes.active, processName)
// Stop started go routines that belong to the process.
toStopProc.ctxCancel()
// Stop subscribing for messages on the process's subject.
err := toStopProc.natsSubscription.Unsubscribe()
if err != nil {
er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, message: %v", err, message)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
}
// Remove the prometheus label
proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", id, sub, message.ToNode)
er := fmt.Errorf(txt)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
out = []byte(txt + "\n")
newReplyMessage(proc, message, out)
} else {
txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode)
er := fmt.Errorf(txt)
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
out = []byte(txt + "\n")
newReplyMessage(proc, message, out)
}
proc.processes.mu.Unlock()
}()
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----
type methodREQToFileAppend struct {