1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

more relay structure, and sub startup

This commit is contained in:
postmannen 2021-11-10 11:21:38 +01:00
parent c37eeffeb5
commit d02d9018e9
5 changed files with 52 additions and 11 deletions

View file

@ -84,6 +84,8 @@ type Configuration struct {
StartSubREQTailFile bool
// Subscriber for continously delivery of output from cli commands.
StartSubREQCliCommandCont bool
// Subscriber for relay messages.
StartSubREQRelay bool
}
// ConfigurationFromFile should have the same structure as
@ -122,6 +124,7 @@ type ConfigurationFromFile struct {
StartSubREQHttpGet *bool
StartSubREQTailFile *bool
StartSubREQCliCommandCont *bool
StartSubREQRelay *bool
}
// NewConfiguration will return a *Configuration.
@ -164,6 +167,7 @@ func newConfigurationDefaults() Configuration {
StartSubREQHttpGet: true,
StartSubREQTailFile: true,
StartSubREQCliCommandCont: true,
StartSubREQRelay: false,
}
return c
}
@ -329,6 +333,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else {
conf.StartSubREQCliCommandCont = *cf.StartSubREQCliCommandCont
}
if cf.StartSubREQRelay == nil {
conf.StartSubREQRelay = cd.StartSubREQRelay
} else {
conf.StartSubREQRelay = *cf.StartSubREQRelay
}
return conf
}
@ -394,6 +403,7 @@ func (c *Configuration) CheckFlags() error {
flag.BoolVar(&c.StartSubREQHttpGet, "startSubREQHttpGet", fc.StartSubREQHttpGet, "true/false")
flag.BoolVar(&c.StartSubREQTailFile, "startSubREQTailFile", fc.StartSubREQTailFile, "true/false")
flag.BoolVar(&c.StartSubREQCliCommandCont, "startSubREQCliCommandCont", fc.StartSubREQCliCommandCont, "true/false")
flag.BoolVar(&c.StartSubREQRelay, "startSubREQRelay", fc.StartSubREQRelay, "true/false")
flag.Parse()

View file

@ -78,12 +78,14 @@ type Message struct {
PreviousMessage *Message
// The node to relay the message via.
RelayViaNode Node
RelayViaNode Node `json:"relayViaNode" yaml:"relayViaNode"`
// The node where the relayed message originated, and where we want
// to send back the end result.
RelayFromNode Node
RelayFromNode Node `json:"relayFromNode" yaml:"relayFromNode"`
// The original value of the ToNode field of the original message.
RelayToNode Node
RelayToNode Node `json:"relayToNode" yaml:"relayToNode"`
// The original method of the message.
RelayOriginalMethod Method `json:"relayOriginalMethod" yaml:"relayOriginalMethod"`
// done is used to signal when a message is fully processed.
// This is used for signaling back to the ringbuffer that we are

View file

@ -234,6 +234,10 @@ func (p *processes) Start(proc process) {
proc.startup.subREQCliCommandCont(proc)
}
if proc.configuration.StartSubREQRelay {
proc.startup.subREQRelay(proc)
}
proc.startup.subREQToSocket(proc)
}
@ -420,6 +424,14 @@ func (s startup) subREQCliCommandCont(p process) {
go proc.spawnWorker(p.processes, p.natsConn)
}
func (s startup) subREQRelay(p process) {
log.Printf("Starting Relay: %#v\n", p.node)
sub := newSubject(REQRelay, string(p.node))
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
go proc.spawnWorker(p.processes, p.natsConn)
}
func (s startup) subREQToSocket(p process) {
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
sub := newSubject(REQToSocket, string(p.node))

View file

@ -1543,6 +1543,8 @@ func (m methodREQRelay) getKind() CommandOrEvent {
func (m methodREQRelay) handler(proc process, message Message, node string) ([]byte, error) {
// relay the message here to the actual host here.
fmt.Printf("\n * Got relay message: %v\n\n ", message)
// Send back an ACK message.
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
return ackMsg, nil

View file

@ -411,12 +411,33 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
}
for {
// looping here so we are able to redo the sending
// of the last message if a process with specified subject
// Looping here so we are able to redo the sending
// of the last message if a process for the specified subject
// is not present. The process will then be created, and
// the code will loop back here.
m := sam.Message
// ---------- HERE ----------
// We've got'n the message from the ringbuffer.
// NB: Think we should swap the ToNode field here with the value
// in RelayNode ???
// ----
// Check if it is a relay message
if m.RelayViaNode != "" {
// Keep the original values.
m.RelayFromNode = m.FromNode
m.RelayToNode = m.ToNode
m.RelayOriginalMethod = m.Method
// Convert it to a relay message.
m.Method = REQRelay
// Change destination to the relayViaNode.
m.ToNode = m.RelayViaNode
}
// --------------------------
subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)
@ -437,12 +458,6 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
proc = existingProc
}
// HERE ?
// We've got'n the message from the ringbuffer, and now found the
// process to send it on.
// NB: Think we should swap the ToNode field here with the value
// in RelayNode ???
// We have found the process to route the message to, deliver it.
proc.subject.messageCh <- m