From ae6058d0afd55cc3b842cc93c54d5c1d4add42a5 Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 30 Dec 2022 16:52:50 +0100 Subject: [PATCH] put in a timer to end stuck messages in ringbuffer --- ringbuffer.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/ringbuffer.go b/ringbuffer.go index f054b90..9550867 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -293,6 +293,9 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB }, } + ticker := time.NewTicker(time.Duration(v.Data.ACKTimeout) * time.Duration(v.Data.Retries) * 2 * time.Second) + defer ticker.Stop() + outCh <- sd // Just to confirm here that the message was picked up, to know if the // the read process have stalled or not. @@ -304,7 +307,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB // TODO: Check if more logic should be made here if messages are stuck etc. // Testing with a timeout here to figure out if messages are stuck // waiting for done signal. - log.Printf("Error: *** message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID) + log.Printf("Error: ringBuffer: message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID) r.metrics.promRingbufferStalledMessagesTotal.Inc() } @@ -312,7 +315,19 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB // message will be able to signal back here that the message have // been processed, and that we then can delete it out of the K/V Store. - <-v.Data.done + // The publishAMessage method should send a done back here, but in some situations + // it seems that does not happen. Implementing a ticker that is twice the total + // amount of time a message should be allowed to use for getting published so + // we don't get stuck goroutines here. + // + // TODO: Figure out what the reason for not receiving the done signals might be. 
+ select { + case <-v.Data.done: + case <-ticker.C: + log.Printf("----------------------------------------------\n") + log.Printf("Error: ringBuffer message %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.Data.Subject) + log.Printf("----------------------------------------------\n") + } // log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID))