mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-31 01:24:31 +00:00
updated procFunc for hello messages
This commit is contained in:
parent
dd07002f91
commit
ea79bced4d
2 changed files with 24 additions and 23 deletions
|
@ -107,4 +107,4 @@ Flag to turn on/off signature verification for all request types.
|
|||
|
||||
### Verification of MethodArgs Signature against ACL
|
||||
|
||||
TODO
|
||||
TODO
|
||||
|
|
45
processes.go
45
processes.go
|
@ -351,31 +351,32 @@ func (s startup) subREQHello(p process) {
|
|||
// a handler are not able to hold state, and we need to hold the state
|
||||
// of the nodes we've received hello's from in the sayHelloNodes map,
|
||||
// which is the information we pass along to generate metrics.
|
||||
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
|
||||
sayHelloNodes := make(map[Node]struct{})
|
||||
proc.procFunc = procFunc(
|
||||
func(ctx context.Context, procFuncCh chan Message) error {
|
||||
sayHelloNodes := make(map[Node]struct{})
|
||||
|
||||
for {
|
||||
// Receive a copy of the message sent from the method handler.
|
||||
var m Message
|
||||
for {
|
||||
// Receive a copy of the message sent from the method handler.
|
||||
var m Message
|
||||
|
||||
select {
|
||||
case m = <-procFuncCh:
|
||||
case <-ctx.Done():
|
||||
er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name())
|
||||
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add an entry for the node in the map
|
||||
sayHelloNodes[m.FromNode] = struct{}{}
|
||||
|
||||
// update the prometheus metrics
|
||||
s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes)))
|
||||
s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
|
||||
|
||||
select {
|
||||
case m = <-procFuncCh:
|
||||
case <-ctx.Done():
|
||||
er := fmt.Errorf("info: stopped handleFunc for: subscriber %v", proc.subject.name())
|
||||
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Add an entry for the node in the map
|
||||
sayHelloNodes[m.FromNode] = struct{}{}
|
||||
|
||||
// update the prometheus metrics
|
||||
s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes)))
|
||||
s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
|
||||
|
||||
}
|
||||
}
|
||||
})
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue