Mirror of https://github.com/postmannen/ctrl.git, synced 2025-03-15 10:57:42 +00:00
made ACK/NACK selection more clear
commit: 5c5f8100d0
parent: c12cf70620
1 changed file with 128 additions and 123 deletions
process.go (251 changed lines)
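In process.go, messageDeliverNats previously special-cased NACK messages with an early return and let all remaining code handle the ACK case. The commit replaces that with a switch over p.subject.Event, giving EventNACK and EventACK each an explicit case whose body runs as a closure assigning to a shared err, which is checked once after the switch. A second, smaller change in executeHandler gates an ACL debug log line behind the EnableAclCheck configuration setting.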
@@ -286,145 +286,148 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
         er := fmt.Errorf("info: preparing to send nats message with subject %v ", msg.Subject)
         p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
 
-        // If it is a NACK message we just deliver the message and return
-        // here so we don't create a ACK message and then stop waiting for it.
-        if p.subject.Event == EventNACK {
-            err := natsConn.PublishMsg(msg)
-            if err != nil {
-                er := fmt.Errorf("error: nats publish for message with subject failed: %v", err)
-                log.Printf("%v\n", er)
-                return
-            }
-            p.metrics.promNatsDeliveredTotal.Inc()
-
-            er := fmt.Errorf("info: sent nats message with subject %v ", msg.Subject)
-            p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
-
-            //err = natsConn.Flush()
-            //if err != nil {
-            //	er := fmt.Errorf("error: nats publish flush failed: %v", err)
-            //	log.Printf("%v\n", er)
-            //	return
-            //}
-
-            // The remaining logic is for handling ACK messages, so we return here
-            // since it was a NACK message, and all or now done.
-            return
-        }
-
-        // Since we got here, and the code was not detected as NACK message earlier
-        // then the message is an ACK message, and we should handle that.
-        //
-        // The function below will return nil if the message should not be retried.
-        //
-        // All other errors happening will return ErrACKSubscribeRetry which will lead
-        // to a 'continue' for the for loop when checking the error directly after this
-        // function is called
-        err := func() error {
-            defer func() { retryAttempts++ }()
-
-            if retryAttempts > message.Retries {
-                // max retries reached
-                er := fmt.Errorf("info: toNode: %v, fromNode: %v, subject: %v, methodArgs: %v: max retries reached, check if node is up and running and if it got a subscriber started for the given REQ type", message.ToNode, message.FromNode, msg.Subject, message.MethodArgs)
-
-                // We do not want to send errorLogs for REQErrorLog type since
-                // it will just cause an endless loop.
-                if message.Method != REQErrorLog {
-                    p.errorKernel.infoSend(p, message, er)
-                }
-
-                p.metrics.promNatsMessagesFailedACKsTotal.Inc()
-                return nil
-            }
-
-            er := fmt.Errorf("send attempt:%v, max retries: %v, ack timeout: %v, message.ID: %v, method: %v, toNode: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID, message.Method, message.ToNode)
-            p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
-
-            // The SubscribeSync used in the subscriber, will get messages that
-            // are sent after it started subscribing.
-            //
-            // Create a subscriber for the ACK reply message.
-            subReply, err := natsConn.SubscribeSync(msg.Reply)
-            defer func() {
-                err := subReply.Unsubscribe()
-                if err != nil {
-                    log.Printf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v\n", err)
-                }
-            }()
-            if err != nil {
-                er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
-                // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
-                log.Printf("%v, waiting equal to RetryWait %ds before retrying\n", er, message.RetryWait)
-                time.Sleep(time.Second * time.Duration(message.ACKTimeout))
-                return ErrACKSubscribeRetry
-            }
-
-            // Publish message
-            err = natsConn.PublishMsg(msg)
-            if err != nil {
-                er := fmt.Errorf("error: nats publish failed: %v", err)
-                // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
-                log.Printf("%v, waiting equal to RetryWait of %ds before retrying\n", er, message.RetryWait)
-                time.Sleep(time.Second * time.Duration(message.RetryWait))
-                return ErrACKSubscribeRetry
-            }
-
-            // Wait up until ACKTimeout specified for a reply,
-            // continue and resend if no reply received,
-            // or exit if max retries for the message reached.
-            //
-            // The nats.Msg returned is discarded with '_' since
-            // we don't use it.
-            _, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
-            if err != nil {
-                if message.RetryWait <= 0 {
-                    message.RetryWait = message.ACKTimeout
-                }
-
-                switch {
-                case err == nats.ErrNoResponders || err == nats.ErrTimeout:
-                    er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err)
-                    p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
-
-                    time.Sleep(time.Second * time.Duration(message.RetryWait))
-                    // Continue with the rest of the code to check number of retries etc..
-
-                case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
-                    er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err)
-                    p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
-
-                    return er
-
-                default:
-                    er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update steward to handle the new error type: subject=%v: %v", p.subject.name(), err)
-                    p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
-
-                    return er
-                }
-
-                // did not receive a reply, retry sending.
-
-                // Since the checking and cancelation if max retries are done at the beginning,
-                // we just check here if max retries are reached to decide if we should print
-                // information to the log that another retry will be tried.
-                // if retryAttempts <= message.Retries {
-                // 	er = fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID)
-                // 	p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
-                // }
-
-                p.metrics.promNatsMessagesMissedACKsTotal.Inc()
-                return ErrACKSubscribeRetry
-            }
-
-            return nil
-        }()
+        var err error
+
+        switch {
+        // If it is a NACK message we just deliver the message and return
+        // here so we don't create a ACK message and then stop waiting for it.
+        case p.subject.Event == EventNACK:
+            err = func() error {
+                err := natsConn.PublishMsg(msg)
+                if err != nil {
+                    er := fmt.Errorf("error: nats publish for message with subject failed: %v", err)
+                    log.Printf("%v\n", er)
+                    return ErrACKSubscribeRetry
+                }
+                p.metrics.promNatsDeliveredTotal.Inc()
+
+                er := fmt.Errorf("info: sent nats message with subject %v ", msg.Subject)
+                p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+
+                //err = natsConn.Flush()
+                //if err != nil {
+                //	er := fmt.Errorf("error: nats publish flush failed: %v", err)
+                //	log.Printf("%v\n", er)
+                //	return
+                //}
+
+                // The remaining logic is for handling ACK messages, so we return here
+                // since it was a NACK message, and all or now done.
+                return nil
+            }()
+
+        case p.subject.Event == EventACK:
+            // The function below will return nil if the message should not be retried.
+            //
+            // All other errors happening will return ErrACKSubscribeRetry which will lead
+            // to a 'continue' for the for loop when checking the error directly after this
+            // function is called
+            err = func() error {
+                defer func() { retryAttempts++ }()
+
+                if retryAttempts > message.Retries {
+                    // max retries reached
+                    er := fmt.Errorf("info: toNode: %v, fromNode: %v, subject: %v, methodArgs: %v: max retries reached, check if node is up and running and if it got a subscriber started for the given REQ type", message.ToNode, message.FromNode, msg.Subject, message.MethodArgs)
+
+                    // We do not want to send errorLogs for REQErrorLog type since
+                    // it will just cause an endless loop.
+                    if message.Method != REQErrorLog {
+                        p.errorKernel.infoSend(p, message, er)
+                    }
+
+                    p.metrics.promNatsMessagesFailedACKsTotal.Inc()
+                    return nil
+                }
+
+                er := fmt.Errorf("send attempt:%v, max retries: %v, ack timeout: %v, message.ID: %v, method: %v, toNode: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID, message.Method, message.ToNode)
+                p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+
+                // The SubscribeSync used in the subscriber, will get messages that
+                // are sent after it started subscribing.
+                //
+                // Create a subscriber for the ACK reply message.
+                subReply, err := natsConn.SubscribeSync(msg.Reply)
+                defer func() {
+                    err := subReply.Unsubscribe()
+                    if err != nil {
+                        log.Printf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v\n", err)
+                    }
+                }()
+                if err != nil {
+                    er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
+                    // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
+                    log.Printf("%v, waiting equal to RetryWait %ds before retrying\n", er, message.RetryWait)
+                    time.Sleep(time.Second * time.Duration(message.ACKTimeout))
+                    return ErrACKSubscribeRetry
+                }
+
+                // Publish message
+                err = natsConn.PublishMsg(msg)
+                if err != nil {
+                    er := fmt.Errorf("error: nats publish failed: %v", err)
+                    // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
+                    log.Printf("%v, waiting equal to RetryWait of %ds before retrying\n", er, message.RetryWait)
+                    time.Sleep(time.Second * time.Duration(message.RetryWait))
+                    return ErrACKSubscribeRetry
+                }
+
+                // Wait up until ACKTimeout specified for a reply,
+                // continue and resend if no reply received,
+                // or exit if max retries for the message reached.
+                //
+                // The nats.Msg returned is discarded with '_' since
+                // we don't use it.
+                _, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
+                if err != nil {
+                    if message.RetryWait <= 0 {
+                        message.RetryWait = message.ACKTimeout
+                    }
+
+                    switch {
+                    case err == nats.ErrNoResponders || err == nats.ErrTimeout:
+                        er := fmt.Errorf("error: ack receive failed: waiting for %v seconds before retrying: subject=%v: %v", message.RetryWait, p.subject.name(), err)
+                        p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+
+                        time.Sleep(time.Second * time.Duration(message.RetryWait))
+                        // Continue with the rest of the code to check number of retries etc..
+
+                    case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
+                        er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err)
+                        p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+
+                        return er
+
+                    default:
+                        er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update steward to handle the new error type: subject=%v: %v", p.subject.name(), err)
+                        p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+
+                        return er
+                    }
+
+                    // did not receive a reply, retry sending.
+
+                    // Since the checking and cancelation if max retries are done at the beginning,
+                    // we just check here if max retries are reached to decide if we should print
+                    // information to the log that another retry will be tried.
+                    // if retryAttempts <= message.Retries {
+                    // 	er = fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID)
+                    // 	p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+                    // }
+
+                    p.metrics.promNatsMessagesMissedACKsTotal.Inc()
+                    return ErrACKSubscribeRetry
+                }
+
+                return nil
+            }()
+        }
 
         if err == ErrACKSubscribeRetry {
             continue
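The shape the first hunk lands on is easier to see outside the diff. Below is a minimal, self-contained sketch of the pattern using the nats.go client: a switch over the event type selects either fire-and-forget (NACK) or subscribe-publish-wait (ACK) delivery, each branch runs as a closure assigning to a shared err, and the enclosing loop retries on the sentinel error. The names Event, EventACK, EventNACK, and ErrACKSubscribeRetry follow the diff; the subject demo.subject, the two-second timeout, the retry count, and the in-process responder are made up for illustration.

package main

import (
    "errors"
    "fmt"
    "log"
    "time"

    "github.com/nats-io/nats.go"
)

type Event string

const (
    EventACK  Event = "EventACK"
    EventNACK Event = "EventNACK"
)

// Sentinel error telling the enclosing send loop to retry,
// mirroring ErrACKSubscribeRetry in process.go.
var ErrACKSubscribeRetry = errors.New("ackSubscribeRetry")

// deliver sketches the post-commit shape of messageDeliverNats: one
// switch selects the NACK or ACK path, each path is a closure that
// assigns to a shared err, and the caller checks that err once.
func deliver(nc *nats.Conn, event Event, msg *nats.Msg, ackTimeout time.Duration) error {
    var err error

    switch {
    case event == EventNACK:
        err = func() error {
            // Fire and forget: publish, never subscribe for a reply.
            return nc.PublishMsg(msg)
        }()

    case event == EventACK:
        err = func() error {
            // Subscribe to the reply subject before publishing so the
            // ACK cannot be missed, then wait up to ackTimeout for it.
            subReply, err := nc.SubscribeSync(msg.Reply)
            if err != nil {
                return ErrACKSubscribeRetry
            }
            defer subReply.Unsubscribe()

            if err := nc.PublishMsg(msg); err != nil {
                return ErrACKSubscribeRetry
            }

            // nats.ErrTimeout and nats.ErrNoResponders mean "no ACK,
            // retry"; the source treats other errors as fatal.
            if _, err := subReply.NextMsg(ackTimeout); err != nil {
                return ErrACKSubscribeRetry
            }
            return nil
        }()
    }

    return err
}

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Close()

    // An in-process responder standing in for the subscriber node:
    // it ACKs every message by replying to it.
    nc.Subscribe("demo.subject", func(m *nats.Msg) { m.Respond([]byte("ack")) })

    msg := &nats.Msg{Subject: "demo.subject", Reply: nats.NewInbox(), Data: []byte("payload")}

    // The caller retries while deliver signals ErrACKSubscribeRetry,
    // like the send loop around messageDeliverNats.
    for try := 0; try < 3; try++ {
        err := deliver(nc, EventACK, msg, 2*time.Second)
        if err == ErrACKSubscribeRetry {
            continue
        }
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println("delivered and ACKed")
        break
    }
}

This is the clarity the commit title points at: with the switch, ACK and NACK are handled symmetrically, and the single err check after the switch covers both paths instead of relying on an early return.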
@@ -664,9 +667,11 @@ func executeHandler(p process, message Message, thisNode string) {
         runAsScheduled = true
     }
 
-    // Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler.
-    er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true)
-    p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+    if p.configuration.EnableAclCheck {
+        // Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler.
+        er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true)
+        p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+    }
 
     switch {
     case !runAsScheduled:
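The second hunk only gates the informational log line behind the EnableAclCheck configuration flag. A minimal sketch of that gating, with Configuration and the logger reduced to hypothetical stand-ins:

package main

import "fmt"

// Hypothetical stand-ins for the configuration and error-kernel logger.
type Configuration struct {
    EnableAclCheck bool
}

func logConsoleOnlyIfDebug(er error, c *Configuration) {
    fmt.Println(er)
}

func executeHandler(c *Configuration) {
    // Post-commit behaviour: the log line is only produced when
    // ACL checking is enabled at all.
    if c.EnableAclCheck {
        er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true)
        logConsoleOnlyIfDebug(er, c)
    }
    // ... the handler itself is called here in the real code ...
}

func main() {
    executeHandler(&Configuration{EnableAclCheck: true})  // logs
    executeHandler(&Configuration{EnableAclCheck: false}) // silent
}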