From d6f7bd1048d0a96fdc1bca9c34c4865fd78b9feb Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 9 Mar 2022 07:11:48 +0100 Subject: [PATCH] added timeout when no responders available --- process.go | 9 +++++++++ ringbuffer.go | 2 +- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/process.go b/process.go index 8db3fe1..66fd477 100644 --- a/process.go +++ b/process.go @@ -274,6 +274,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) log.Printf("%v, waiting %ds before retrying\n", er, publishTimer) time.Sleep(time.Second * publishTimer) + subReply.Unsubscribe() continue } @@ -289,6 +290,11 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // sendErrorLogMessage(p.toRingbufferCh, p.node, er) p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + if err == nats.ErrNoResponders { + fmt.Printf(" * DEBUG: Waiting, ACKTimeout: %v\n", message.ACKTimeout) + time.Sleep(time.Second * time.Duration(message.ACKTimeout)) + } + // did not receive a reply, decide what to do.. retryAttempts++ er = fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID) @@ -319,6 +325,9 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.processes.metrics.promNatsMessagesMissedACKsTotal.Inc() + + subReply.Unsubscribe() + continue } } diff --git a/ringbuffer.go b/ringbuffer.go index 9085c34..3bd2f8b 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -328,7 +328,7 @@ func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) { }) for _, v := range samDBValues { - log.Printf("info: k/v store: %#v\n", v) + log.Printf("info: k/v store, kvID: %v, message.ID: %v, subject: %v, len(data): %v\n", v.ID, v.Data.ID, v.Data.Subject, len(v.Data.Data)) } return nil