1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-05 20:09:16 +00:00
This commit is contained in:
postmannen 2021-12-30 06:28:21 +01:00
parent 8e87caf30a
commit b8d21420ed

View file

@ -194,8 +194,8 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
// messageDeliverNats will create the Nats message with headers and payload. // messageDeliverNats will create the Nats message with headers and payload.
// It will also take care of the delivering the message that is converted to // It will also take care of the delivering the message that is converted to
// gob format as a nats.Message. It will also take care of checking timeouts // gob or cbor format as a nats.Message. It will also take care of checking
// and retries specified for the message. // timeouts and retries specified for the message.
func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) { func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) {
retryAttempts := 0 retryAttempts := 0
@ -339,7 +339,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
message := Message{} message := Message{}
// Check if serialization is specified. // Check if serialization is specified.
// Will default to gob serialization if nothing is specified. // Will default to gob serialization if nothing or non existing value is specified is specified.
if val, ok := msg.Header["serial"]; ok { if val, ok := msg.Header["serial"]; ok {
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val)) // fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
switch val[0] { switch val[0] {
@ -420,9 +420,24 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
} }
// Check if it is an ACK or NACK message, and do the appropriate action accordingly. // Check if it is an ACK or NACK message, and do the appropriate action accordingly.
//
// With ACK messages Steward will keep the state of the message delivery, and try to
// resend the message if an ACK is not received within the timeout/retries specified
// in the message.
// When a process sends an ACK message, it will stop and wait for the nats-reply message
// for the time specified in the replyTimeout value. If no reply message is received
// within the given timeout the publishing process will try to resend the message for
// number of times specified in the retries field of the Steward message.
// When receiving a Steward-message with ACK enabled we send a message back the the
// node where the message originated using the msg.Reply subject field of the nats-message.
//
// With NACK messages we do not send a nats reply message, so the message will only be
// sent from the publisher once, and if it is not delivered it will not be retried.
switch { switch {
// Check for ACK type Commands or Event. // Check for ACK type Commands or Event.
case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK:
// Look up the method handler for the specified method.
mh, ok := p.methodsAvailable.CheckIfExists(message.Method) mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok { if !ok {
er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.CommandOrEvent) er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.CommandOrEvent)
@ -432,6 +447,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
var out []byte var out []byte
var err error var err error
// Call the method handler for the specified method.
out, err = mh.handler(p, message, thisNode) out, err = mh.handler(p, message, thisNode)
if err != nil { if err != nil {