1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

initial relay request types

This commit is contained in:
postmannen 2021-11-10 06:22:03 +01:00
parent 4fe066aebd
commit c37eeffeb5
4 changed files with 40 additions and 11 deletions

1
.gitignore vendored
View file

@ -13,3 +13,4 @@ ships/
.gitignore
testing_messages/
test.file
doc/concept/via/README.md

View file

@ -77,11 +77,13 @@ type Message struct {
// initial request message.
PreviousMessage *Message
// The node to send the message via.
viaTo Node
// The node where the via message originated, and where we want
// to send the end result.
viaFrom Node
// The node to relay the message via.
RelayViaNode Node
// The node where the relayed message originated, and where we want
// to send back the end result.
RelayFromNode Node
// The original value of the ToNode field of the original message.
RelayToNode Node
// done is used to signal when a message is fully processed.
// This is used for signaling back to the ringbuffer that we are

View file

@ -120,6 +120,8 @@ const (
REQTailFile Method = "REQTailFile"
// Write to steward socket
REQToSocket Method = "REQToSocket"
// Send a message via a node
REQRelay Method = "REQRelay"
)
// The mapping of all the method constants specified, what type
@ -187,6 +189,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
REQToSocket: methodREQToSocket{
commandOrEvent: EventACK,
},
REQRelay: methodREQRelay{
commandOrEvent: EventACK,
},
},
}
@ -1525,3 +1530,22 @@ func (m methodREQToSocket) handler(proc process, message Message, node string) (
}
// ----
type methodREQRelay struct {
commandOrEvent CommandOrEvent
}
func (m methodREQRelay) getKind() CommandOrEvent {
return m.commandOrEvent
}
// Handler to relay messages via a host.
func (m methodREQRelay) handler(proc process, message Message, node string) ([]byte, error) {
// relay the message here to the actual host here.
// Send back an ACK message.
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil
}
// ----

View file

@ -437,16 +437,18 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
proc = existingProc
}
// HERE ?
// We've got'n the message from the ringbuffer, and now found the
// process to send it on.
// NB: Think we should swap the ToNode field here with the value
// in RelayNode ???
// We have found the process to route the message to, deliver it.
proc.subject.messageCh <- m
// If no process to handle the specific subject exist,
// the we create and spawn one.
break
} else {
// If a publisher process do not exist for the given subject, create it, and
// by using the goto at the end redo the process for this specific message.
// If a publisher process do not exist for the given subject, create it.
log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
@ -455,7 +457,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
proc.spawnWorker(s.processes, s.natsConn)
log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)
// Now when the process is spawned we jump back to the redo: label,
// Now when the process is spawned we continue,
// and send the message to that new process.
continue
}