1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 12:59:15 +00:00

restructured errors and retryWait in messageDeliverNats

This commit is contained in:
postmannen 2022-12-30 11:54:10 +01:00
parent 5c5f8100d0
commit 9e378295df

View file

@ -270,6 +270,10 @@ var (
func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) { func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
retryAttempts := 0 retryAttempts := 0
if message.RetryWait <= 0 {
message.RetryWait = 0
}
// The for loop will run until the message is delivered successfully, // The for loop will run until the message is delivered successfully,
// or that retries are reached. // or that retries are reached.
for { for {
@ -359,7 +363,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
log.Printf("%v, waiting equal to RetryWait %ds before retrying\n", er, message.RetryWait) log.Printf("%v, waiting equal to RetryWait %ds before retrying\n", er, message.RetryWait)
time.Sleep(time.Second * time.Duration(message.ACKTimeout)) time.Sleep(time.Second * time.Duration(message.RetryWait))
return ErrACKSubscribeRetry return ErrACKSubscribeRetry
} }
@ -383,9 +387,6 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// we don't use it. // we don't use it.
_, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) _, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
if err != nil { if err != nil {
if message.RetryWait <= 0 {
message.RetryWait = message.ACKTimeout
}
switch { switch {
case err == nats.ErrNoResponders || err == nats.ErrTimeout: case err == nats.ErrNoResponders || err == nats.ErrTimeout:
@ -393,8 +394,9 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
time.Sleep(time.Second * time.Duration(message.RetryWait)) time.Sleep(time.Second * time.Duration(message.RetryWait))
p.metrics.promNatsMessagesMissedACKsTotal.Inc()
// Continue with the rest of the code to check number of retries etc.. return ErrACKSubscribeRetry
case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed: case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err) er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err)
@ -409,20 +411,6 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
return er return er
} }
// did not receive a reply, retry sending.
// Since the checking and cancelation if max retries are done at the beginning,
// we just check here if max retries are reached to decide if we should print
// information to the log that another retry will be tried.
// if retryAttempts <= message.Retries {
// er = fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID)
// p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
// }
p.metrics.promNatsMessagesMissedACKsTotal.Inc()
return ErrACKSubscribeRetry
} }
return nil return nil