mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-31 01:24:31 +00:00
comments
This commit is contained in:
parent
38e286ab7d
commit
9b07eaa44b
2 changed files with 5 additions and 22 deletions
13
process.go
13
process.go
|
@ -321,13 +321,12 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
|||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||
}
|
||||
|
||||
// ----------- HERE -------------
|
||||
|
||||
// Send final reply for a relayed message back to the originating node.
|
||||
//
|
||||
// Check if the previous message was a relayed message, and if true
|
||||
// make a copy of the current message where the to field is set to
|
||||
// the value of the previous message's RelayFromNode field, so we
|
||||
// also can send the a copy of the reply back to where it originated.
|
||||
|
||||
if message.PreviousMessage != nil && message.PreviousMessage.RelayOriginalViaNode != "" {
|
||||
|
||||
// make a copy of the message
|
||||
|
@ -336,10 +335,10 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
|||
|
||||
// We set the replyMethod of the initial message.
|
||||
// If no RelayReplyMethod was found, we default to the reply
|
||||
// method of the previos message.
|
||||
// method of the previous message.
|
||||
switch {
|
||||
case msgCopy.PreviousMessage.RelayReplyMethod == "":
|
||||
er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previos message: %v ", msgCopy)
|
||||
er := fmt.Errorf("error: subscriberHandler: no PreviousMessage.RelayReplyMethod found, defaulting to the reply method of previous message: %v ", msgCopy)
|
||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, p.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod
|
||||
|
@ -347,7 +346,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
|||
msgCopy.Method = msgCopy.PreviousMessage.RelayReplyMethod
|
||||
}
|
||||
|
||||
// Reset the previosMessage relay fields so the message don't loop.
|
||||
// Reset the previousMessage relay fields so the message don't loop.
|
||||
message.PreviousMessage.RelayViaNode = ""
|
||||
message.PreviousMessage.RelayOriginalViaNode = ""
|
||||
|
||||
|
@ -363,8 +362,6 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
|||
p.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
}
|
||||
|
||||
// ------------------------------
|
||||
|
||||
// Check if it is an ACK or NACK message, and do the appropriate action accordingly.
|
||||
switch {
|
||||
// Check for ACK type Commands or Event.
|
||||
|
|
14
server.go
14
server.go
|
@ -417,11 +417,6 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
// the code will loop back here.
|
||||
|
||||
m := sam.Message
|
||||
// ---------- HERE ----------
|
||||
// We've got'n the message from the ringbuffer.
|
||||
// NB: Think we should swap the ToNode field here with the value
|
||||
// in RelayNode ???
|
||||
// ----
|
||||
|
||||
// Check if it is a relay message
|
||||
if m.RelayViaNode != "" && m.RelayViaNode != Node(s.nodeName) {
|
||||
|
@ -448,15 +443,6 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
sam.Subject = newSubject(REQRelayInitial, string(s.nodeName))
|
||||
}
|
||||
|
||||
// --------------------------
|
||||
|
||||
// TODO: Check out if we actually need the SAM structure anymore in
|
||||
// the flow before we come here to this point. Reason for thinking
|
||||
// about this is that we replace the SAM.subject in the relay above,
|
||||
// and maybe it makes more sense to move the creation here instead
|
||||
// so we could for example just store messages in the k/v database
|
||||
// to make things simpler.
|
||||
|
||||
subjName := sam.Subject.name()
|
||||
pn := processNameGet(subjName, processKindPublisher)
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue