diff --git a/.gitignore b/.gitignore index 607362d..e87e81e 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ ships/ .gitignore testing_messages/ test.file +doc/concept/via/README.md diff --git a/message_and_subject.go b/message_and_subject.go index 9fc5330..a220e15 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -77,11 +77,13 @@ type Message struct { // initial request message. PreviousMessage *Message - // The node to send the message via. - viaTo Node - // The node where the via message originated, and where we want - // to send the end result. - viaFrom Node + // The node to relay the message via. + RelayViaNode Node + // The node where the relayed message originated, and where we want + // to send back the end result. + RelayFromNode Node + // The original value of the ToNode field of the original message. + RelayToNode Node // done is used to signal when a message is fully processed. // This is used for signaling back to the ringbuffer that we are diff --git a/requests.go b/requests.go index 3a9a3f9..888d24f 100644 --- a/requests.go +++ b/requests.go @@ -120,6 +120,8 @@ const ( REQTailFile Method = "REQTailFile" // Write to steward socket REQToSocket Method = "REQToSocket" + // Send a message via a node + REQRelay Method = "REQRelay" ) // The mapping of all the method constants specified, what type @@ -187,6 +189,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQToSocket: methodREQToSocket{ commandOrEvent: EventACK, }, + REQRelay: methodREQRelay{ + commandOrEvent: EventACK, + }, }, } @@ -1525,3 +1530,22 @@ func (m methodREQToSocket) handler(proc process, message Message, node string) ( } // ---- + +type methodREQRelay struct { + commandOrEvent CommandOrEvent +} + +func (m methodREQRelay) getKind() CommandOrEvent { + return m.commandOrEvent +} + +// Handler to relay messages via a host. +func (m methodREQRelay) handler(proc process, message Message, node string) ([]byte, error) { + // relay the message here to the actual host here. + + // Send back an ACK message. + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// ---- diff --git a/server.go b/server.go index 208241b..2b1e672 100644 --- a/server.go +++ b/server.go @@ -437,16 +437,18 @@ func (s *server) routeMessagesToProcess(dbFileName string) { proc = existingProc } + // HERE ? + // We've got'n the message from the ringbuffer, and now found the + // process to send it on. + // NB: Think we should swap the ToNode field here with the value + // in RelayNode ??? + // We have found the process to route the message to, deliver it. proc.subject.messageCh <- m - // If no process to handle the specific subject exist, - // the we create and spawn one. - break } else { - // If a publisher process do not exist for the given subject, create it, and - // by using the goto at the end redo the process for this specific message. + // If a publisher process do not exist for the given subject, create it. log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) @@ -455,7 +457,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) { proc.spawnWorker(s.processes, s.natsConn) log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID) - // Now when the process is spawned we jump back to the redo: label, + // Now when the process is spawned we continue, // and send the message to that new process. continue }