diff --git a/process.go b/process.go index aa407f7..4862ac6 100644 --- a/process.go +++ b/process.go @@ -6,6 +6,7 @@ import ( "context" "crypto/ed25519" "encoding/gob" + "errors" "fmt" "io" "log" @@ -17,6 +18,7 @@ import ( "github.com/klauspost/compress/zstd" "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" + // "google.golang.org/protobuf/internal/errors" ) // processKind are either kindSubscriber or kindPublisher, and are @@ -230,6 +232,10 @@ func (p process) spawnWorker() { log.Printf("Successfully started process: %v\n", p.processName) } +var ( + ErrACKSubscribeRetry = errors.New("steward: retrying to subscribe for ack message") +) + // messageDeliverNats will create the Nats message with headers and payload. // It will also take care of the delivering the message that is converted to // gob or cbor format as a nats.Message. It will also take care of checking @@ -263,44 +269,55 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He return } p.metrics.promNatsDeliveredTotal.Inc() + + // The reaming logic is for handling ACK messages, so we return here + // since it was a NACK message, and all or now done. return } - // The SubscribeSync used in the subscriber, will get messages that - // are sent after it started subscribing. - // - // Create a subscriber for the ACK reply message. - subReply, err := natsConn.SubscribeSync(msg.Reply) - if err != nil { - er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) - // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) - log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer) - time.Sleep(time.Second * subscribeSyncTimer) - subReply.Unsubscribe() - continue - } + err := func() error { + // The SubscribeSync used in the subscriber, will get messages that + // are sent after it started subscribing. + // + // Create a subscriber for the ACK reply message. + subReply, err := natsConn.SubscribeSync(msg.Reply) + if err != nil { + er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) + // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer) - // Publish message - err = natsConn.PublishMsg(msg) - if err != nil { - er := fmt.Errorf("error: nats publish failed: %v", err) - // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) - log.Printf("%v, waiting %ds before retrying\n", er, publishTimer) - time.Sleep(time.Second * publishTimer) - subReply.Unsubscribe() - continue - } + time.Sleep(time.Second * subscribeSyncTimer) + subReply.Unsubscribe() + + retryAttempts++ + return ErrACKSubscribeRetry + } + + defer func() { + err := subReply.Unsubscribe() + if err != nil { + log.Printf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v\n", err) + } + }() + + // Publish message + err = natsConn.PublishMsg(msg) + if err != nil { + er := fmt.Errorf("error: nats publish failed: %v", err) + // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + log.Printf("%v, waiting %ds before retrying\n", er, publishTimer) + time.Sleep(time.Second * publishTimer) + + return ErrACKSubscribeRetry + } - // If the message is an ACK type of message we must check that a - // reply, and if it is not we don't wait here at all. - if p.subject.Event == EventACK { // Wait up until ACKTimeout specified for a reply, // continue and resend if no reply received, // or exit if max retries for the message reached. // // The nats.Msg returned is discarded with '_' since // we don't use it. - _, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) + _, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) if err != nil { if message.RetryWait < 0 { message.RetryWait = 0 @@ -312,17 +329,20 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) time.Sleep(time.Second * time.Duration(message.RetryWait)) + + // Continue with the rest of the code to check number of retries etc.. + case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed: er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - return + return er default: er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update steward to handle the new error type: subject=%v: %v", p.subject.name(), err) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - return + return er } // did not receive a reply, decide if we should try to retry sending. @@ -344,10 +364,8 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He p.errorKernel.infoSend(p, message, er) } - subReply.Unsubscribe() - p.metrics.promNatsMessagesFailedACKsTotal.Inc() - return + return er default: // none of the above matched, so we've not reached max retries yet @@ -356,15 +374,24 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He p.metrics.promNatsMessagesMissedACKsTotal.Inc() - subReply.Unsubscribe() - continue + return ErrACKSubscribeRetry } } - // REMOVED: log.Printf("<--- publisher: received ACK from:%v, for: %v, data: %s\n", message.ToNode, message.Method, msgReply.Data) + + return nil + }() + + if err == ErrACKSubscribeRetry { + continue + } + if err != nil { + // All error printing are handled within the function that returns + // the error, so we do nothing and return. + // No more trying to deliver the message + return } - subReply.Unsubscribe() - + // Message were delivered successfully. p.metrics.promNatsDeliveredTotal.Inc() return