From 12db6fbd6ab84206677e6a8767675be48bfc7794 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 7 Apr 2021 18:05:07 +0200 Subject: [PATCH] Initial ctx for publishers implemented --- example/toShip1-REQPing.json | 2 +- incommmingBuffer.db | Bin 65536 -> 65536 bytes process.go | 21 +++++++++++++++++++-- 3 files changed, 20 insertions(+), 3 deletions(-) diff --git a/example/toShip1-REQPing.json b/example/toShip1-REQPing.json index 8d27797..80bbde1 100644 --- a/example/toShip1-REQPing.json +++ b/example/toShip1-REQPing.json @@ -2,7 +2,7 @@ { "directory": "ping", "fileExtension":".ping.log", - "toNode": "ship1", + "toNode": "ship2", "data": [""], "method":"REQPing", "timeout":3, diff --git a/incommmingBuffer.db b/incommmingBuffer.db index 3e84517c50be918fe3835fe143d4468b7f957025..88532d36dc07a07ebee422d07b740a2f1286f02c 100644 GIT binary patch delta 141 zcmZo@U}f1)a${ z4&IEGlPeq?v@NQYJYAHmjEyaHlw1-^5|yl~m5NJ~vQm>vK+L@SlvE`vrQ(dt0;9=~ efx3(~>pNcL+hm|4Fj+?dXvpRYg%18j0UH24#V?5f delta 141 zcmZo@U}Yd|CvbSCRK zcr#jTu5jq!-(;X8Fj+?d$emoF;Gk_@t>o#VWMyn@uA}6VSdyq@RjpK9nv|8ATmoX| d<)@@7St%7~WEL1sehk!QxLIHEBHyBb4FLKUEMfov diff --git a/process.go b/process.go index c20ab95..9a4f395 100644 --- a/process.go +++ b/process.go @@ -2,6 +2,7 @@ package steward import ( "bytes" + "context" "encoding/gob" "fmt" "log" @@ -60,6 +61,10 @@ type process struct { processes *processes // nats connection natsConn *nats.Conn + // context + ctx context.Context + // context cancelFunc + ctxCancel context.CancelFunc } // prepareNewProcess will set the the provided values and the default @@ -74,6 +79,8 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- m[a] = struct{}{} } + ctx, cancel := context.WithCancel(context.Background()) + var method Method proc := process{ @@ -89,6 +96,8 @@ func newProcess(natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- configuration: configuration, processes: processes, natsConn: natsConn, + ctx: ctx, + ctxCancel: cancel, } return proc @@ -366,9 +375,17 @@ func (p process) subscribeMessages() { func (p process) publishMessages(natsConn *nats.Conn, processes *processes) { for { var err error + var m Message - // Wait and read the next message on the message channel - m := <-p.subject.messageCh + // Wait and read the next message on the message channel, or + // exit this function if Cancel are received via ctx. + select { + case m = <-p.subject.messageCh: + case <-p.ctx.Done(): + er := fmt.Errorf("info: canceling publisher: %v", p.subject.name()) + sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + return + } // Get the process name so we can look up the process in the // processes map, and increment the message counter.