diff --git a/process.go b/process.go index 3474f6a..aa407f7 100644 --- a/process.go +++ b/process.go @@ -297,24 +297,35 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // Wait up until ACKTimeout specified for a reply, // continue and resend if no reply received, // or exit if max retries for the message reached. + // + // The nats.Msg returned is discarded with '_' since + // we don't use it. _, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) if err != nil { + if message.RetryWait < 0 { + message.RetryWait = 0 + } - if err == nats.ErrNoResponders { - // fmt.Printf(" * DEBUG: Waiting, ACKTimeout: %v\n", message.ACKTimeout) - // time.Sleep(time.Second * time.Duration(message.ACKTimeout)) - if message.RetryWait < 0 { - message.RetryWait = 0 - } - + switch { + case err == nats.ErrNoResponders || err == nats.ErrTimeout: er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err) - // sendErrorLogMessage(p.toRingbufferCh, p.node, er) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) time.Sleep(time.Second * time.Duration(message.RetryWait)) + case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed: + er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + + return + + default: + er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update steward to handle the new error type: subject=%v: %v", p.subject.name(), err) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + + return } - // did not receive a reply, decide what to do.. + // did not receive a reply, decide if we should try to retry sending. retryAttempts++ er := fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)