Mirror of https://github.com/postmannen/ctrl.git (synced 2025-01-20 22:52:13 +00:00)
handling of individual messages is now done in its own goroutine, changed logic for ACK msg retries
This commit is contained in:
parent 709bd219f7
commit c12cf70620

2 changed files with 111 additions and 106 deletions
process.go (91 changed lines)
@@ -270,9 +270,6 @@ var (
 func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
     retryAttempts := 0
 
-    const publishTimer time.Duration = 5
-    const subscribeSyncTimer time.Duration = 5
-
     // The for loop will run until the message is delivered successfully,
     // or that retries are reached.
     for {
@@ -294,7 +291,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
         if p.subject.Event == EventNACK {
             err := natsConn.PublishMsg(msg)
             if err != nil {
-                er := fmt.Errorf("error: nats publish of hello failed: %v", err)
+                er := fmt.Errorf("error: nats publish for message with subject failed: %v", err)
                 log.Printf("%v\n", er)
                 return
             }
@@ -316,29 +313,52 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
             return
         }
 
+        // Since we got here, and the code was not detected as NACK message earlier
+        // then the message is an ACK message, and we should handle that.
+        //
+        // The function below will return nil if the message should not be retried.
+        //
+        // All other errors happening will return ErrACKSubscribeRetry which will lead
+        // to a 'continue' for the for loop when checking the error directly after this
+        // function is called
         err := func() error {
+            defer func() { retryAttempts++ }()
+
+            if retryAttempts > message.Retries {
+                // max retries reached
+                er := fmt.Errorf("info: toNode: %v, fromNode: %v, subject: %v, methodArgs: %v: max retries reached, check if node is up and running and if it got a subscriber started for the given REQ type", message.ToNode, message.FromNode, msg.Subject, message.MethodArgs)
+
+                // We do not want to send errorLogs for REQErrorLog type since
+                // it will just cause an endless loop.
+                if message.Method != REQErrorLog {
+                    p.errorKernel.infoSend(p, message, er)
+                }
+
+                p.metrics.promNatsMessagesFailedACKsTotal.Inc()
+                return nil
+            }
+
+            er := fmt.Errorf("send attempt:%v, max retries: %v, ack timeout: %v, message.ID: %v, method: %v, toNode: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID, message.Method, message.ToNode)
+            p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+
             // The SubscribeSync used in the subscriber, will get messages that
             // are sent after it started subscribing.
             //
             // Create a subscriber for the ACK reply message.
             subReply, err := natsConn.SubscribeSync(msg.Reply)
 
             defer func() {
                 err := subReply.Unsubscribe()
                 if err != nil {
                     log.Printf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v\n", err)
                 }
             }()
 
             if err != nil {
                 er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
                 // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
-                log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer)
+                log.Printf("%v, waiting equal to RetryWait %ds before retrying\n", er, message.RetryWait)
 
-                //time.Sleep(time.Second * subscribeSyncTimer)
-                // subReply.Unsubscribe()
-
-                retryAttempts++
+                time.Sleep(time.Second * time.Duration(message.ACKTimeout))
 
                 return ErrACKSubscribeRetry
             }
 
@@ -347,8 +367,8 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
             if err != nil {
                 er := fmt.Errorf("error: nats publish failed: %v", err)
                 // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
-                log.Printf("%v, waiting %ds before retrying\n", er, publishTimer)
-                time.Sleep(time.Second * publishTimer)
+                log.Printf("%v, waiting equal to RetryWait of %ds before retrying\n", er, message.RetryWait)
+                time.Sleep(time.Second * time.Duration(message.RetryWait))
 
                 return ErrACKSubscribeRetry
             }
@@ -361,8 +381,8 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
             // we don't use it.
             _, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
             if err != nil {
-                if message.RetryWait < 0 {
-                    message.RetryWait = 0
+                if message.RetryWait <= 0 {
+                    message.RetryWait = message.ACKTimeout
                 }
 
                 switch {
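Note on the hunk above: the RetryWait fallback changes so that a zero or negative RetryWait now defaults to ACKTimeout instead of to zero, which means retries are never fired back-to-back. A minimal sketch of that defaulting rule, assuming a reduced message struct with only the two fields the hunk touches:

```go
package main

import "fmt"

// message carries only the two fields relevant to the hunk above.
type message struct {
	RetryWait  int // seconds to wait between retries
	ACKTimeout int // seconds to wait for an ACK
}

// effectiveRetryWait mirrors the new fallback: a missing (<= 0)
// RetryWait defaults to ACKTimeout rather than to 0.
func effectiveRetryWait(m message) int {
	if m.RetryWait <= 0 {
		return m.ACKTimeout
	}
	return m.RetryWait
}

func main() {
	fmt.Println(effectiveRetryWait(message{RetryWait: 0, ACKTimeout: 5}))  // prints 5
	fmt.Println(effectiveRetryWait(message{RetryWait: 10, ACKTimeout: 5})) // prints 10
}
```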
@@ -387,37 +407,20 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
                     return er
                 }
 
-                // did not receive a reply, decide if we should try to retry sending.
-                retryAttempts++
-                er := fmt.Errorf("retry attempt:%v, retries: %v, ack timeout: %v, message.ID: %v", retryAttempts, message.Retries, message.ACKTimeout, message.ID)
-                p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+                // did not receive a reply, retry sending.
 
-                switch {
-                //case message.Retries == 0:
-                //    // 0 indicates unlimited retries
-                //    continue
-                case retryAttempts >= message.Retries:
-                    // max retries reached
-                    er := fmt.Errorf("info: toNode: %v, fromNode: %v, subject: %v, methodArgs: %v: max retries reached, check if node is up and running and if it got a subscriber started for the given REQ type", message.ToNode, message.FromNode, msg.Subject, message.MethodArgs)
-
-                    // We do not want to send errorLogs for REQErrorLog type since
-                    // it will just cause an endless loop.
-                    if message.Method != REQErrorLog {
-                        p.errorKernel.infoSend(p, message, er)
-                    }
-
-                    p.metrics.promNatsMessagesFailedACKsTotal.Inc()
-                    return er
-
-                default:
-                    // none of the above matched, so we've not reached max retries yet
-                    er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID)
-                    p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
-
-                    p.metrics.promNatsMessagesMissedACKsTotal.Inc()
-
-                    return ErrACKSubscribeRetry
-                }
+                // Since the checking and cancelation if max retries are done at the beginning,
+                // we just check here if max retries are reached to decide if we should print
+                // information to the log that another retry will be tried.
+                // if retryAttempts <= message.Retries {
+                //     er = fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID)
+                //     p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
+                // }
+
+                p.metrics.promNatsMessagesMissedACKsTotal.Inc()
+
+                return ErrACKSubscribeRetry
 
             }
 
             return nil
@@ -886,7 +889,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
         b, err := cbor.Marshal(m)
         if err != nil {
             er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
-            p.errorKernel.errSend(p, m, er)
+            log.Printf("%v\n", er)
             return
         }
 
@@ -899,7 +902,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
         err := gobEnc.Encode(m)
         if err != nil {
             er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
-            p.errorKernel.errSend(p, m, er)
+            log.Printf("%v\n", er)
             return
         }
 
@@ -958,7 +961,7 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
 
         // We only wan't to send the error message to errorCentral once.
         once.Do(func() {
-            p.errorKernel.errSend(p, m, er)
+            log.Printf("%v\n", er)
         })
 
         // No compression, so we just assign the value of the serialized
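Note on the process.go changes above: the reworked ACK handling boils down to one pattern: check max retries first, bump the attempt counter with a deferred increment, subscribe to the reply subject, publish, wait for the ACK with a timeout, and signal ErrACKSubscribeRetry so the outer for loop continues. Below is a minimal, self-contained sketch of that pattern against the nats.go client; the subject name, limits, and simplified error handling are illustrative assumptions, not ctrl's actual API.

```go
package main

import (
	"errors"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

var errACKSubscribeRetry = errors.New("ack not received, retry")

// deliverWithACK publishes msg and waits for an ACK on msg.Reply.
// Like the reworked messageDeliverNats: the attempt counter is
// incremented in a deferred func, the max-retries check happens
// first, and a missed ACK makes the outer loop continue.
func deliverWithACK(nc *nats.Conn, msg *nats.Msg, maxRetries int, ackTimeout time.Duration) error {
	retryAttempts := 0
	for {
		err := func() error {
			defer func() { retryAttempts++ }()

			if retryAttempts > maxRetries {
				return errors.New("max retries reached")
			}

			// Subscribe to the reply subject before publishing so
			// the ACK cannot arrive before we are listening.
			subReply, err := nc.SubscribeSync(msg.Reply)
			if err != nil {
				time.Sleep(ackTimeout)
				return errACKSubscribeRetry
			}
			defer subReply.Unsubscribe()

			if err := nc.PublishMsg(msg); err != nil {
				time.Sleep(ackTimeout)
				return errACKSubscribeRetry
			}

			// Wait for the ACK; a timeout triggers another attempt.
			if _, err := subReply.NextMsg(ackTimeout); err != nil {
				return errACKSubscribeRetry
			}
			return nil
		}()

		if errors.Is(err, errACKSubscribeRetry) {
			continue
		}
		return err // nil on success, "max retries reached" on give-up
	}
}

func main() {
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	msg := &nats.Msg{Subject: "demo.subject", Reply: nats.NewInbox(), Data: []byte("hello")}
	if err := deliverWithACK(nc, msg, 3, 5*time.Second); err != nil {
		log.Printf("deliver failed: %v", err)
	}
}
```

In the commit itself the equivalents are message.Retries, message.ACKTimeout and message.RetryWait, with the errorKernel and Prometheus metrics calls standing in for the plain logging used here.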
server.go (126 changed lines)
@@ -443,70 +443,72 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
 
     go func() {
         for samDBVal := range ringBufferOutCh {
+            go func(samDBVal samDBValueAndDelivered) {
                 // Signal back to the ringbuffer that message have been picked up.
                 samDBVal.delivered()
 
                 sam := samDBVal.samDBValue.Data
                 // Check if the format of the message is correct.
                 if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok {
                     er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)
                     s.errorKernel.errSend(s.processInitial, sam.Message, er)
-                    continue
+                    return
                 }
                 if !eventAvailable.CheckIfExists(sam.Subject.Event, sam.Subject) {
                     er := fmt.Errorf("error: routeMessagesToProcess: the event type do not exist, message dropped: %v", sam.Message.Method)
                     s.errorKernel.errSend(s.processInitial, sam.Message, er)
 
-                    continue
+                    return
                 }
 
                 for {
                     // Looping here so we are able to redo the sending
                     // of the last message if a process for the specified subject
                     // is not present. The process will then be created, and
                     // the code will loop back here.
 
                     m := sam.Message
 
                     subjName := sam.Subject.name()
                     pn := processNameGet(subjName, processKindPublisher)
 
                     // Check if there is a map of type map[int]process registered
                     // for the processName, and if it exists then return it.
                     s.processes.active.mu.Lock()
                     proc, ok := s.processes.active.procNames[pn]
                     s.processes.active.mu.Unlock()
 
                     // If found a map above, range it, and are there already a process
                     // for that subject, put the message on that processes incomming
                     // message channel.
                     if ok {
                         // We have found the process to route the message to, deliver it.
                         proc.subject.messageCh <- m
 
                         break
                     } else {
                         // If a publisher process do not exist for the given subject, create it.
                         // log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
 
                         sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
                         var proc process
                         switch {
                         case m.IsSubPublishedMsg:
                             proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil)
                         default:
                             proc = newProcess(s.ctx, s, sub, processKindPublisher, nil)
                         }
 
                         proc.spawnWorker()
                         er := fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
                         s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
 
                         // Now when the process is spawned we continue,
                         // and send the message to that new process.
                         continue
                     }
                 }
+            }(samDBVal)
         }
     }()
 }
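Note on the server.go change above: each message taken off the ring buffer is now handled in its own goroutine, so one slow or blocking subject no longer stalls routing of the messages behind it. The ranged value is passed as an argument to the goroutine so every invocation works on its own copy, and the earlier continue statements become return because a dropped message now only ends that message's goroutine. A stripped-down sketch of that fan-out shape follows; the channel, item type, and handler here are invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// item stands in for ctrl's samDBValueAndDelivered: a payload plus a
// callback used to signal back that the message was picked up.
type item struct {
	data      string
	delivered func()
}

func main() {
	ch := make(chan item, 3)
	for i := 0; i < 3; i++ {
		i := i // give each closure its own copy
		ch <- item{
			data:      fmt.Sprintf("message %d", i),
			delivered: func() { fmt.Printf("picked up %d\n", i) },
		}
	}
	close(ch)

	// One goroutine per message, mirroring routeMessagesToProcess:
	// the ranged value is passed as an argument so every goroutine
	// works on its own copy, and a dropped message uses return
	// (exiting just this goroutine) where the old code used continue.
	var wg sync.WaitGroup
	for it := range ch {
		wg.Add(1)
		go func(it item) {
			defer wg.Done()
			it.delivered()

			if it.data == "" {
				return // message dropped; only this handler exits
			}
			fmt.Println("handled:", it.data)
		}(it)
	}
	wg.Wait()
}
```

The WaitGroup is only there to make the example deterministic before main exits; the commit itself does not wait on the per-message goroutines.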