1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-15 17:51:15 +00:00

added timeout when no responders available

This commit is contained in:
postmannen 2022-03-09 07:11:48 +01:00
parent 8585086cda
commit d6f7bd1048
2 changed files with 10 additions and 1 deletions

View file

@ -274,6 +274,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
log.Printf("%v, waiting %ds before retrying\n", er, publishTimer) log.Printf("%v, waiting %ds before retrying\n", er, publishTimer)
time.Sleep(time.Second * publishTimer) time.Sleep(time.Second * publishTimer)
subReply.Unsubscribe()
continue continue
} }
@ -289,6 +290,11 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// sendErrorLogMessage(p.toRingbufferCh, p.node, er) // sendErrorLogMessage(p.toRingbufferCh, p.node, er)
p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
if err == nats.ErrNoResponders {
fmt.Printf(" * DEBUG: Waiting, ACKTimeout: %v\n", message.ACKTimeout)
time.Sleep(time.Second * time.Duration(message.ACKTimeout))
}
// did not receive a reply, decide what to do.. // did not receive a reply, decide what to do..
retryAttempts++ retryAttempts++
er = fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID) er = fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID)
@ -319,6 +325,9 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
p.processes.metrics.promNatsMessagesMissedACKsTotal.Inc() p.processes.metrics.promNatsMessagesMissedACKsTotal.Inc()
subReply.Unsubscribe()
continue continue
} }
} }

View file

@ -328,7 +328,7 @@ func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) {
}) })
for _, v := range samDBValues { for _, v := range samDBValues {
log.Printf("info: k/v store: %#v\n", v) log.Printf("info: k/v store, kvID: %v, message.ID: %v, subject: %v, len(data): %v\n", v.ID, v.Data.ID, v.Data.Subject, len(v.Data.Data))
} }
return nil return nil