diff --git a/process.go b/process.go index 407c8c2..36c4c8f 100644 --- a/process.go +++ b/process.go @@ -194,8 +194,8 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { // messageDeliverNats will create the Nats message with headers and payload. // It will also take care of the delivering the message that is converted to -// gob format as a nats.Message. It will also take care of checking timeouts -// and retries specified for the message. +// gob or cbor format as a nats.Message. It will also take care of checking +// timeouts and retries specified for the message. func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.Header, natsConn *nats.Conn, message Message) { retryAttempts := 0 @@ -339,7 +339,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, message := Message{} // Check if serialization is specified. - // Will default to gob serialization if nothing is specified. + // Will default to gob serialization if nothing or non existing value is specified is specified. if val, ok := msg.Header["serial"]; ok { // fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val)) switch val[0] { @@ -420,9 +420,24 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, } // Check if it is an ACK or NACK message, and do the appropriate action accordingly. + // + // With ACK messages Steward will keep the state of the message delivery, and try to + // resend the message if an ACK is not received within the timeout/retries specified + // in the message. + // When a process sends an ACK message, it will stop and wait for the nats-reply message + // for the time specified in the replyTimeout value. If no reply message is received + // within the given timeout the publishing process will try to resend the message for + // number of times specified in the retries field of the Steward message. + // When receiving a Steward-message with ACK enabled we send a message back the the + // node where the message originated using the msg.Reply subject field of the nats-message. + // + // With NACK messages we do not send a nats reply message, so the message will only be + // sent from the publisher once, and if it is not delivered it will not be retried. switch { + // Check for ACK type Commands or Event. case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: + // Look up the method handler for the specified method. mh, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { er := fmt.Errorf("error: subscriberHandler: no such method type: %v", p.subject.CommandOrEvent) @@ -432,6 +447,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, var out []byte var err error + // Call the method handler for the specified method. out, err = mh.handler(p, message, thisNode) if err != nil {