mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-05 06:46:48 +00:00
relay, central ok, but no output to node1
This commit is contained in:
parent
e7a0d8d8e5
commit
c8f8de71c5
5 changed files with 75 additions and 6 deletions
|
@ -1 +1 @@
|
|||
<mxfile host="Electron" modified="2021-11-18T09:12:16.544Z" agent="5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/15.4.0 Chrome/91.0.4472.164 Electron/13.5.0 Safari/537.36" etag="tMkCCN8-j1up8cz3Nidx" version="15.4.0" type="device"><diagram id="C5RBs43oDa-KdzZeNtuy" name="Page-1">7VpbU9s4FP41menuTBhfYpM8hkDZ7haWJWxpHxVbsVVkK8gySfrrV7Ik3+QQAoTQzr5krKP7+c759EnQcyfJ6pyCRXxBQoh7jhWueu5pz3GGnsV/hWEtDb7tSENEUShNdmWYoh9QGVW/KEchzBoNGSGYoUXTGJA0hQFr2AClZNlsNie4OesCRNAwTAOATestClmstuUcV/Y/IIpiPbPtj2RNAnRjtZMsBiFZ1kzuWc+dUEKY/EpWE4iF77Rfbj+tb/HnO//8z3+ye/DvyV83l1/6crCPu3Qpt0Bhyp49tD+dw1P/5IuHzu8vVzf+zegr7g/l0A8A58pfaq9srR0IQ+5PVSSUxSQiKcBnlfWEkjwNoZjG4qWqzWdCFtxoc+N3yNhaBQfIGeGmmCVY1co5xUQtzLZsWMcToBFkj+zSKdHiUQ5JAhld834UYsDQQ3NSoOItKtuVXa8I4stxLJUa9kB5b60j3moOkZGcBlD1qpDhH7VlVKYCrx2wswzsriEIueUCZpnICp4tlCQCTRLcQWYgW+EmQFjGiMHpAgSidsm5oAujB0gZXD0DJdP7apSR03DiQO1qWeWqrakkruWpb22Gq+HoXb16/CtkhIy8bRnxhMzxX5g5L4Ji9D8UtXaDQ0LhGFDw81qQQXGc85/p+KLn+Jhv4GRG+Vckvj5M89l3cag71jitMdNv75OKhm6Tz90OLnLfkovcjgRwM0j51o+uSc5gIv2Z3ZArSgJeMDFY5EzDY5FUBAxKo1k+n0P6KZ3E7xOJgddAopRgNSQcyzORON4XErZ52P6KXDR4Ihd5h+SigQEFKgLZB4kIzyL++2YiXJdxb9bxXOoYoY0vxvzSAg/IT0296Vojk5+cDn4a7i0rvEMkAVwh9lV0P/JU6Vut5nSlRi4Ka11I+X5rnUTxW72u6laUdL+MpwIbi2soN6QkhW9wW7E3RMGLryvOUZNW/Te+sHivcJxRecURSOhTTV1yqnPtb5nNP8HBNrAPfbD5BiYT7mImvMti8ZuVEk75WZqT6o6JIA6z57NlE5d9Od61WuJOe7lOnqMOz5e3z9dnT/MguySHJFS7RqcVub4uoe5bwdhPZdkNEkaFS986Gvq+Om93I15+WoB1rcFC0Gm2mZddHWCalR2vHlpb27sj69H27WeqVnv+IVfc6q2XT+bzDO7lONBQ1bmHJAkoboqEM7519iCCwjgDwANAGMywYJ+e+9Fk+pgkszw7HMu7zTctb2hyjdN1kdyfUHsfr1pvqdxgGr6ubttKPd6h9d1xS98N2vG0Qd89QSp6bmsoucu9SUW76+1P8sCcFEutIte/z4mu6GdF7I15A9tdrKpKzR2/85pJDIM7sYK5lDNUEAnKxO5EbScHWSXpHJmEtH3QhdazFsBCwhZ452nKNWtPvaDFfFJc6S4K73OY8ZknetCUFPtegKV4xkFsw0Kuigeftkornn6kUa0FitWgtB/wiVOIy9E4XtLFckiDJzhXsuJmRMkdnBDMvVSm2Bxh3DIBjKKUFzGci26CbFEA8FiZExSGeJNKbBLOWxC3Z3VnUJ25Rx3MPdoXc2uifoy5g5xfobSArvFegEGWoWCT3C5Ycivpd4rDRxk7BFnc0vPvQO9JRjGRryHrdQCrbS8kZ8dqirBSEuxKzbxni5hbA20g5l1VaXkLbS34yevatb3daN9UpdtPFV6s/hAvm1f/zeCe/Qc=</diagram></mxfile>
|
||||
<mxfile host="Electron" modified="2021-11-18T14:33:46.755Z" agent="5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/15.4.0 Chrome/91.0.4472.164 Electron/13.5.0 Safari/537.36" etag="oED7C0X_evHRoHAWu5gt" version="15.4.0" type="device"><diagram id="C5RBs43oDa-KdzZeNtuy" name="Page-1">7Vxbd9o4EP41nNPuOXB8wQYeCUm63W3SNMn28qjYAtTIFrHlAPvrV7IkW74QIMEhYelDao/u+ma+mVHktOxRsPgUgdn0gvgQtyzDX7Ts05ZlmZbZZf9xyVJI+j0pmETIl5VywQ36F0qhIaUJ8mFcqEgJwRTNikKPhCH0aEEGoojMi9XGBBdHnYEJrAhuPICr0h/Ip1O5CquXy/+EaDJVI5vuQJQEQFWWK4mnwCdzTWSftexRRAgVT8FiBDHfPLUvPz4vf+Av9+6nv77FD+Cfk79vL7+3RWfn2zTJlhDBkD67a/dmDE/dk+8O+vRwubh1bwc/cbsvun4EOJH7JddKl2oDoc/2U76SiE7JhIQAn+XSk4gkoQ/5MAZ7y+t8IWTGhCYT/oaULqVygIQSJprSAMtSMSYfqITZmgUrfQLRBNInVmllaDE1hySANFqydhHEgKLH4qBA6tskq5c1vSKITccylG30ZJOl0nij2EVMksiDslWODHvQppGLUry2wM6oYHcNgc8kFzCOuVUwa4lIwNEk3j2kFWRz3DgI8ymi8GYGPF46Z2RQh9EjjChcPAOl6u6rXkynsItduax5bqym4pKpZqiusRqvwk5vu629QzAJoXrrTGID03FfaDovgmJwhEKr190nFFYFCuawORuk/pz9uBletCwXswWc3EXsacKfPtwkd7+5V7eMYahR08c3ykWGW2R0u4aM7NckI7vGAuwYRmztnWuSUBiIDY1vyVVEPPZSBWGWUIWPQUKuMSic3CXjMYw+h6Pp24TC7ReQyIIwDQnLcKpI9JpCwqy620Mko+6GZOTsk4y6FShQqsguCLh6pvrfrhrCdab31TJmSzU9lPHFmKUtcJ/BUjHktI1BlaCsGoLqN2YWzj6sAC4Q/cmbdxz59ksrOV3IntOXpXoJ2Xq1Rvz1l16WN0vfVLuY2QId8kyUCUISwldIWMwVavDSjMV1OsVo233lnMXZgT+LRJbDkVBuTeY5uWP7Ksz5HXi2rrlvz+ZWMBmxLaZ8d+mU/4yzIE7usxAHeZqJIPbj59NlEZemNt62StGd2mWdPAc1O5/ln7tnz6onuyT7JFRTo9OcXHdLqE2HMOamLLsihpHq0jY6fVcZ63bEy7wFWGoVZpxO49W8bNt2kZUtR1ettfXtgfFk/fJJVak+exAzLrVW0yfjcQwbcQe2wvR1lT1X3G7XLKhux2Ap4NPqm75dwQixxbNIUgh9EE9LBPYGFNw2WrUa/jpRupqm7lhIEID0IIAwd26cPfINqTh48AgQBneYu5aWfV5141MS3CXx/ly4UwyinH7VkVh1xwTNReFv49DyNcNyGPq7DcrXmp2zodU1Fbz3jFLw3i3r04rgfYM8wLFLXYlVNpYHmHVHu4IHxiSdaq657kNCVEE7TnVvyCqY9myRFyru+IOVjKbQu+czGItYNeJEgmK+Ol5ay0FGRjqdKiGt73SmkhUDYJ6fpHgnYcgSkpY8IJ2yQXEeVEfwIYExG3mkOg1Juu4ZmPNDOkRXTOQqPc4rh+DpwZ4QyrlAPhsUtj02cAhx1hvDS2yx6LLCE4wraZr2RuQejggm3Mf5cAwSAQ/CWEml4QGMJiF7xXDMa3C+RR7AQykOkO/jVVlAkXM0LqnxqA2kBD273qJ0Jh/UMPmgKSZXxP0Uk3sJy5fV1mg86GEQx8hblVulrLnWCdRmAk8y+NuMfQTDVJHXkHVqgFWyF5K1ZRcj7ixE2JaqrX6pI7vU0Qqi3jYFYQPVTnjjeW1b3yzUL6YgO0sv7L1EQVpE07O2i2kaTS0aOE/sdRyja2T/rALG/efHJ0/2O7A2C1a2tQGzdDQ3MNak1U/Xb0in634PJTw7Cx3C2sjJE/6aR03R5O6DwfOqNO7Qnj6myRZTMCONs8YgQHgp2mS/FmlZtt3lyg7xI+Q+vlJS7EQL1iwerBXKxCyHaeATBQAXi+eSk3l5V8wzLcTM9GDUZkv1eGxV154HMG0ZkwzTJmNaLETMwEPZuaFNLC2kEQjjMetSdR7CrMKcRH5xbL35HfDuJyl9tEs7zgKLbKet7iB/drR991E8w0DuOQox0gYeYwKoPqFy3LtWB3aCanlrPbaN6W/TKrPZ9lBfi07FEorivOLncRbo5uNyHlt+R+CS+FlYzo+s+CTX9ns0mPdmMKuU/4jmIaF5dGfvE89duLN2Sx105GWUCHrP67Ju06OVkiO4rVQ8+oCD07IjaxwWnk2xBr+ysCFvnNdUPTLHwWnakTkOC8+mmCOAdEr8Ot5I2eJrhCYoBPiiUu3IGQenY0fOOCw8d8UZN3AtZ5Qijeuzb9ecPo6EcdAKdiSMw8KzKcKoOdSoS02ys+0jaRywkh3Ptf8PaB5dwPvEcxcuIPuyJoTzlv5ljbiRKL9mSi8mlm74raD78tXobb5wWnXFr3Sfr6nLd+qrGnl7oqc+RdNuaNmmXb2iZTZ2jdquXr47Rykw+f1OHZwJeoRhDmPNhc21lzXFh6uFXuVYnXcNrtrK8pUd/WKl06uC29yX9NVPrU5JiorI2OT93KgERg7tSjCMNw+GW7Q006j75rDO1J6BBnvN/ySSuPSU/2Ep++w/</diagram></mxfile>
|
|
@ -76,6 +76,8 @@ type Message struct {
|
|||
|
||||
// The node to relay the message via.
|
||||
RelayViaNode Node `json:"relayViaNode" yaml:"relayViaNode"`
|
||||
// The original value of the RelayViaNode.
|
||||
RelayOriginalViaNode Node `json:"relayOriginalViaNode" yaml:"relayOriginalViaNode"`
|
||||
// The node where the relayed message originated, and where we want
|
||||
// to send back the end result.
|
||||
RelayFromNode Node `json:"relayFromNode" yaml:"relayFromNode"`
|
||||
|
|
10
processes.go
10
processes.go
|
@ -168,6 +168,8 @@ func (p *processes) Start(proc process) {
|
|||
proc.startup.subREQRelay(proc)
|
||||
}
|
||||
|
||||
proc.startup.subREQRelayInitial(proc)
|
||||
|
||||
proc.startup.subREQToSocket(proc)
|
||||
}
|
||||
|
||||
|
@ -378,6 +380,14 @@ func (s startup) subREQRelay(p process) {
|
|||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQRelayInitial(p process) {
|
||||
log.Printf("Starting Relay Initial: %#v\n", p.node)
|
||||
sub := newSubject(REQRelayInitial, string(p.node))
|
||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
||||
|
||||
go proc.spawnWorker(p.processes, p.natsConn)
|
||||
}
|
||||
|
||||
func (s startup) subREQToSocket(p process) {
|
||||
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQToSocket, string(p.node))
|
||||
|
|
45
requests.go
45
requests.go
|
@ -121,6 +121,8 @@ const (
|
|||
REQToSocket Method = "REQToSocket"
|
||||
// Send a message via a node
|
||||
REQRelay Method = "REQRelay"
|
||||
// The method handler for the first step in a relay chain.
|
||||
REQRelayInitial Method = "REQRelayInitial"
|
||||
)
|
||||
|
||||
// The mapping of all the method constants specified, what type
|
||||
|
@ -194,6 +196,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
|||
REQRelay: methodREQRelay{
|
||||
commandOrEvent: EventACK,
|
||||
},
|
||||
REQRelayInitial: methodREQRelayInitial{
|
||||
commandOrEvent: EventACK,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -1602,6 +1607,42 @@ func (m methodREQToSocket) handler(proc process, message Message, node string) (
|
|||
|
||||
// ----
|
||||
|
||||
type methodREQRelayInitial struct {
|
||||
commandOrEvent CommandOrEvent
|
||||
}
|
||||
|
||||
func (m methodREQRelayInitial) getKind() CommandOrEvent {
|
||||
return m.commandOrEvent
|
||||
}
|
||||
|
||||
// Handler to relay messages via a host.
|
||||
func (m methodREQRelayInitial) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
// relay the message to the actual host here.
|
||||
|
||||
fmt.Printf("********** DEBUG Method RelayInitial 1 ***********\n %#v\n************************\n", message)
|
||||
|
||||
message.ToNode = message.RelayOriginalViaNode
|
||||
message.FromNode = Node(node)
|
||||
message.Method = REQRelay
|
||||
|
||||
fmt.Printf("********** DEBUG Method RelayInitial2 ***********\n %#v\n************************\n", message)
|
||||
|
||||
sam, err := newSubjectAndMessage(message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er)
|
||||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
||||
// Send back an ACK message.
|
||||
ackMsg := []byte("confirmed REQRelay from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
type methodREQRelay struct {
|
||||
commandOrEvent CommandOrEvent
|
||||
}
|
||||
|
@ -1614,10 +1655,14 @@ func (m methodREQRelay) getKind() CommandOrEvent {
|
|||
func (m methodREQRelay) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
// relay the message here to the actual host here.
|
||||
|
||||
fmt.Printf("********** DEBUG Method Relay 1 ***********\n %#v\n************************\n", message)
|
||||
|
||||
message.ToNode = message.RelayToNode
|
||||
message.FromNode = Node(node)
|
||||
message.Method = message.RelayOriginalMethod
|
||||
|
||||
fmt.Printf("********** DEBUG Method Relay 2 ***********\n %#v\n************************\n", message)
|
||||
|
||||
sam, err := newSubjectAndMessage(message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message)
|
||||
|
|
22
server.go
22
server.go
|
@ -425,17 +425,29 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
|||
|
||||
// Check if it is a relay message
|
||||
if m.RelayViaNode != "" && m.RelayViaNode != Node(s.nodeName) {
|
||||
|
||||
fmt.Printf("\n********** DEBUG routeMessagesToProcess ***********\n %#v\n************************\n\n", m)
|
||||
|
||||
// Keep the original values.
|
||||
m.RelayFromNode = m.FromNode
|
||||
m.RelayToNode = m.ToNode
|
||||
m.RelayOriginalViaNode = m.RelayViaNode
|
||||
m.RelayOriginalMethod = m.Method
|
||||
|
||||
// Convert it to a relay message.
|
||||
m.Method = REQRelay
|
||||
// Change destination to the relayViaNode.
|
||||
m.ToNode = m.RelayViaNode
|
||||
// Convert it to a relay initial message.
|
||||
m.Method = REQRelayInitial
|
||||
// Set the toNode of the message to this host, so we send
|
||||
// it to ourselves again and pick it up with the subscriber
|
||||
// for the REQReplyInitial handler method.
|
||||
m.ToNode = Node(s.nodeName)
|
||||
|
||||
sam.Subject = newSubject(REQRelay, string(m.RelayViaNode))
|
||||
// We are now done with the initial checking for if the new
|
||||
// message is a relay message, so we empty the viaNode field
|
||||
// so we don't end in an endless loop here.
|
||||
// The value is stored in RelayOriginalViaNode for later use.
|
||||
m.RelayViaNode = ""
|
||||
|
||||
sam.Subject = newSubject(REQRelayInitial, string(s.nodeName))
|
||||
}
|
||||
|
||||
// --------------------------
|
||||
|
|
Loading…
Add table
Reference in a new issue