mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
relay working, but cmd cont failing output
This commit is contained in:
parent
0a0ee0f900
commit
22b3e4fbe5
4 changed files with 42 additions and 8 deletions
|
@ -86,6 +86,9 @@ type Message struct {
|
|||
RelayToNode Node `json:"relayToNode" yaml:"relayToNode"`
|
||||
// The original method of the message.
|
||||
RelayOriginalMethod Method `json:"relayOriginalMethod" yaml:"relayOriginalMethod"`
|
||||
// The method to use when the reply of the relayed message came
|
||||
// back to where originated from.
|
||||
RelayReplyMethod Method `json:"relayReplyMethod" yaml:"relayReplyMethod"`
|
||||
|
||||
// done is used to signal when a message is fully processed.
|
||||
// This is used for signaling back to the ringbuffer that we are
|
||||
|
|
25
process.go
25
process.go
|
@ -8,7 +8,6 @@ import (
|
|||
"log"
|
||||
"time"
|
||||
|
||||
"github.com/kr/pretty"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
@ -331,14 +330,34 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
|||
// also can send the a copy of the reply back to where it originated.
|
||||
|
||||
if message.PreviousMessage != nil && message.PreviousMessage.RelayViaNode != "" {
|
||||
fmt.Printf("\n message.PreviousMessage.RelayViaNode: %v\n\n", message.PreviousMessage.RelayViaNode)
|
||||
// fmt.Printf("\n message.PreviousMessage.RelayViaNode: %v\n\n", message.PreviousMessage.RelayViaNode)
|
||||
// make a copy of the message
|
||||
msgCopy := message
|
||||
// fmt.Printf("\n *DEBUG0 msgCopy: %#v\n\n ", pretty.Formatter(msgCopy))
|
||||
msgCopy.ToNode = msgCopy.PreviousMessage.RelayFromNode
|
||||
|
||||
// If no RelayReplyMethod is set, we default to the replyMethod
|
||||
// of the initial message.
|
||||
|
||||
//fmt.Printf(" * replyMethod: %v\n", message.ReplyMethod)
|
||||
//fmt.Printf(" * previous replyMethod: %v\n", message.PreviousMessage.ReplyMethod)
|
||||
//
|
||||
//fmt.Printf(" * message.RelayReplyMethod: %v\n", message.RelayReplyMethod)
|
||||
//fmt.Printf(" * previous message.PreviousMessage.RelayReplyMethod: %v\n", message.PreviousMessage.RelayReplyMethod)
|
||||
|
||||
switch {
|
||||
case msgCopy.PreviousMessage.RelayReplyMethod == "":
|
||||
fmt.Printf("\n *DEBUG0.1 msgCopy: \n ")
|
||||
msgCopy.Method = msgCopy.PreviousMessage.ReplyMethod
|
||||
case msgCopy.PreviousMessage.RelayReplyMethod != "":
|
||||
msgCopy.Method = msgCopy.PreviousMessage.RelayReplyMethod
|
||||
}
|
||||
// fmt.Printf("\n *DEBUG1 msgCopy: %#v\n\n ", pretty.Formatter(msgCopy))
|
||||
|
||||
// Reset the previosMessage relay fields so the message don't loop.
|
||||
message.PreviousMessage.RelayViaNode = ""
|
||||
|
||||
// fmt.Printf("\n *DEBUG2 msgCopy: %#v\n\n ", pretty.Formatter(msgCopy))
|
||||
sam, err := newSubjectAndMessage(msgCopy)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: newSubjectAndMessage : %v, message copy: %v", err, msgCopy)
|
||||
|
@ -346,7 +365,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
|
|||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
fmt.Printf("\n * Created sam: %#v\n\n ", pretty.Formatter(sam))
|
||||
// fmt.Printf("\n *DEBUG3 Created sam: %#v\n\n ", pretty.Formatter(sam))
|
||||
p.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
}
|
||||
|
||||
|
|
|
@ -288,7 +288,14 @@ func newSubjectAndMessage(m Message) (subjectAndMessage, error) {
|
|||
|
||||
tmpH := mt.getHandler(m.Method)
|
||||
if tmpH == nil {
|
||||
return subjectAndMessage{}, fmt.Errorf("error: no such request type defined: %v", m.Method)
|
||||
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: no such request type defined: %v", m.Method)
|
||||
}
|
||||
|
||||
switch {
|
||||
case m.ToNode == "":
|
||||
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: ToNode empty: %+v", m)
|
||||
case m.Method == "":
|
||||
return subjectAndMessage{}, fmt.Errorf("error: newSubjectAndMessage: Method empty: %v", m)
|
||||
}
|
||||
|
||||
sub := Subject{
|
||||
|
|
13
requests.go
13
requests.go
|
@ -49,7 +49,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/hpcloud/tail"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
|
@ -1189,6 +1188,12 @@ func (m methodREQToConsole) getKind() CommandOrEvent {
|
|||
// Handler to write directly to console.
|
||||
func (m methodREQToConsole) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
|
||||
for _, v := range message.Data {
|
||||
fmt.Fprintf(os.Stdout, "%v", string(v))
|
||||
}
|
||||
|
||||
fmt.Println()
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
@ -1541,11 +1546,11 @@ func (m methodREQRelay) getKind() CommandOrEvent {
|
|||
func (m methodREQRelay) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
// relay the message here to the actual host here.
|
||||
|
||||
fmt.Printf("\n * Got relay message: %#v\n\n ", pretty.Formatter(message))
|
||||
// fmt.Printf("\n * DEBUG Got relay message: %#v\n\n ", pretty.Formatter(message))
|
||||
|
||||
message.ToNode = message.RelayToNode
|
||||
message.FromNode = Node(node)
|
||||
fmt.Printf(" * THE VALUES OF, proc.configuration.NodeName: %v, node:%v\n ", proc.configuration.NodeName, node)
|
||||
// fmt.Printf(" * DEBUG THE VALUES OF, proc.configuration.NodeName: %v, node:%v\n ", proc.configuration.NodeName, node)
|
||||
message.Method = message.RelayOriginalMethod
|
||||
|
||||
sam, err := newSubjectAndMessage(message)
|
||||
|
@ -1555,7 +1560,7 @@ func (m methodREQRelay) handler(proc process, message Message, node string) ([]b
|
|||
log.Printf("%v\n", er)
|
||||
}
|
||||
|
||||
fmt.Printf("\n * Created sam: %#v\n\n ", pretty.Formatter(sam))
|
||||
// fmt.Printf("\n * DEBUG: Created sam: %#v\n\n ", pretty.Formatter(sam))
|
||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||
|
||||
// Send back an ACK message.
|
||||
|
|
Loading…
Reference in a new issue