From 271447d42dbc0654a545b186d120af8b5de8ad7b Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 10 Feb 2021 05:11:48 +0100 Subject: [PATCH] first concept of receiving messages from end nodes --- .gitignore | 3 +++ cmd/main.go | 15 ++++++------- example-inmessage/orig-central.json | 15 +++++++++++++ example-inmessage/orig-ship1.json | 2 +- example-inmessage/orig-ship2.json | 2 +- publisher.go | 33 +++++++++++++++++++++++++++-- subscriber.go | 8 +++++-- textlogging.log | 1 + 8 files changed, 65 insertions(+), 14 deletions(-) create mode 100644 .gitignore create mode 100644 example-inmessage/orig-central.json create mode 100644 textlogging.log diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6490996 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +textlogging.go +ship1/ +ship2/ diff --git a/cmd/main.go b/cmd/main.go index 1ba4867..d34cc81 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -14,8 +14,8 @@ import ( func main() { nodeName := flag.String("node", "0", "some unique string to identify this Edge unit") brokerAddress := flag.String("brokerAddress", "0", "the address of the message broker") - modePublisher := flag.Bool("modePublisher", false, "set to true if it should be able to publish") - modeSubscriber := flag.Bool("modeSubscriber", false, "set to true if it should be able to subscribe") + // modePublisher := flag.Bool("modePublisher", false, "set to true if it should be able to publish") + // modeSubscriber := flag.Bool("modeSubscriber", false, "set to true if it should be able to subscribe") profilingPort := flag.String("profilingPort", "", "The number of the profiling port") flag.Parse() @@ -34,13 +34,12 @@ func main() { os.Exit(1) } - if *modePublisher { - go s.PublisherStart() - } + // Start the messaging server + go s.Start() - if *modeSubscriber { - go s.RunSubscriber() - } + //if *modeSubscriber { + // go s.RunSubscriber() + //} select {} } diff --git a/example-inmessage/orig-central.json b/example-inmessage/orig-central.json new file mode 100644 index 0000000..d2e919a --- /dev/null +++ b/example-inmessage/orig-central.json @@ -0,0 +1,15 @@ +[ + { + "subject": + { + "node":"central", + "messageKind":"event", + "method":"textlogging" + }, + "message": + { + "data": ["some message sent from a ship"], + "messageType":"Event" + } + } +] \ No newline at end of file diff --git a/example-inmessage/orig-ship1.json b/example-inmessage/orig-ship1.json index 3b42c32..d9577e4 100644 --- a/example-inmessage/orig-ship1.json +++ b/example-inmessage/orig-ship1.json @@ -8,7 +8,7 @@ }, "message": { - "data": ["bash","-c","ls -l"], + "data": ["bash","-c","ls -l ../"], "messageType":"Command" } } diff --git a/example-inmessage/orig-ship2.json b/example-inmessage/orig-ship2.json index 4a0c452..3ed4a8f 100644 --- a/example-inmessage/orig-ship2.json +++ b/example-inmessage/orig-ship2.json @@ -8,7 +8,7 @@ }, "message": { - "data": ["bash","-c","tree"], + "data": ["bash","-c","tree ../"], "messageType":"Command" } } diff --git a/publisher.go b/publisher.go index 8c18fbf..0ae2091 100644 --- a/publisher.go +++ b/publisher.go @@ -72,6 +72,8 @@ type server struct { errorCh chan errProcess // errorKernel errorKernel *errorKernel + // TODO: replace this with some structure to hold the logCh value + logCh chan []byte } // newServer will prepare and return a server type @@ -87,6 +89,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { processes: make(map[subjectName]process), newMessagesCh: make(chan []jsonFromFile), errorCh: make(chan errProcess, 2), + logCh: make(chan []byte), } // Start the error kernel that will do all the error handling @@ -98,10 +101,36 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { } -func (s *server) PublisherStart() { +func (s *server) Start() { // Start the checking the input file for new messages from operator. go s.getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) + // Start the textlogging service that will run on the subscribers + // TODO: Figure out how to structure event services like these + + go s.startTextLogging(s.logCh) + + // Start a subscriber for shellCommand messages + { + fmt.Printf("nodeName: %#v\n", s.nodeName) + sub := newSubject(s.nodeName, "command", "shellcommand") + proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber) + // fmt.Printf("*** %#v\n", proc) + go s.processSpawnWorker(proc) + } + + // Start a subscriber for textLogging messages + { + fmt.Printf("nodeName: %#v\n", s.nodeName) + sub := newSubject(s.nodeName, "event", "textlogging") + proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber) + // fmt.Printf("*** %#v\n", proc) + go s.processSpawnWorker(proc) + } + + time.Sleep(time.Second * 2) + fmt.Printf("*** Output of processes map: %#v\n", s.processes) + // Prepare and start a single process //{ // sub := newSubject("ship1", "command", "shellcommand") @@ -306,7 +335,7 @@ func (s *server) processSpawnWorker(proc process) { // We start one handler per message received by using go routines here. // This is for being able to reply back the current publisher who sent // the message. - go handler(s.natsConn, s.nodeName, msg) + go s.handler(s.natsConn, s.nodeName, msg) }) if err != nil { log.Printf("error: Subscribe failed: %v\n", err) diff --git a/subscriber.go b/subscriber.go index b14beaa..0107183 100644 --- a/subscriber.go +++ b/subscriber.go @@ -48,7 +48,7 @@ func (s *server) RunSubscriber() { // the state of the message being processed, and then reply back to the // correct sending process's reply, meaning so we ACK back to the correct // publisher. -func handler(natsConn *nats.Conn, node string, msg *nats.Msg) { +func (s *server) handler(natsConn *nats.Conn, node string, msg *nats.Msg) { message := Message{} @@ -84,7 +84,11 @@ func handler(natsConn *nats.Conn, node string, msg *nats.Msg) { // Send a confirmation message back to the publisher natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprintf("%v\n%s", message.ID, out))) case message.MessageType == "Event": - fmt.Printf("info: the event type is not implemented yet\n") + fmt.Printf("info: sending over the message %#v\n", message) + + for _, d := range message.Data { + s.logCh <- []byte(d) + } // Send a confirmation message back to the publisher natsConn.Publish(msg.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID))) diff --git a/textlogging.log b/textlogging.log new file mode 100644 index 0000000..86f1046 --- /dev/null +++ b/textlogging.log @@ -0,0 +1 @@ +some message sent from a shipsome message sent from a ship \ No newline at end of file