diff --git a/process.go b/process.go index f7f5751..ed45321 100644 --- a/process.go +++ b/process.go @@ -270,6 +270,10 @@ var ( func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) { retryAttempts := 0 + if message.RetryWait <= 0 { + message.RetryWait = 0 + } + // The for loop will run until the message is delivered successfully, // or that retries are reached. for { @@ -359,7 +363,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) log.Printf("%v, waiting equal to RetryWait %ds before retrying\n", er, message.RetryWait) - time.Sleep(time.Second * time.Duration(message.ACKTimeout)) + time.Sleep(time.Second * time.Duration(message.RetryWait)) return ErrACKSubscribeRetry } @@ -383,9 +387,6 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // we don't use it. _, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) if err != nil { - if message.RetryWait <= 0 { - message.RetryWait = message.ACKTimeout - } switch { case err == nats.ErrNoResponders || err == nats.ErrTimeout: @@ -393,8 +394,9 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) time.Sleep(time.Second * time.Duration(message.RetryWait)) + p.metrics.promNatsMessagesMissedACKsTotal.Inc() - // Continue with the rest of the code to check number of retries etc.. + return ErrACKSubscribeRetry case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed: er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err) @@ -409,20 +411,6 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He return er } - // did not receive a reply, retry sending. - - // Since the checking and cancelation if max retries are done at the beginning, - // we just check here if max retries are reached to decide if we should print - // information to the log that another retry will be tried. - // if retryAttempts <= message.Retries { - // er = fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID) - // p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - // } - - p.metrics.promNatsMessagesMissedACKsTotal.Inc() - - return ErrACKSubscribeRetry - } return nil