From ea79bced4dee7d1788eec19efb7dd6c96a575517 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 31 Mar 2022 14:54:39 +0200 Subject: [PATCH] updated procFunc for hello messages --- doc/concept/auth/auth.md | 2 +- processes.go | 45 ++++++++++++++++++++-------------------- 2 files changed, 24 insertions(+), 23 deletions(-) diff --git a/doc/concept/auth/auth.md b/doc/concept/auth/auth.md index 5ef588b..5867d9c 100644 --- a/doc/concept/auth/auth.md +++ b/doc/concept/auth/auth.md @@ -107,4 +107,4 @@ Flag to turn on/off signature verification for all request types. ### Verification of MethodArgs Signature against ACL -TODO \ No newline at end of file +TODO diff --git a/processes.go b/processes.go index caf48c8..9bf64e0 100644 --- a/processes.go +++ b/processes.go @@ -351,31 +351,32 @@ func (s startup) subREQHello(p process) { // a handler are not able to hold state, and we need to hold the state // of the nodes we've received hello's from in the sayHelloNodes map, // which is the information we pass along to generate metrics. - proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { - sayHelloNodes := make(map[Node]struct{}) + proc.procFunc = procFunc( + func(ctx context.Context, procFuncCh chan Message) error { + sayHelloNodes := make(map[Node]struct{}) - for { - // Receive a copy of the message sent from the method handler. - var m Message + for { + // Receive a copy of the message sent from the method handler. + var m Message + + select { + case m = <-procFuncCh: + case <-ctx.Done(): + er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name()) + // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + return nil + } + + // Add an entry for the node in the map + sayHelloNodes[m.FromNode] = struct{}{} + + // update the prometheus metrics + s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes))) + s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() - select { - case m = <-procFuncCh: - case <-ctx.Done(): - er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name()) - // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) - log.Printf("%v\n", er) - return nil } - - // Add an entry for the node in the map - sayHelloNodes[m.FromNode] = struct{}{} - - // update the prometheus metrics - s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes))) - s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() - - } - } + }) go proc.spawnWorker(p.processes, p.natsConn) }