diff --git a/process.go b/process.go index 0209ee0..634bfb2 100644 --- a/process.go +++ b/process.go @@ -270,9 +270,6 @@ var ( func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) { retryAttempts := 0 - const publishTimer time.Duration = 5 - const subscribeSyncTimer time.Duration = 5 - // The for loop will run until the message is delivered successfully, // or that retries are reached. for { @@ -294,7 +291,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He if p.subject.Event == EventNACK { err := natsConn.PublishMsg(msg) if err != nil { - er := fmt.Errorf("error: nats publish of hello failed: %v", err) + er := fmt.Errorf("error: nats publish for message with subject failed: %v", err) log.Printf("%v\n", er) return } @@ -316,29 +313,52 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He return } + // Since we got here, and the code was not detected as NACK message earlier + // then the message is an ACK message, and we should handle that. + // + // The function below will return nil if the message should not be retried. + // + // All other errors happening will return ErrACKSubscribeRetry which will lead + // to a 'continue' for the for loop when checking the error directly after this + // function is called err := func() error { + defer func() { retryAttempts++ }() + + if retryAttempts > message.Retries { + // max retries reached + er := fmt.Errorf("info: toNode: %v, fromNode: %v, subject: %v, methodArgs: %v: max retries reached, check if node is up and running and if it got a subscriber started for the given REQ type", message.ToNode, message.FromNode, msg.Subject, message.MethodArgs) + + // We do not want to send errorLogs for REQErrorLog type since + // it will just cause an endless loop. + if message.Method != REQErrorLog { + p.errorKernel.infoSend(p, message, er) + } + + p.metrics.promNatsMessagesFailedACKsTotal.Inc() + return nil + } + + er := fmt.Errorf("send attempt:%v, max retries: %v, ack timeout: %v, message.ID: %v, method: %v, toNode: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID, message.Method, message.ToNode) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + // The SubscribeSync used in the subscriber, will get messages that // are sent after it started subscribing. // // Create a subscriber for the ACK reply message. subReply, err := natsConn.SubscribeSync(msg.Reply) - defer func() { err := subReply.Unsubscribe() if err != nil { log.Printf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v\n", err) } }() - if err != nil { er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) - log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer) + log.Printf("%v, waiting equal to RetryWait %ds before retrying\n", er, message.RetryWait) - //time.Sleep(time.Second * subscribeSyncTimer) - // subReply.Unsubscribe() + time.Sleep(time.Second * time.Duration(message.ACKTimeout)) - retryAttempts++ return ErrACKSubscribeRetry } @@ -347,8 +367,8 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He if err != nil { er := fmt.Errorf("error: nats publish failed: %v", err) // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) - log.Printf("%v, waiting %ds before retrying\n", er, publishTimer) - time.Sleep(time.Second * publishTimer) + log.Printf("%v, waiting equal to RetryWait of %ds before retrying\n", er, message.RetryWait) + time.Sleep(time.Second * time.Duration(message.RetryWait)) return ErrACKSubscribeRetry } @@ -361,8 +381,8 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // we don't use it. _, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout)) if err != nil { - if message.RetryWait < 0 { - message.RetryWait = 0 + if message.RetryWait <= 0 { + message.RetryWait = message.ACKTimeout } switch { @@ -387,37 +407,20 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He return er } - // did not receive a reply, decide if we should try to retry sending. - retryAttempts++ - er := fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + // did not receive a reply, retry sending. - switch { - //case message.Retries == 0: - // // 0 indicates unlimited retries - // continue - case retryAttempts >= message.Retries: - // max retries reached - er := fmt.Errorf("info: toNode: %v, fromNode: %v, subject: %v, methodArgs: %v: max retries reached, check if node is up and running and if it got a subscriber started for the given REQ type", message.ToNode, message.FromNode, msg.Subject, message.MethodArgs) + // Since the checking and cancelation if max retries are done at the beginning, + // we just check here if max retries are reached to decide if we should print + // information to the log that another retry will be tried. + // if retryAttempts <= message.Retries { + // er = fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID) + // p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + // } - // We do not want to send errorLogs for REQErrorLog type since - // it will just cause an endless loop. - if message.Method != REQErrorLog { - p.errorKernel.infoSend(p, message, er) - } + p.metrics.promNatsMessagesMissedACKsTotal.Inc() - p.metrics.promNatsMessagesFailedACKsTotal.Inc() - return er + return ErrACKSubscribeRetry - default: - // none of the above matched, so we've not reached max retries yet - er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - - p.metrics.promNatsMessagesMissedACKsTotal.Inc() - - return ErrACKSubscribeRetry - } } return nil @@ -886,7 +889,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, b, err := cbor.Marshal(m) if err != nil { er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err) - p.errorKernel.errSend(p, m, er) + log.Printf("%v\n", er) return } @@ -899,7 +902,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, err := gobEnc.Encode(m) if err != nil { er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) - p.errorKernel.errSend(p, m, er) + log.Printf("%v\n", er) return } @@ -958,7 +961,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, // We only wan't to send the error message to errorCentral once. once.Do(func() { - p.errorKernel.errSend(p, m, er) + log.Printf("%v\n", er) }) // No compression, so we just assign the value of the serialized diff --git a/server.go b/server.go index 3cc682d..39781be 100644 --- a/server.go +++ b/server.go @@ -443,70 +443,72 @@ func (s *server) routeMessagesToProcess(dbFileName string) { go func() { for samDBVal := range ringBufferOutCh { - // Signal back to the ringbuffer that message have been picked up. - samDBVal.delivered() + go func(samDBVal samDBValueAndDelivered) { + // Signal back to the ringbuffer that message have been picked up. + samDBVal.delivered() - sam := samDBVal.samDBValue.Data - // Check if the format of the message is correct. - if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { - er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) - s.errorKernel.errSend(s.processInitial, sam.Message, er) - continue - } - if !eventAvailable.CheckIfExists(sam.Subject.Event, sam.Subject) { - er := fmt.Errorf("error: routeMessagesToProcess: the event type do not exist, message dropped: %v", sam.Message.Method) - s.errorKernel.errSend(s.processInitial, sam.Message, er) - - continue - } - - for { - // Looping here so we are able to redo the sending - // of the last message if a process for the specified subject - // is not present. The process will then be created, and - // the code will loop back here. - - m := sam.Message - - subjName := sam.Subject.name() - pn := processNameGet(subjName, processKindPublisher) - - // Check if there is a map of type map[int]process registered - // for the processName, and if it exists then return it. - s.processes.active.mu.Lock() - proc, ok := s.processes.active.procNames[pn] - s.processes.active.mu.Unlock() - - // If found a map above, range it, and are there already a process - // for that subject, put the message on that processes incomming - // message channel. - if ok { - // We have found the process to route the message to, deliver it. - proc.subject.messageCh <- m - - break - } else { - // If a publisher process do not exist for the given subject, create it. - // log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) - - sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) - var proc process - switch { - case m.IsSubPublishedMsg: - proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil) - default: - proc = newProcess(s.ctx, s, sub, processKindPublisher, nil) - } - - proc.spawnWorker() - er := fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID) - s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration) - - // Now when the process is spawned we continue, - // and send the message to that new process. - continue + sam := samDBVal.samDBValue.Data + // Check if the format of the message is correct. + if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { + er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method) + s.errorKernel.errSend(s.processInitial, sam.Message, er) + return } - } + if !eventAvailable.CheckIfExists(sam.Subject.Event, sam.Subject) { + er := fmt.Errorf("error: routeMessagesToProcess: the event type do not exist, message dropped: %v", sam.Message.Method) + s.errorKernel.errSend(s.processInitial, sam.Message, er) + + return + } + + for { + // Looping here so we are able to redo the sending + // of the last message if a process for the specified subject + // is not present. The process will then be created, and + // the code will loop back here. + + m := sam.Message + + subjName := sam.Subject.name() + pn := processNameGet(subjName, processKindPublisher) + + // Check if there is a map of type map[int]process registered + // for the processName, and if it exists then return it. + s.processes.active.mu.Lock() + proc, ok := s.processes.active.procNames[pn] + s.processes.active.mu.Unlock() + + // If found a map above, range it, and are there already a process + // for that subject, put the message on that processes incomming + // message channel. + if ok { + // We have found the process to route the message to, deliver it. + proc.subject.messageCh <- m + + break + } else { + // If a publisher process do not exist for the given subject, create it. + // log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) + + sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) + var proc process + switch { + case m.IsSubPublishedMsg: + proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil) + default: + proc = newProcess(s.ctx, s, sub, processKindPublisher, nil) + } + + proc.spawnWorker() + er := fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID) + s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration) + + // Now when the process is spawned we continue, + // and send the message to that new process. + continue + } + } + }(samDBVal) } }() }