1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

renabled ringbuffer done channel triggered by persiststore flag

This commit is contained in:
postmannen 2023-05-30 05:46:02 +02:00
parent 602138ffa6
commit e50d906be3
3 changed files with 27 additions and 17 deletions

View file

@ -1005,13 +1005,15 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
// sending of the message.
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
// select {
// case m.done <- struct{}{}:
// // Signaling back to the ringbuffer that we are done with the
// // current message, and it can remove it from the ringbuffer.
// case <-p.ctx.Done():
// return
// }
if p.configuration.RingBufferPersistStore {
select {
case m.done <- struct{}{}:
// Signaling back to the ringbuffer that we are done with the
// current message, and it can remove it from the ringbuffer.
case <-p.ctx.Done():
return
}
}
// Increment the counter for the next message to be sent.
p.messageID++

View file

@ -281,6 +281,12 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, ringBufferOutCh
},
}
// Create a ticker that will kick in when a message have been in the
// system for it's maximum time. This will allow us to continue, and
// remove the message if it takes longer than it should to get delivered.
ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * time.Second)
defer ticker.Stop()
ringBufferOutCh <- sd
// Just to confirm here that the message was picked up, to know if the
// the read process have stalled or not.
@ -305,14 +311,16 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, ringBufferOutCh
// amount of time a message should be allowed to be using for getting published so
// we don't get stuck go routines here.
//
// TODO: Figure out why what the reason for not receceving the done signals might be.
// select {
// case <-v.SAM.done:
// case <-ticker.C:
// log.Printf("----------------------------------------------\n")
// log.Printf("Error: ringBuffer message id: %v, subject: %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.SAM.ID, v.SAM.Subject)
// log.Printf("----------------------------------------------\n")
// }
if r.configuration.RingBufferPersistStore {
select {
case <-v.SAM.done:
fmt.Printf("---\n DONE with\n---\n")
case <-ticker.C:
log.Printf("----------------------------------------------\n")
log.Printf("Error: ringBuffer message id: %v, subject: %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.SAM.ID, v.SAM.Subject)
log.Printf("----------------------------------------------\n")
}
}
// log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID))

View file

@ -33,9 +33,9 @@ function sendMessage() {
],
"replyMethod": "REQToFileAppend",
"retryWait": 5,
"ACKTimeout": 10,
"ACKTimeout": 30,
"retries": 1,
"replyACKTimeout": 10,
"replyACKTimeout": 30,
"replyRetries": 1,
"methodTimeout": 10,
"replyMethodTimeout": 10,