diff --git a/getmessagefromfile.go b/getmessagefromfile.go index 2568b2d..db95c6a 100644 --- a/getmessagefromfile.go +++ b/getmessagefromfile.go @@ -14,7 +14,7 @@ import ( // getMessagesFromFile will start a file watcher for the given directory // and filename. It will take a channel of []byte as input, and it is // in this channel the content of a file that has changed is returned. -func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []jsonFromFile) { +func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []jsonFromFile) { fileUpdated := make(chan bool) go fileWatcherStart(directoryToCheck, fileUpdated) @@ -40,6 +40,7 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh for i := range js { fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageKind, js[i].Subject.MessageKind) + js[i].Message.FromNode = node(s.nodeName) } // Send the data back to be consumed diff --git a/publisher.go b/publisher.go index d7a0755..8c18fbf 100644 --- a/publisher.go +++ b/publisher.go @@ -50,6 +50,7 @@ type Message struct { Data []string `json:"data" yaml:"data"` // The type of the message being sent MessageType MessageKind `json:"messageType" yaml:"messageType"` + FromNode node } // server is the structure that will hold the state about spawned @@ -99,7 +100,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { func (s *server) PublisherStart() { // Start the checking the input file for new messages from operator. - go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) + go s.getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) // Prepare and start a single process //{ @@ -138,6 +139,10 @@ func (s *server) handleNewOperatorMessages() { go func() { for v := range s.newMessagesCh { for i, vv := range v { + + // Adding a label here so we are able to redo the sending + // of the last message if a process with specified subject + // is not present. redo: m := vv.Message subjName := vv.Subject.name()