mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-07 12:59:15 +00:00
205 lines
5.9 KiB
Go
205 lines
5.9 KiB
Go
|
package steward
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"github.com/prometheus/client_golang/prometheus"
|
||
|
)
|
||
|
|
||
|
// --- OpProcessList
|
||
|
type methodREQOpProcessList struct {
|
||
|
event Event
|
||
|
}
|
||
|
|
||
|
func (m methodREQOpProcessList) getKind() Event {
|
||
|
return m.event
|
||
|
}
|
||
|
|
||
|
// Handle Op Process List
|
||
|
func (m methodREQOpProcessList) handler(proc process, message Message, node string) ([]byte, error) {
|
||
|
|
||
|
proc.processes.wg.Add(1)
|
||
|
go func() {
|
||
|
defer proc.processes.wg.Done()
|
||
|
|
||
|
out := []byte{}
|
||
|
|
||
|
// Loop the the processes map, and find all that is active to
|
||
|
// be returned in the reply message.
|
||
|
|
||
|
proc.processes.active.mu.Lock()
|
||
|
for _, pTmp := range proc.processes.active.procNames {
|
||
|
s := fmt.Sprintf("%v, process: %v, id: %v, name: %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), pTmp.processKind, pTmp.processID, pTmp.subject.name())
|
||
|
sb := []byte(s)
|
||
|
out = append(out, sb...)
|
||
|
|
||
|
}
|
||
|
proc.processes.active.mu.Unlock()
|
||
|
|
||
|
newReplyMessage(proc, message, out)
|
||
|
}()
|
||
|
|
||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||
|
return ackMsg, nil
|
||
|
}
|
||
|
|
||
|
// --- OpProcessStart
|
||
|
|
||
|
type methodREQOpProcessStart struct {
|
||
|
event Event
|
||
|
}
|
||
|
|
||
|
func (m methodREQOpProcessStart) getKind() Event {
|
||
|
return m.event
|
||
|
}
|
||
|
|
||
|
// Handle Op Process Start
|
||
|
func (m methodREQOpProcessStart) handler(proc process, message Message, node string) ([]byte, error) {
|
||
|
proc.processes.wg.Add(1)
|
||
|
go func() {
|
||
|
defer proc.processes.wg.Done()
|
||
|
var out []byte
|
||
|
|
||
|
// We need to create a tempory method type to look up the kind for the
|
||
|
// real method for the message.
|
||
|
var mt Method
|
||
|
|
||
|
switch {
|
||
|
case len(message.MethodArgs) < 1:
|
||
|
er := fmt.Errorf("error: methodREQOpProcessStart: got <1 number methodArgs")
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
m := message.MethodArgs[0]
|
||
|
method := Method(m)
|
||
|
tmpH := mt.getHandler(Method(method))
|
||
|
if tmpH == nil {
|
||
|
er := fmt.Errorf("error: OpProcessStart: no such request type defined: %v" + m)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Create the process and start it.
|
||
|
sub := newSubject(method, proc.configuration.NodeName)
|
||
|
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber, nil)
|
||
|
go procNew.spawnWorker()
|
||
|
|
||
|
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||
|
er := fmt.Errorf(txt)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
out = []byte(txt + "\n")
|
||
|
newReplyMessage(proc, message, out)
|
||
|
}()
|
||
|
|
||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||
|
return ackMsg, nil
|
||
|
|
||
|
}
|
||
|
|
||
|
// --- OpProcessStop
|
||
|
|
||
|
type methodREQOpProcessStop struct {
|
||
|
event Event
|
||
|
}
|
||
|
|
||
|
func (m methodREQOpProcessStop) getKind() Event {
|
||
|
return m.event
|
||
|
}
|
||
|
|
||
|
// RecevingNode Node `json:"receivingNode"`
|
||
|
// Method Method `json:"method"`
|
||
|
// Kind processKind `json:"kind"`
|
||
|
// ID int `json:"id"`
|
||
|
|
||
|
// Handle Op Process Start
|
||
|
func (m methodREQOpProcessStop) handler(proc process, message Message, node string) ([]byte, error) {
|
||
|
proc.processes.wg.Add(1)
|
||
|
go func() {
|
||
|
defer proc.processes.wg.Done()
|
||
|
var out []byte
|
||
|
|
||
|
// We need to create a tempory method type to use to look up the kind for the
|
||
|
// real method for the message.
|
||
|
var mt Method
|
||
|
|
||
|
// --- Parse and check the method arguments given.
|
||
|
// The Reason for also having the node as one of the arguments is
|
||
|
// that publisher processes are named by the node they are sending the
|
||
|
// message to. Subscriber processes names are named by the node name
|
||
|
// they are running on.
|
||
|
|
||
|
if v := len(message.MethodArgs); v != 3 {
|
||
|
er := fmt.Errorf("error: methodREQOpProcessStop: got <4 number methodArgs, want: method,node,kind")
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
}
|
||
|
|
||
|
methodString := message.MethodArgs[0]
|
||
|
node := message.MethodArgs[1]
|
||
|
kind := message.MethodArgs[2]
|
||
|
|
||
|
method := Method(methodString)
|
||
|
tmpH := mt.getHandler(Method(method))
|
||
|
if tmpH == nil {
|
||
|
er := fmt.Errorf("error: OpProcessStop: no such request type defined: %v, check that the methodArgs are correct: " + methodString)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// --- Find, and stop process if found
|
||
|
|
||
|
// Based on the arg values received in the message we create a
|
||
|
// processName structure as used in naming the real processes.
|
||
|
// We can then use this processName to get the real values for the
|
||
|
// actual process we want to stop.
|
||
|
sub := newSubject(method, string(node))
|
||
|
processName := processNameGet(sub.name(), processKind(kind))
|
||
|
|
||
|
// Remove the process from the processes active map if found.
|
||
|
proc.processes.active.mu.Lock()
|
||
|
toStopProc, ok := proc.processes.active.procNames[processName]
|
||
|
|
||
|
if ok {
|
||
|
// Delete the process from the processes map
|
||
|
delete(proc.processes.active.procNames, processName)
|
||
|
// Stop started go routines that belong to the process.
|
||
|
toStopProc.ctxCancel()
|
||
|
// Stop subscribing for messages on the process's subject.
|
||
|
err := toStopProc.natsSubscription.Unsubscribe()
|
||
|
if err != nil {
|
||
|
er := fmt.Errorf("error: methodREQOpStopProcess failed to stop nats.Subscription: %v, methodArgs: %v", err, message.MethodArgs)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
}
|
||
|
|
||
|
// Remove the prometheus label
|
||
|
proc.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)})
|
||
|
|
||
|
txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode)
|
||
|
er := fmt.Errorf(txt)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
out = []byte(txt + "\n")
|
||
|
newReplyMessage(proc, message, out)
|
||
|
|
||
|
} else {
|
||
|
txt := fmt.Sprintf("error: OpProcessStop: did not find process to stop: %v on %v", sub, message.ToNode)
|
||
|
er := fmt.Errorf(txt)
|
||
|
proc.errorKernel.errSend(proc, message, er)
|
||
|
|
||
|
out = []byte(txt + "\n")
|
||
|
newReplyMessage(proc, message, out)
|
||
|
}
|
||
|
|
||
|
proc.processes.active.mu.Unlock()
|
||
|
|
||
|
}()
|
||
|
|
||
|
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||
|
return ackMsg, nil
|
||
|
|
||
|
}
|
||
|
|
||
|
// ----
|