diff --git a/process.go b/process.go index 6e6eef0..eff5317 100644 --- a/process.go +++ b/process.go @@ -1005,13 +1005,15 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, // sending of the message. p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m) - // select { - // case m.done <- struct{}{}: - // // Signaling back to the ringbuffer that we are done with the - // // current message, and it can remove it from the ringbuffer. - // case <-p.ctx.Done(): - // return - // } + if p.configuration.RingBufferPersistStore { + select { + case m.done <- struct{}{}: + // Signaling back to the ringbuffer that we are done with the + // current message, and it can remove it from the ringbuffer. + case <-p.ctx.Done(): + return + } + } // Increment the counter for the next message to be sent. p.messageID++ diff --git a/ringbuffer.go b/ringbuffer.go index 6452517..f051a10 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -281,6 +281,12 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, ringBufferOutCh }, } + // Create a ticker that will kick in when a message have been in the + // system for it's maximum time. This will allow us to continue, and + // remove the message if it takes longer than it should to get delivered. + ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * time.Second) + defer ticker.Stop() + ringBufferOutCh <- sd // Just to confirm here that the message was picked up, to know if the // the read process have stalled or not. @@ -305,14 +311,16 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, ringBufferOutCh // amount of time a message should be allowed to be using for getting published so // we don't get stuck go routines here. // - // TODO: Figure out why what the reason for not receceving the done signals might be. - // select { - // case <-v.SAM.done: - // case <-ticker.C: - // log.Printf("----------------------------------------------\n") - // log.Printf("Error: ringBuffer message id: %v, subject: %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.SAM.ID, v.SAM.Subject) - // log.Printf("----------------------------------------------\n") - // } + if r.configuration.RingBufferPersistStore { + select { + case <-v.SAM.done: + fmt.Printf("---\n DONE with\n---\n") + case <-ticker.C: + log.Printf("----------------------------------------------\n") + log.Printf("Error: ringBuffer message id: %v, subject: %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.SAM.ID, v.SAM.Subject) + log.Printf("----------------------------------------------\n") + } + } // log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID)) diff --git a/scripts/clifolder.sh b/scripts/clifolder.sh index 13abcf2..2a4f417 100755 --- a/scripts/clifolder.sh +++ b/scripts/clifolder.sh @@ -33,9 +33,9 @@ function sendMessage() { ], "replyMethod": "REQToFileAppend", "retryWait": 5, - "ACKTimeout": 10, + "ACKTimeout": 30, "retries": 1, - "replyACKTimeout": 10, + "replyACKTimeout": 30, "replyRetries": 1, "methodTimeout": 10, "replyMethodTimeout": 10,