1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

OpStop deleting map value, not yet nats subscriber

This commit is contained in:
postmannen 2021-04-08 07:07:13 +02:00
parent a44e003ff5
commit e2f56e4427
4 changed files with 65 additions and 0 deletions

View file

@ -0,0 +1,14 @@
[
{
"directory":"opcommand_logs",
"fileExtension": ".log",
"toNode": "ship2",
"data": ["stopProc","REQHttpGet","subscriber"],
"method":"REQOpCommand",
"timeout":3,
"retries":3,
"requestTimeout":3,
"requestRetries":3,
"MethodTimeout": 7
}
]

View file

@ -61,10 +61,14 @@ type process struct {
processes *processes processes *processes
// nats connection // nats connection
natsConn *nats.Conn natsConn *nats.Conn
// natsSubscription returned when calling natsConn.Subscribe
natsSubscription string
// context // context
ctx context.Context ctx context.Context
// context cancelFunc // context cancelFunc
ctxCancel context.CancelFunc ctxCancel context.CancelFunc
// Process name
processName processName
} }
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
@ -113,6 +117,19 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<-
// can have that wrapped in from when it was constructed. // can have that wrapped in from when it was constructed.
type procFunc func(ctx context.Context) error type procFunc func(ctx context.Context) error
// stop will stop and remove the process from the active processes
// map, and it will send a cancel signal on the ctx to stop all
// running go routines that where started via this process.
func (p process) stop() {
p.processes.mu.Lock()
_, ok := p.processes.active[p.processName]
if ok {
delete(p.processes.active, p.processName)
p.ctxCancel()
}
p.processes.mu.Unlock()
}
// The purpose of this function is to check if we should start a // The purpose of this function is to check if we should start a
// publisher or subscriber process, where a process is a go routine // publisher or subscriber process, where a process is a go routine
// that will handle either sending or receiving messages on one // that will handle either sending or receiving messages on one
@ -132,6 +149,8 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
pn = processNameGet(p.subject.name(), processKindSubscriber) pn = processNameGet(p.subject.name(), processKindSubscriber)
} }
p.processName = pn
// Add information about the new process to the started processes map. // Add information about the new process to the started processes map.
procs.mu.Lock() procs.mu.Lock()
procs.active[pn] = p procs.active[pn] = p
@ -358,6 +377,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
// callback function specified when a new message is received. // callback function specified when a new message is received.
func (p process) subscribeMessages() { func (p process) subscribeMessages() {
subject := string(p.subject.name()) subject := string(p.subject.name())
//natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { _, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// We start one handler per message received by using go routines here. // We start one handler per message received by using go routines here.

View file

@ -252,6 +252,37 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri
procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, allowedPublishers, nil) procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, allowedPublishers, nil)
go procNew.spawnWorker(proc.processes, proc.natsConn) go procNew.spawnWorker(proc.processes, proc.natsConn)
case message.Data[0] == "stopProc":
// Data layout: OPCommand, Method, publisher/subscriber
if len(message.Data) < 3 {
er := fmt.Errorf(`error: stopProc: not enough data values. want "<OPCommand>", "<Method>", "<publisher/subscriber>": %v` + fmt.Sprint(message))
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
return
}
// func (s Subject) name() subjectName {
// return subjectName(fmt.Sprintf("%s.%s.%s", s.ToNode, s.Method, s.CommandOrEvent))''
//
// pn = processNameGet(p.subject.name(), processKindSubscriber)
toStopMethod := Method(message.Data[1])
pubOrSub := processKind(message.Data[2])
// ..check if valid
sub := newSubject(toStopMethod, proc.configuration.NodeName)
processName := processNameGet(sub.name(), pubOrSub)
// ..check if valid
proc.processes.mu.Lock()
toStopProc, ok := proc.processes.active[processName]
if ok {
fmt.Printf(" ** STOP: processName: %v\n", processName)
fmt.Printf(" ** STOP: toStopProc: %v\n", toStopProc)
delete(proc.processes.active, processName)
toStopProc.ctxCancel()
}
proc.processes.mu.Unlock()
default: default:
er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0]) er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0])
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)