From 5c5f8100d0c8b194590e8b6682b3e0d09abd29b7 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 29 Dec 2022 22:49:47 +0100 Subject: [PATCH] made ACK/NACK selection more clear --- process.go | 251 +++++++++++++++++++++++++++-------------------------- 1 file changed, 128 insertions(+), 123 deletions(-) diff --git a/process.go b/process.go index 634bfb2..f7f5751 100644 --- a/process.go +++ b/process.go @@ -286,145 +286,148 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He er := fmt.Errorf("info: preparing to send nats message with subject %v ", msg.Subject) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + var err error + + switch { // If it is a NACK message we just deliver the message and return // here so we don't create a ACK message and then stop waiting for it. - if p.subject.Event == EventNACK { - err := natsConn.PublishMsg(msg) - if err != nil { - er := fmt.Errorf("error: nats publish for message with subject failed: %v", err) - log.Printf("%v\n", er) - return - } - p.metrics.promNatsDeliveredTotal.Inc() - - er := fmt.Errorf("info: sent nats message with subject %v ", msg.Subject) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - - //err = natsConn.Flush() - //if err != nil { - // er := fmt.Errorf("error: nats publish flush failed: %v", err) - // log.Printf("%v\n", er) - // return - //} - - // The remaining logic is for handling ACK messages, so we return here - // since it was a NACK message, and all or now done. - - return - } - - // Since we got here, and the code was not detected as NACK message earlier - // then the message is an ACK message, and we should handle that. - // - // The function below will return nil if the message should not be retried. 
- // - // All other errors happening will return ErrACKSubscribeRetry which will lead - // to a 'continue' for the for loop when checking the error directly after this - // function is called - err := func() error { - defer func() { retryAttempts++ }() - - if retryAttempts > message.Retries { - // max retries reached - er := fmt.Errorf("info: toNode: %v, fromNode: %v, subject: %v, methodArgs: %v: max retries reached, check if node is up and running and if it got a subscriber started for the given REQ type", message.ToNode, message.FromNode, msg.Subject, message.MethodArgs) - - // We do not want to send errorLogs for REQErrorLog type since - // it will just cause an endless loop. - if message.Method != REQErrorLog { - p.errorKernel.infoSend(p, message, er) - } - - p.metrics.promNatsMessagesFailedACKsTotal.Inc() - return nil - } - - er := fmt.Errorf("send attempt:%v, max retries: %v, ack timeout: %v, message.ID: %v, method: %v, toNode: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID, message.Method, message.ToNode) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - - // The SubscribeSync used in the subscriber, will get messages that - // are sent after it started subscribing. - // - // Create a subscriber for the ACK reply message. 
- subReply, err := natsConn.SubscribeSync(msg.Reply) - defer func() { - err := subReply.Unsubscribe() + case p.subject.Event == EventNACK: + err = func() error { + err := natsConn.PublishMsg(msg) if err != nil { - log.Printf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v\n", err) + er := fmt.Errorf("error: nats publish for message with subject failed: %v", err) + log.Printf("%v\n", er) + return ErrACKSubscribeRetry } + p.metrics.promNatsDeliveredTotal.Inc() + + er := fmt.Errorf("info: sent nats message with subject %v ", msg.Subject) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + + //err = natsConn.Flush() + //if err != nil { + // er := fmt.Errorf("error: nats publish flush failed: %v", err) + // log.Printf("%v\n", er) + // return + //} + + // The remaining logic is for handling ACK messages, so we return here + // since it was a NACK message, and all is now done. + + return nil }() - if err != nil { - er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) - // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) - log.Printf("%v, waiting equal to RetryWait %ds before retrying\n", er, message.RetryWait) - time.Sleep(time.Second * time.Duration(message.ACKTimeout)) - - return ErrACKSubscribeRetry - } - - // Publish message - err = natsConn.PublishMsg(msg) - if err != nil { - er := fmt.Errorf("error: nats publish failed: %v", err) - // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) - log.Printf("%v, waiting equal to RetryWait of %ds before retrying\n", er, message.RetryWait) - time.Sleep(time.Second * time.Duration(message.RetryWait)) - - return ErrACKSubscribeRetry - } - - // Wait up until ACKTimeout specified for a reply, - // continue and resend if no reply received, - // or exit if max retries for the message reached. + case p.subject.Event == EventACK: + // The function below will return nil if the message should not be retried.
// - // The nats.Msg returned is discarded with '_' since - // we don't use it. - _, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) - if err != nil { - if message.RetryWait <= 0 { - message.RetryWait = message.ACKTimeout + // All other errors happening will return ErrACKSubscribeRetry which will lead + // to a 'continue' for the for loop when checking the error directly after this + // function is called + err = func() error { + defer func() { retryAttempts++ }() + + if retryAttempts > message.Retries { + // max retries reached + er := fmt.Errorf("info: toNode: %v, fromNode: %v, subject: %v, methodArgs: %v: max retries reached, check if node is up and running and if it got a subscriber started for the given REQ type", message.ToNode, message.FromNode, msg.Subject, message.MethodArgs) + + // We do not want to send errorLogs for REQErrorLog type since + // it will just cause an endless loop. + if message.Method != REQErrorLog { + p.errorKernel.infoSend(p, message, er) + } + + p.metrics.promNatsMessagesFailedACKsTotal.Inc() + return nil } - switch { - case err == nats.ErrNoResponders || err == nats.ErrTimeout: - er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + er := fmt.Errorf("send attempt:%v, max retries: %v, ack timeout: %v, message.ID: %v, method: %v, toNode: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID, message.Method, message.ToNode) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + // The SubscribeSync used in the subscriber, will get messages that + // are sent after it started subscribing. + // + // Create a subscriber for the ACK reply message. 
+ subReply, err := natsConn.SubscribeSync(msg.Reply) + defer func() { + err := subReply.Unsubscribe() + if err != nil { + log.Printf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v\n", err) + } + }() + if err != nil { + er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) + // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + log.Printf("%v, waiting equal to RetryWait %ds before retrying\n", er, message.RetryWait) + + time.Sleep(time.Second * time.Duration(message.ACKTimeout)) + + return ErrACKSubscribeRetry + } + + // Publish message + err = natsConn.PublishMsg(msg) + if err != nil { + er := fmt.Errorf("error: nats publish failed: %v", err) + // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + log.Printf("%v, waiting equal to RetryWait of %ds before retrying\n", er, message.RetryWait) time.Sleep(time.Second * time.Duration(message.RetryWait)) - // Continue with the rest of the code to check number of retries etc.. - - case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed: - er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - - return er - - default: - er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update steward to handle the new error type: subject=%v: %v", p.subject.name(), err) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - - return er + return ErrACKSubscribeRetry } - // did not receive a reply, retry sending. + // Wait up until ACKTimeout specified for a reply, + // continue and resend if no reply received, + // or exit if max retries for the message reached. + // + // The nats.Msg returned is discarded with '_' since + // we don't use it. 
+ _, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) + if err != nil { + if message.RetryWait <= 0 { + message.RetryWait = message.ACKTimeout + } - // Since the checking and cancelation if max retries are done at the beginning, - // we just check here if max retries are reached to decide if we should print - // information to the log that another retry will be tried. - // if retryAttempts <= message.Retries { - // er = fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID) - // p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - // } + switch { + case err == nats.ErrNoResponders || err == nats.ErrTimeout: + er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - p.metrics.promNatsMessagesMissedACKsTotal.Inc() + time.Sleep(time.Second * time.Duration(message.RetryWait)) - return ErrACKSubscribeRetry + // Continue with the rest of the code to check number of retries etc.. - } + case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed: + er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - return nil - }() + return er + + default: + er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update steward to handle the new error type: subject=%v: %v", p.subject.name(), err) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + + return er + } + + // did not receive a reply, retry sending. 
+ + // Since the max retries check and cancellation are done at the beginning, + // we just check here if max retries are reached to decide if we should print + // information to the log that another retry will be tried. + // if retryAttempts <= message.Retries { + // er = fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID) + // p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + // } + + p.metrics.promNatsMessagesMissedACKsTotal.Inc() + + return ErrACKSubscribeRetry + + } + + return nil + }() + } if err == ErrACKSubscribeRetry { continue @@ -664,9 +667,11 @@ func executeHandler(p process, message Message, thisNode string) { runAsScheduled = true } - // Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler. - er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + if p.configuration.EnableAclCheck { + // Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler. + er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + } switch { case !runAsScheduled: