1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

put in a timer to end stuck messages in ringbuffer

This commit is contained in:
postmannen 2022-12-30 16:52:50 +01:00
parent 9e378295df
commit ae6058d0af

View file

@ -293,6 +293,9 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
}, },
} }
ticker := time.NewTicker(time.Duration(v.Data.ACKTimeout) * time.Duration(v.Data.Retries) * 2 * time.Second)
defer ticker.Stop()
outCh <- sd outCh <- sd
// Just to confirm here that the message was picked up, to know if the // Just to confirm here that the message was picked up, to know if the
// the read process have stalled or not. // the read process have stalled or not.
@ -304,7 +307,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
// TODO: Check if more logic should be made here if messages are stuck etc. // TODO: Check if more logic should be made here if messages are stuck etc.
// Testing with a timeout here to figure out if messages are stuck // Testing with a timeout here to figure out if messages are stuck
// waiting for done signal. // waiting for done signal.
log.Printf("Error: *** message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID) log.Printf("Error: ringBuffer: message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID)
r.metrics.promRingbufferStalledMessagesTotal.Inc() r.metrics.promRingbufferStalledMessagesTotal.Inc()
} }
@ -312,7 +315,19 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
// message will be able to signal back here that the message have // message will be able to signal back here that the message have
// been processed, and that we then can delete it out of the K/V Store. // been processed, and that we then can delete it out of the K/V Store.
<-v.Data.done // The publisAMessage method should send a done back here, but in some situations
// it seems that that do not happen. Implementing a ticker that is twice the total
// amount of time a message should be allowed to be using for getting published so
// we don't get stuck go routines here.
//
// TODO: Figure out why what the reason for not receceving the done signals might be.
select {
case <-v.Data.done:
case <-ticker.C:
log.Printf("----------------------------------------------\n")
log.Printf("Error: ringBuffer message %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.Data.Subject)
log.Printf("----------------------------------------------\n")
}
// log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) // log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID)) r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID))