diff --git a/example/toShip1-REQPing.json b/example/toShip1-REQPing.json index 8d27797..80bbde1 100644 --- a/example/toShip1-REQPing.json +++ b/example/toShip1-REQPing.json @@ -2,7 +2,7 @@ { "directory": "ping", "fileExtension":".ping.log", - "toNode": "ship1", + "toNode": "ship2", "data": [""], "method":"REQPing", "timeout":3, diff --git a/incommmingBuffer.db b/incommmingBuffer.db index 3e84517..88532d3 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/process.go b/process.go index c20ab95..9a4f395 100644 --- a/process.go +++ b/process.go @@ -2,6 +2,7 @@ package steward import ( "bytes" + "context" "encoding/gob" "fmt" "log" @@ -60,6 +61,10 @@ type process struct { processes *processes // nats connection natsConn *nats.Conn + // context + ctx context.Context + // context cancelFunc + ctxCancel context.CancelFunc } // prepareNewProcess will set the the provided values and the default @@ -74,6 +79,8 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- m[a] = struct{}{} } + ctx, cancel := context.WithCancel(context.Background()) + var method Method proc := process{ @@ -89,6 +96,8 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- configuration: configuration, processes: processes, natsConn: natsConn, + ctx: ctx, + ctxCancel: cancel, } return proc @@ -366,9 +375,17 @@ func (p process) subscribeMessages() { func (p process) publishMessages(natsConn *nats.Conn, processes *processes) { for { var err error + var m Message - // Wait and read the next message on the message channel - m := <-p.subject.messageCh + // Wait and read the next message on the message channel, or + // exit this function if Cancel are received via ctx. + select { + case m = <-p.subject.messageCh: + case <-p.ctx.Done(): + er := fmt.Errorf("info: canceling publisher: %v", p.subject.name()) + sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + return + } // Get the process name so we can look up the process in the // processes map, and increment the message counter.