1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

Initial ctx for publishers implemented

This commit is contained in:
postmannen 2021-04-07 18:05:07 +02:00
parent d909784094
commit 12db6fbd6a
3 changed files with 20 additions and 3 deletions

View file

@ -2,7 +2,7 @@
{ {
"directory": "ping", "directory": "ping",
"fileExtension":".ping.log", "fileExtension":".ping.log",
"toNode": "ship1", "toNode": "ship2",
"data": [""], "data": [""],
"method":"REQPing", "method":"REQPing",
"timeout":3, "timeout":3,

Binary file not shown.

View file

@ -2,6 +2,7 @@ package steward
import ( import (
"bytes" "bytes"
"context"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"log" "log"
@ -60,6 +61,10 @@ type process struct {
processes *processes processes *processes
// nats connection // nats connection
natsConn *nats.Conn natsConn *nats.Conn
// context
ctx context.Context
// context cancelFunc
ctxCancel context.CancelFunc
} }
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
@ -74,6 +79,8 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<-
m[a] = struct{}{} m[a] = struct{}{}
} }
ctx, cancel := context.WithCancel(context.Background())
var method Method var method Method
proc := process{ proc := process{
@ -89,6 +96,8 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<-
configuration: configuration, configuration: configuration,
processes: processes, processes: processes,
natsConn: natsConn, natsConn: natsConn,
ctx: ctx,
ctxCancel: cancel,
} }
return proc return proc
@ -366,9 +375,17 @@ func (p process) subscribeMessages() {
func (p process) publishMessages(natsConn *nats.Conn, processes *processes) { func (p process) publishMessages(natsConn *nats.Conn, processes *processes) {
for { for {
var err error var err error
var m Message
// Wait and read the next message on the message channel // Wait and read the next message on the message channel, or
m := <-p.subject.messageCh // exit this function if Cancel are received via ctx.
select {
case m = <-p.subject.messageCh:
case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.subject.name())
sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
return
}
// Get the process name so we can look up the process in the // Get the process name so we can look up the process in the
// processes map, and increment the message counter. // processes map, and increment the message counter.