From ab49f5a4dec12e005ffb70c1e2dcf904c5322932 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 8 Apr 2021 12:51:54 +0200 Subject: [PATCH] op startProc initially seems to work --- example/toShip1-REQHttpGet.json | 2 +- example/toShip1-REQOpCommandPs.json | 2 +- .../toShip1-REQOpCommandStart_REQHttpGet.json | 14 +++++++ .../toShip1-REQOpCommandStop_REQHello.json | 14 +++++++ .../toShip1-REQOpCommandStop_REQHttpGet.json | 14 +++++++ example/toShip1-REQPing.json | 2 +- example/toShip2-REQHttpGet.json | 12 ++++++ example/toShip2-REQOpCommandPs.json | 14 +++++++ ...toShip2-REQOpCommandStart_REQHttpGet.json} | 0 .../toShip2-REQOpCommandStop_REQHello.json | 14 +++++++ ... toShip2-REQOpCommandStop_REQHttpGet.json} | 0 process.go | 39 ++++++++++--------- startup_processes.go | 1 + subscriber_method_types.go | 35 +++++++++++++++-- var/errorLog/central/errorCentral.REQErrorLog | 6 +-- 15 files changed, 141 insertions(+), 28 deletions(-) create mode 100644 example/toShip1-REQOpCommandStart_REQHttpGet.json create mode 100644 example/toShip1-REQOpCommandStop_REQHello.json create mode 100644 example/toShip1-REQOpCommandStop_REQHttpGet.json create mode 100644 example/toShip2-REQHttpGet.json create mode 100644 example/toShip2-REQOpCommandPs.json rename example/{toShip1-REQOpCommandStart.json => toShip2-REQOpCommandStart_REQHttpGet.json} (100%) create mode 100644 example/toShip2-REQOpCommandStop_REQHello.json rename example/{toShip1-REQOpCommandStop.json => toShip2-REQOpCommandStop_REQHttpGet.json} (100%) diff --git a/example/toShip1-REQHttpGet.json b/example/toShip1-REQHttpGet.json index ec3271d..d919119 100644 --- a/example/toShip1-REQHttpGet.json +++ b/example/toShip1-REQHttpGet.json @@ -2,7 +2,7 @@ { "directory": "metrics/network/sniffer", "fileExtension": ".html", - "toNode": "ship2", + "toNode": "ship1", "data": ["http://vg.no"], "method":"REQHttpGet", "timeout":5, diff --git a/example/toShip1-REQOpCommandPs.json b/example/toShip1-REQOpCommandPs.json index d3d61e3..84d3f45 100644 --- a/example/toShip1-REQOpCommandPs.json +++ b/example/toShip1-REQOpCommandPs.json @@ -2,7 +2,7 @@ { "directory":"opcommand_logs", "fileExtension": ".log", - "toNode": "ship2", + "toNode": "ship1", "data": ["ps"], "method":"REQOpCommand", "timeout":3, diff --git a/example/toShip1-REQOpCommandStart_REQHttpGet.json b/example/toShip1-REQOpCommandStart_REQHttpGet.json new file mode 100644 index 0000000..eae62c7 --- /dev/null +++ b/example/toShip1-REQOpCommandStart_REQHttpGet.json @@ -0,0 +1,14 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship1", + "data": ["startProc","REQHttpGet","central"], + "method":"REQOpCommand", + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/example/toShip1-REQOpCommandStop_REQHello.json b/example/toShip1-REQOpCommandStop_REQHello.json new file mode 100644 index 0000000..0138fbe --- /dev/null +++ b/example/toShip1-REQOpCommandStop_REQHello.json @@ -0,0 +1,14 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship1", + "data": ["stopProc","REQHello","publisher","central"], + "method":"REQOpCommand", + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/example/toShip1-REQOpCommandStop_REQHttpGet.json b/example/toShip1-REQOpCommandStop_REQHttpGet.json new file mode 100644 index 0000000..c3310fe --- /dev/null +++ b/example/toShip1-REQOpCommandStop_REQHttpGet.json @@ -0,0 +1,14 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship1", + "data": ["stopProc","REQHttpGet","subscriber"], + "method":"REQOpCommand", + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/example/toShip1-REQPing.json b/example/toShip1-REQPing.json index 80bbde1..8d27797 100644 --- a/example/toShip1-REQPing.json +++ b/example/toShip1-REQPing.json @@ -2,7 +2,7 @@ { "directory": "ping", "fileExtension":".ping.log", - "toNode": "ship2", + "toNode": "ship1", "data": [""], "method":"REQPing", "timeout":3, diff --git a/example/toShip2-REQHttpGet.json b/example/toShip2-REQHttpGet.json new file mode 100644 index 0000000..874b3fc --- /dev/null +++ b/example/toShip2-REQHttpGet.json @@ -0,0 +1,12 @@ +[ + { + "directory": "metrics/network/sniffer", + "fileExtension": ".html", + "toNode": "ship2", + "data": ["http://erter.org"], + "method":"REQHttpGet", + "timeout":5, + "retries":3, + "methodTimeout": 5 + } +] \ No newline at end of file diff --git a/example/toShip2-REQOpCommandPs.json b/example/toShip2-REQOpCommandPs.json new file mode 100644 index 0000000..d3d61e3 --- /dev/null +++ b/example/toShip2-REQOpCommandPs.json @@ -0,0 +1,14 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship2", + "data": ["ps"], + "method":"REQOpCommand", + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/example/toShip1-REQOpCommandStart.json b/example/toShip2-REQOpCommandStart_REQHttpGet.json similarity index 100% rename from example/toShip1-REQOpCommandStart.json rename to example/toShip2-REQOpCommandStart_REQHttpGet.json diff --git a/example/toShip2-REQOpCommandStop_REQHello.json b/example/toShip2-REQOpCommandStop_REQHello.json new file mode 100644 index 0000000..f90a2ce --- /dev/null +++ b/example/toShip2-REQOpCommandStop_REQHello.json @@ -0,0 +1,14 @@ +[ + { + "directory":"opcommand_logs", + "fileExtension": ".log", + "toNode": "ship2", + "data": ["stopProc","REQHello","publisher"], + "method":"REQOpCommand", + "timeout":3, + "retries":3, + "requestTimeout":3, + "requestRetries":3, + "MethodTimeout": 7 + } +] \ No newline at end of file diff --git a/example/toShip1-REQOpCommandStop.json b/example/toShip2-REQOpCommandStop_REQHttpGet.json similarity index 100% rename from example/toShip1-REQOpCommandStop.json rename to example/toShip2-REQOpCommandStop_REQHttpGet.json diff --git a/process.go b/process.go index bdbb9f8..1332d3f 100644 --- a/process.go +++ b/process.go @@ -62,7 +62,7 @@ type process struct { // nats connection natsConn *nats.Conn // natsSubscription returned when calling natsConn.Subscribe - natsSubscription string + natsSubscription *nats.Subscription // context ctx context.Context // context cancelFunc @@ -149,13 +149,6 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { pn = processNameGet(p.subject.name(), processKindSubscriber) } - p.processName = pn - - // Add information about the new process to the started processes map. - procs.mu.Lock() - procs.active[pn] = p - procs.mu.Unlock() - // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. if p.processKind == processKindPublisher { @@ -174,7 +167,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { }() } - go p.publishMessages(natsConn, procs) + go p.publishMessages(natsConn) } // Start a subscriber worker, which will start a go routine (process) @@ -194,8 +187,15 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { }() } - p.subscribeMessages() + p.natsSubscription = p.subscribeMessages() } + + p.processName = pn + + // Add information about the new process to the started processes map. + procs.mu.Lock() + procs.active[pn] = p + procs.mu.Unlock() } // messageDeliverNats will take care of the delivering the message @@ -375,10 +375,10 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. -func (p process) subscribeMessages() { +func (p process) subscribeMessages() *nats.Subscription { subject := string(p.subject.name()) - //natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { - _, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { + natsSubscription, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { + //_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) { // We start one handler per message received by using go routines here. // This is for being able to reply back the current publisher who sent @@ -387,12 +387,15 @@ func (p process) subscribeMessages() { }) if err != nil { log.Printf("error: Subscribe failed: %v\n", err) + return nil } + + return natsSubscription } // publishMessages will do the publishing of messages for one single // process. -func (p process) publishMessages(natsConn *nats.Conn, processes *processes) { +func (p process) publishMessages(natsConn *nats.Conn) { for { var err error var m Message @@ -410,7 +413,7 @@ func (p process) publishMessages(natsConn *nats.Conn, processes *processes) { // Get the process name so we can look up the process in the // processes map, and increment the message counter. pn := processNameGet(p.subject.name(), processKindPublisher) - m.ID = processes.active[pn].messageID + m.ID = p.messageID p.messageDeliverNats(natsConn, m) @@ -420,9 +423,9 @@ func (p process) publishMessages(natsConn *nats.Conn, processes *processes) { // Increment the counter for the next message to be sent. p.messageID++ - processes.mu.Lock() - processes.active[pn] = p - processes.mu.Unlock() + p.processes.mu.Lock() + p.processes.active[pn] = p + p.processes.mu.Unlock() // Handle the error. // diff --git a/startup_processes.go b/startup_processes.go index c020068..f815831 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -184,6 +184,7 @@ func (s *server) ProcessesStart() { select { case <-ticker.C: case <-ctx.Done(): + fmt.Printf(" ** DEBUG: got <- ctx.Done\n") er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name()) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) return nil diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 95603be..b9155be 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -253,8 +253,18 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri go procNew.spawnWorker(proc.processes, proc.natsConn) case message.Data[0] == "stopProc": - // Data layout: OPCommand, Method, publisher/subscriber - if len(message.Data) < 3 { + fmt.Printf(" ** DEBUG 0: got stopProc\n") + // Data layout: OPCommand, Method, publisher/subscriber, receivingNode + // + // The processes can be either publishers or subscribers. The subject name + // are used within naming a process. Since the subject structure contains + // the node name of the node that will receive this message we also need + // specify it so we are able to delete the publisher processes, since a + // publisher process will have the name of the node to receive the message, + // and not just the local node name as with subscriber processes. + // receive the message we need to specify + if len(message.Data) < 4 { + fmt.Printf(`error: DEBUG: stopProc: not enough data values. want "", "", "","": %v` + fmt.Sprint(message)) er := fmt.Errorf(`error: stopProc: not enough data values. want "", "", "": %v` + fmt.Sprint(message)) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) return @@ -267,23 +277,42 @@ func (m methodREQOpCommand) handler(proc process, message Message, nodeName stri toStopMethod := Method(message.Data[1]) pubOrSub := processKind(message.Data[2]) + recevingNode := processKind(message.Data[3]) // ..check if valid - sub := newSubject(toStopMethod, proc.configuration.NodeName) + sub := newSubject(toStopMethod, string(recevingNode)) processName := processNameGet(sub.name(), pubOrSub) // ..check if valid + fmt.Printf(" ** DEBUG1: processName: %v\n", processName) + fmt.Printf(" ** DEBUG1.1: Before mutex lock\n") proc.processes.mu.Lock() + fmt.Printf(" ** DEBUG1.2: After mutex lock\n") + + for k, v := range proc.processes.active { + fmt.Printf(" ** DEBUG1.3: MAP: k = %v, v = %v\n", k, v.processKind) + } + toStopProc, ok := proc.processes.active[processName] + fmt.Printf(" ** DEBUG2.1: toStopProc: %v\n", toStopProc) if ok { + fmt.Printf(" ** DEBUG2.2: toStopProc: %v\n", toStopProc) fmt.Printf(" ** STOP: processName: %v\n", processName) fmt.Printf(" ** STOP: toStopProc: %v\n", toStopProc) + // Delete the process from the processes map delete(proc.processes.active, processName) + // Stop started go routines that belong to the process. toStopProc.ctxCancel() + // Stop subscribing for messages on the process's subject. + err := toStopProc.natsSubscription.Unsubscribe() + if err != nil { + log.Printf(" ** Error: failed to stop *nats.Subscription: %v\n", err) + } } proc.processes.mu.Unlock() default: + fmt.Printf("error: no such OpCommand specified: " + message.Data[0]) er := fmt.Errorf("error: no such OpCommand specified: " + message.Data[0]) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) return diff --git a/var/errorLog/central/errorCentral.REQErrorLog b/var/errorLog/central/errorCentral.REQErrorLog index 8349359..c8ac95b 100644 --- a/var/errorLog/central/errorCentral.REQErrorLog +++ b/var/errorLog/central/errorCentral.REQErrorLog @@ -1,4 +1,2 @@ -2021-04-07 14:14:13.750212 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout -2021-04-07 14:14:18.755501 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout -2021-04-07 14:14:23.760201 +0000 UTC, error: subReply.NextMsg failed for node=ship2, subject=ship2.REQHttpGet.EventACK: nats: timeout -2021-04-07 14:14:23.760319 +0000 UTC, info: max retries for message reached, breaking out: {ship2 5 [http://vg.no] REQHttpGet central 5 3 0 0 5 metrics/network/sniffer .html 0xc00042e240} +2021-04-08 10:18:45.748974 +0000 UTC, info: canceling publisher: central.REQHello.EventNACK +2021-04-08 10:18:46.252822 +0000 UTC, error: subscriberHandler: failed to execute event: open var/errorLog/central/errorCentral.REQErrorLog: permission denied