diff --git a/cmd/main.go b/cmd/main.go index d34cc81..508aa89 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -37,9 +37,5 @@ func main() { // Start the messaging server go s.Start() - //if *modeSubscriber { - // go s.RunSubscriber() - //} - select {} } diff --git a/example-inmessage/orig-central.json b/example-inmessage/orig-central.json index d2e919a..db20982 100644 --- a/example-inmessage/orig-central.json +++ b/example-inmessage/orig-central.json @@ -3,13 +3,13 @@ "subject": { "node":"central", - "messageKind":"event", + "commandOrEvent":"event", "method":"textlogging" }, "message": { "data": ["some message sent from a ship"], - "messageType":"Event" + "commandOrEvent":"Event" } } ] \ No newline at end of file diff --git a/example-inmessage/orig-ship1.json b/example-inmessage/orig-ship1.json index d9577e4..1c9d8da 100644 --- a/example-inmessage/orig-ship1.json +++ b/example-inmessage/orig-ship1.json @@ -3,13 +3,13 @@ "subject": { "node":"ship1", - "messageKind":"command", + "commandOrEvent":"command", "method":"shellcommand" }, "message": { "data": ["bash","-c","ls -l ../"], - "messageType":"Command" + "commandOrEvent":"Command" } } ] \ No newline at end of file diff --git a/example-inmessage/orig-ship2.json b/example-inmessage/orig-ship2.json index 3ed4a8f..0728d96 100644 --- a/example-inmessage/orig-ship2.json +++ b/example-inmessage/orig-ship2.json @@ -3,13 +3,13 @@ "subject": { "node":"ship2", - "messageKind":"command", + "commandOrEvent":"command", "method":"shellcommand" }, "message": { "data": ["bash","-c","tree ../"], - "messageType":"Command" + "commandOrEvent":"Command" } } ] \ No newline at end of file diff --git a/getmessagefromfile.go b/getmessagefromfile.go index db95c6a..be43f15 100644 --- a/getmessagefromfile.go +++ b/getmessagefromfile.go @@ -39,7 +39,7 @@ func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, f } for i := range js { - fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageKind, js[i].Subject.MessageKind) + fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.CommandOrEvent, js[i].Subject.CommandOrEvent) js[i].Message.FromNode = node(s.nodeName) } diff --git a/publisher.go b/publisher.go index 0ae2091..5e6b97f 100644 --- a/publisher.go +++ b/publisher.go @@ -6,19 +6,23 @@ import ( "encoding/gob" "fmt" "log" + "os/exec" "sync" "time" "github.com/nats-io/nats.go" ) -// MessageKind describes on the message level if this is +// CommandOrEvent describes on the message level if this is // an event or command kind of message in the Subject name. // This field is mainly used to be able to spawn up different // worker processes based on the Subject name so we can have // one process for handling event kind, and another for // handling command kind of messages. -type MessageKind string +// This type is used in both building the subject name, and +// also inside the Message type to describe if it is a Command +// or Event. +type CommandOrEvent string // TODO: Figure it makes sense to have these types at all. // It might make more sense to implement these as two @@ -29,13 +33,13 @@ const ( // delivered back in the reply ack message. // The message should contain the unique ID of the // command. - Command MessageKind = "command" + Command CommandOrEvent = "command" // shellCommand, wait for and return the output // of the command in the ACK message. This means // that the command should be executed immediately // and that we should get the confirmation that it // was successful or not. - Event MessageKind = "event" + Event CommandOrEvent = "event" // eventCommand, just wait for the ACK that the // message is received. What action happens on the // receiving side is up to the received to decide. @@ -49,8 +53,8 @@ type Message struct { // interface type here to handle several data types ? Data []string `json:"data" yaml:"data"` // The type of the message being sent - MessageType MessageKind `json:"messageType" yaml:"messageType"` - FromNode node + CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"` + FromNode node } // server is the structure that will hold the state about spawned @@ -101,13 +105,16 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { } +// Start will spawn up all the defined subscriber processes. +// Spawning of publisher processes is done on the fly by checking +// if there is publisher process for a given message subject. This +// checking is also started here in Start by calling handleMessagesToPublish. func (s *server) Start() { // Start the checking the input file for new messages from operator. go s.getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) // Start the textlogging service that will run on the subscribers // TODO: Figure out how to structure event services like these - go s.startTextLogging(s.logCh) // Start a subscriber for shellCommand messages @@ -129,33 +136,26 @@ func (s *server) Start() { } time.Sleep(time.Second * 2) - fmt.Printf("*** Output of processes map: %#v\n", s.processes) + s.printProcessesMap() - // Prepare and start a single process - //{ - // sub := newSubject("ship1", "command", "shellcommand") - // proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) - // // fmt.Printf("*** %#v\n", proc) - // go s.processSpawnWorker(proc) - //} - - // Prepare and start a single process - // { - // sub := newSubject("ship2", "command", "shellcommand") - // proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) - // // fmt.Printf("*** %#v\n", proc) - // go s.processSpawnWorker(proc) - // } - - s.handleNewOperatorMessages() + s.handleMessagesToPublish() select {} } +func (s *server) printProcessesMap() { + fmt.Println("--------------------------------------------------------------------------------------------") + fmt.Printf("*** Output of processes map :\n") + for _, v := range s.processes { + fmt.Printf("*** - : %v\n", v) + } + fmt.Println("--------------------------------------------------------------------------------------------") +} + // handleNewOperatorMessages will handle all the new operator messages // given to the system, and route them to the correct subject queue. -func (s *server) handleNewOperatorMessages() { +func (s *server) handleMessagesToPublish() { // Process the messages that have been received on the incomming // message pipe. Check and send if there are a specific subject // for it, and no subject exist throw an error. @@ -185,12 +185,13 @@ func (s *server) handleNewOperatorMessages() { // If a publisher do not exist for the given subject, create it. log.Printf("info: did not find that specific subject, starting new process for subject: %v\n", subjName) - sub := newSubject(v[i].Subject.Node, v[i].Subject.MessageKind, v[i].Subject.Method) + sub := newSubject(v[i].Subject.Node, v[i].Subject.CommandOrEvent, v[i].Subject.Method) proc := s.processPrepareNew(sub, s.errorCh, processKindPublisher) // fmt.Printf("*** %#v\n", proc) go s.processSpawnWorker(proc) time.Sleep(time.Millisecond * 500) + s.printProcessesMap() goto redo } } @@ -206,7 +207,7 @@ type Subject struct { // node, the name of the node Node string `json:"node" yaml:"node"` // messageType, command/event - MessageKind MessageKind `json:"messageKind" yaml:"messageKind"` + CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"` // method, what is this message doing, etc. shellcommand, syslog, etc. Method string `json:"method" yaml:"method"` // messageCh is the channel for receiving new content to be sent @@ -216,12 +217,12 @@ type Subject struct { // newSubject will return a new variable of the type subject, and insert // all the values given as arguments. It will also create the channel // to receive new messages on the specific subject. -func newSubject(node string, messageKind MessageKind, method string) Subject { +func newSubject(node string, commandOrEvent CommandOrEvent, method string) Subject { return Subject{ - Node: node, - MessageKind: messageKind, - Method: method, - messageCh: make(chan Message), + Node: node, + CommandOrEvent: commandOrEvent, + Method: method, + messageCh: make(chan Message), } } @@ -229,7 +230,7 @@ func newSubject(node string, messageKind MessageKind, method string) Subject { type subjectName string func (s Subject) name() subjectName { - return subjectName(fmt.Sprintf("%s.%s.%s", s.Node, s.MessageKind, s.Method)) + return subjectName(fmt.Sprintf("%s.%s.%s", s.Node, s.CommandOrEvent, s.Method)) } // processKind are either kindSubscriber or kindPublisher, and are @@ -294,6 +295,8 @@ func (s *server) processSpawnWorker(proc process) { // give the message to the correct publisher process. A channel that // is listened on in the for loop below could be used to receive the // messages from the message-pickup-process. + // + // Handle publisher workers if proc.processKind == processKindPublisher { for { // Wait and read the next message on the message channel @@ -325,6 +328,7 @@ func (s *server) processSpawnWorker(proc process) { } } + // handle subscriber workers if proc.processKind == processKindSubscriber { //subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") subject := string(proc.subject.name()) @@ -335,7 +339,7 @@ func (s *server) processSpawnWorker(proc process) { // We start one handler per message received by using go routines here. // This is for being able to reply back the current publisher who sent // the message. - go s.handler(s.natsConn, s.nodeName, msg) + go s.subscriberHandler(s.natsConn, s.nodeName, msg) }) if err != nil { log.Printf("error: Subscribe failed: %v\n", err) @@ -403,3 +407,62 @@ func gobEncodePayload(m Message) ([]byte, error) { return buf.Bytes(), nil } + +// handler will deserialize the message when a new message is received, +// check the MessageType field in the message to decide what kind of +// message it is and then it will check how to handle that message type, +// and handle it. +// This handler function should be started in it's own go routine,so +// one individual handler is started per message received so we can keep +// the state of the message being processed, and then reply back to the +// correct sending process's reply, meaning so we ACK back to the correct +// publisher. +func (s *server) subscriberHandler(natsConn *nats.Conn, node string, msg *nats.Msg) { + + message := Message{} + + // Create a buffer to decode the gob encoded binary data back + // to it's original structure. + buf := bytes.NewBuffer(msg.Data) + gobDec := gob.NewDecoder(buf) + err := gobDec.Decode(&message) + if err != nil { + log.Printf("error: gob decoding failed: %v\n", err) + } + + //fmt.Printf("%v\n", msg) + // TODO: Maybe the handling of the errors within the subscriber + // should also involve the error-kernel to report back centrally + // that there was a problem like missing method to handle a specific + // method etc. + switch { + case message.CommandOrEvent == "Command": + + // Since the command to execute is at the first position in the + // slice we need to slice it out. The arguments are at the + // remaining positions. + c := message.Data[0] + a := message.Data[1:] + cmd := exec.Command(c, a...) + //cmd.Stdout = os.Stdout + out, err := cmd.CombinedOutput() + if err != nil { + log.Printf("error: execution of command failed: %v\n", err) + } + fmt.Printf("%s", out) + + // Send a confirmation message back to the publisher + natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out))) + case message.CommandOrEvent == "Event": + fmt.Printf("info: sending over the message %#v\n", message) + + for _, d := range message.Data { + s.logCh <- []byte(d) + } + + // Send a confirmation message back to the publisher + natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID))) + default: + log.Printf("info: did not find that specific type of command: %#v\n", message.CommandOrEvent) + } +} diff --git a/subscriber.go b/subscriber.go index 0107183..5fd4681 100644 --- a/subscriber.go +++ b/subscriber.go @@ -1,98 +1 @@ package steward - -import ( - "bytes" - "encoding/gob" - "fmt" - "log" - "os/exec" - - "github.com/nats-io/nats.go" -) - -// RunSubscriber will start a subscribing process. -// TODO: Right now the only thing a subscriber can do is ro receive commands, -// check if there are more things a subscriber should be able to do. -func (s *server) RunSubscriber() { - { - fmt.Printf("nodeName: %#v\n", s.nodeName) - sub := newSubject(s.nodeName, "command", "shellcommand") - proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber) - // fmt.Printf("*** %#v\n", proc) - go s.processSpawnWorker(proc) - } - - // subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") - // - // // Subscribe will start up a Go routine under the hood calling the - // // callback function specified when a new message is received. - // _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { - // // We start one handler per message received by using go routines here. - // // This is for being able to reply back the current publisher who sent - // // the message. - // go handler(s.natsConn, s.nodeName, msg) - // }) - // if err != nil { - // log.Printf("error: Subscribe failed: %v\n", err) - // } - - select {} -} - -// handler will deserialize the message when a new message is received, -// check the MessageType field in the message to decide what kind of -// message it and then it will check how to handle that message type, -// and handle it. -// This handler function should be started in it's own go routine,so -// one individual handler is started per message received so we can keep -// the state of the message being processed, and then reply back to the -// correct sending process's reply, meaning so we ACK back to the correct -// publisher. -func (s *server) handler(natsConn *nats.Conn, node string, msg *nats.Msg) { - - message := Message{} - - // Create a buffer to decode the gob encoded binary data back - // to it's original structure. - buf := bytes.NewBuffer(msg.Data) - gobDec := gob.NewDecoder(buf) - err := gobDec.Decode(&message) - if err != nil { - log.Printf("error: gob decoding failed: %v\n", err) - } - - //fmt.Printf("%v\n", msg) - // TODO: Maybe the handling of the errors within the subscriber - // should also involve the error-kernel to report back centrally - // that there was a problem like missing method to handle a specific - // method etc. - switch { - case message.MessageType == "Command": - // Since the command to execute is at the first position in the - // slice we need to slice it out. The arguments are at the - // remaining positions. - c := message.Data[0] - a := message.Data[1:] - cmd := exec.Command(c, a...) - //cmd.Stdout = os.Stdout - out, err := cmd.CombinedOutput() - if err != nil { - log.Printf("error: execution of command failed: %v\n", err) - } - fmt.Printf("%s", out) - - // Send a confirmation message back to the publisher - natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out))) - case message.MessageType == "Event": - fmt.Printf("info: sending over the message %#v\n", message) - - for _, d := range message.Data { - s.logCh <- []byte(d) - } - - // Send a confirmation message back to the publisher - natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID))) - default: - log.Printf("info: did not find that specific type of command: %#v\n", message.MessageType) - } -} diff --git a/textlogging.log b/textlogging.log index 86f1046..16ebf48 100644 --- a/textlogging.log +++ b/textlogging.log @@ -1 +1 @@ -some message sent from a shipsome message sent from a ship \ No newline at end of file +some message sent from a shipsome message sent from a shipsome message sent from a shipsome message sent from a ship \ No newline at end of file