From e2f56e4427c85acc9575eb1a44213ffe9eac54fc Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 8 Apr 2021 07:07:13 +0200 Subject: [PATCH] OpStop deleting map value, not yet nats subscriber --- ...mmand.json => toShip1-REQOpCommandPs.json} | 0 example/toShip1-REQOpCommandStop.json | 14 +++++++++ process.go | 20 ++++++++++++ subscriber_method_types.go | 31 +++++++++++++++++++ 4 files changed, 65 insertions(+) rename example/{toShip1-REQOpCommand.json => toShip1-REQOpCommandPs.json} (100%) create mode 100644 example/toShip1-REQOpCommandStop.json diff --git a/example/toShip1-REQOpCommand.json b/example/toShip1-REQOpCommandPs.json similarity index 100% rename from example/toShip1-REQOpCommand.json rename to example/toShip1-REQOpCommandPs.json diff --git a/example/toShip1-REQOpCommandStop.json b/example/toShip1-REQOpCommandStop.json new file mode 100644 index 0000000..0a050c4 --- /dev/null +++ b/example/toShip1-REQOpCommandStop.json @@ -0,0 +1,14 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship2", + "data": ["stopProc","REQHttpGet","subscriber"], + "method":"REQOpCommand", + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/process.go b/process.go index 18e0802..bdbb9f8 100644 --- a/process.go +++ b/process.go @@ -61,10 +61,14 @@ type process struct { processes *processes // nats connection natsConn *nats.Conn + // natsSubscription returned when calling natsConn.Subscribe + natsSubscription string // context ctx context.Context // context cancelFunc ctxCancel context.CancelFunc + // Process name + processName processName } // prepareNewProcess will set the the provided values and the default @@ -113,6 +117,19 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- // can have that wrapped in from when it was constructed. type procFunc func(ctx context.Context) error +// stop will stop and remove the process from the active processes +// map, and it will send a cancel signal on the ctx to stop all +// running go routines that where started via this process. +func (p process) stop() { + p.processes.mu.Lock() + _, ok := p.processes.active[p.processName] + if ok { + delete(p.processes.active, p.processName) + p.ctxCancel() + } + p.processes.mu.Unlock() +} + // The purpose of this function is to check if we should start a // publisher or subscriber process, where a process is a go routine // that will handle either sending or receiving messages on one @@ -132,6 +149,8 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { pn = processNameGet(p.subject.name(), processKindSubscriber) } + p.processName = pn + // Add information about the new process to the started processes map. procs.mu.Lock() procs.active[pn] = p @@ -358,6 +377,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // callback function specified when a new message is received. func (p process) subscribeMessages() { subject := string(p.subject.name()) + //natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { _, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { // We start one handler per message received by using go routines here. diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 0629124..95603be 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -252,6 +252,37 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri procNew := newProcess(proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, allowedPublishers, nil) go procNew.spawnWorker(proc.processes, proc.natsConn) + case message.Data[0] == "stopProc": + // Data layout: OPCommand, Method, publisher/subscriber + if len(message.Data) < 3 { + er := fmt.Errorf(`error: stopProc: not enough data values. want "", "", "": %v` + fmt.Sprint(message)) + sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + return + } + + // func (s Subject) name() subjectName { + // return subjectName(fmt.Sprintf("%s.%s.%s", s.ToNode, s.Method, s.CommandOrEvent))'' + // + // pn = processNameGet(p.subject.name(), processKindSubscriber) + + toStopMethod := Method(message.Data[1]) + pubOrSub := processKind(message.Data[2]) + // ..check if valid + + sub := newSubject(toStopMethod, proc.configuration.NodeName) + processName := processNameGet(sub.name(), pubOrSub) + // ..check if valid + + proc.processes.mu.Lock() + toStopProc, ok := proc.processes.active[processName] + if ok { + fmt.Printf(" ** STOP: processName: %v\n", processName) + fmt.Printf(" ** STOP: toStopProc: %v\n", toStopProc) + delete(proc.processes.active, processName) + toStopProc.ctxCancel() + } + proc.processes.mu.Unlock() + default: er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0]) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)