From 8db29c7e2fae566a70b842ba1569f25054923412 Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 5 Feb 2021 10:47:07 +0100 Subject: [PATCH] implemented the concept of an error kernel --- README.md | 11 +++- getmessagefromfile.go | 12 +++-- publisher.go | 122 ++++++++++++++++++------------------------ 3 files changed, 72 insertions(+), 73 deletions(-) diff --git a/README.md b/README.md index e873734..aca64cb 100644 --- a/README.md +++ b/README.md @@ -24,9 +24,10 @@ The idea is to build and use a pure message passing architecture for the control │ │ └─────────────────┘ ``` + Why ? -With existing solutions there is often either a push or a pull kind of setup. +With existing solutions there is often either a push or a pull kind of setup. In a push setup the commands to execute is pushed to the receiver, but if a command fails because for example a broken network link it is up to you as an administrator to detect those failures and retry them at a later time until it is executed successfully. @@ -74,3 +75,11 @@ For syslog of type event to a host named "ship1" and for a shell command of type command to a host named "ship2" `ship2.command.shellcommand.operatingsystem` + +## TODO + +- Timeouts. Does it makes sense to have a default timeout for all messages, and where that timeout can be overridden per message upon creation of the message. + +- Check that there is a node for the specific message new incomming message, and the supervisor should create the process with the wanted subject on both the publishing and the receiving node. If there is no such node an error should be generated and processed by the error-kernel. + +- Since a process will be locked while waiting to send the error on the errorCh maybe it makes sense to have a channel inside the processes error handling with a select so we can send back to the process if it should continue or not based not based on how severe the error where. This should be right after sending the error sending in the process. \ No newline at end of file diff --git a/getmessagefromfile.go b/getmessagefromfile.go index 45e9ba6..59abba8 100644 --- a/getmessagefromfile.go +++ b/getmessagefromfile.go @@ -26,14 +26,20 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh log.Printf("error: reading file: %v", err) } + // Start on top again if the file did not contain + // any data. + if len(b) == 0 { + continue + } + // unmarshal the JSON into a struct js, err := jsonFromFileData(b) if err != nil { log.Printf("%v\n", err) } - for i, _ := range js { - fmt.Printf("messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageType, js[i].Subject.MessageType) + for i := range js { + fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageType, js[i].Subject.MessageType) } // Send the data back to be consumed @@ -111,7 +117,7 @@ func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) { select { case event := <-watcher.Events: if event.Op&fsnotify.Write == fsnotify.Write { - log.Println("modified file:", event.Name) + log.Println("info: infile updated, processing input: ", event.Name) //testing with an update chan to get updates fileUpdated <- true } diff --git a/publisher.go b/publisher.go index a58a0a2..6f57b2f 100644 --- a/publisher.go +++ b/publisher.go @@ -60,6 +60,9 @@ type server struct { // The channel where we receive new messages from the outside to // insert into the system for being processed newMessagesCh chan []jsonFromFile + // errorCh is used to report errors from a process + // NB: Implementing this as an int to report for testing + errorCh chan string } // newServer will prepare and return a server type @@ -74,6 +77,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { natsConn: conn, processes: make(map[subjectName]process), newMessagesCh: make(chan []jsonFromFile), + errorCh: make(chan string, 10), } return s, nil @@ -82,41 +86,15 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { func (s *server) PublisherStart() { // Start the error handler - // TODO: For now it will just print the error messages to the - // console. - go func() { + s.startErrorKernel() - for { - for k := range s.processes { - select { - case e := <-s.processes[k].errorCh: - fmt.Printf("*** %v\n", e) - default: - time.Sleep(time.Millisecond * 100) - } - - } - } - }() - - // start the checking of files for input messages + // Start the checking the input file for new messages from operator. go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) - // TODO: For now we just print content of the files read. - // Replace this with a broker function that will know how - // send it on to the correct publisher. - // go func() { - // for v := range s.newMessagesCh { - // // Check if there are new content read from file input - // fmt.Printf("received: %#v\n", v) - // - // } - // }() - // Prepare and start a single process { sub := newSubject("ship1", "command", "shellcommand", "shell") - proc := s.processPrepareNew(sub) + proc := s.processPrepareNew(sub, s.errorCh) // fmt.Printf("*** %#v\n", proc) go s.processSpawn(proc) } @@ -124,25 +102,62 @@ func (s *server) PublisherStart() { // Prepare and start a single process { sub := newSubject("ship2", "command", "shellcommand", "shell") - proc := s.processPrepareNew(sub) + proc := s.processPrepareNew(sub, s.errorCh) // fmt.Printf("*** %#v\n", proc) go s.processSpawn(proc) } - // Simulate generating some commands to be sent as messages to nodes. + s.handleNewOperatorMessages() + + select {} + +} + +// startErrorKernel will start the error kernel and check if there +// have been reveived any errors from any of the processes, and +// handle them appropriately. +// TODO: Since a process will be locked while waiting to send the error +// on the errorCh maybe it makes sense to have a channel inside the +// processes error handling with a select so we can send back to the +// process if it should continue or not based not based on how severe +// the error where. This should be right after sending the error +// sending in the process. +func (s *server) startErrorKernel() { + // TODO: For now it will just print the error messages to the + // console. + go func() { + + for { + e := <-s.errorCh + log.Printf("*** ERROR_KERNEL: %#v, type=%T\n", e, e) + } + }() +} + +// handleNewOperatorMessages will handle all the new operator messages +// given to the system, and route them to the correct subject queue. +func (s *server) handleNewOperatorMessages() { + // Process the messages that have been received on the incomming + // message pipe. Check and send if there are a specific subject + // for it, and no subject exist throw an error. + // + // TODO: Later on the only thing that should be checked here is + // that there is a node for the specific message, and the super- + // visor should create the process with the wanted subject on both + // the publishing and the receiving node. If there is no such node + // an error should be generated and processed by the error-kernel. go func() { for v := range s.newMessagesCh { for _, vv := range v { m := vv.Message subjName := vv.Subject.name() - fmt.Printf("** message: %v, ** subject: %v\n", m, vv.Subject) + fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, vv.Subject) _, ok := s.processes[subjName] if ok { log.Printf("info: found the specific subject: %v\n", subjName) - fmt.Printf("* Before Putting incomming message on subject.messageCh\n") + // Put the message on the correct process's messageCh s.processes[subjName].subject.messageCh <- m - fmt.Printf("* After Putting incomming message on subject.messageCh\n") } else { log.Printf("info: did not find that specific subject: %v\n", subjName) time.Sleep(time.Millisecond * 500) @@ -151,27 +166,6 @@ func (s *server) PublisherStart() { } } }() - - // // Simulate generating some commands to be sent as messages to nodes. - // go func() { - // for { - // m := Message{ - // Data: []string{"bash", "-c", "uname -a"}, - // MessageType: Event, - // } - // subjName := subjectName("btship2.command.shellcommand.shell") - // _, ok := s.processes[subjName] - // if ok { - // s.processes[subjName].subject.messageCh <- m - // } else { - // time.Sleep(time.Millisecond * 500) - // continue - // } - // } - // }() - - select {} - } type node string @@ -228,14 +222,11 @@ type process struct { // errorCh is used to report errors from a process // NB: Implementing this as an int to report for testing errorCh chan string - // messageCh are the channel where we put the message we want - // a process to send - //messageCh chan Message } // prepareNewProcess will set the the provided values and the default // values for a process. -func (s *server) processPrepareNew(subject Subject) process { +func (s *server) processPrepareNew(subject Subject, errCh chan string) process { // create the initial configuration for a sessions communicating with 1 host process. s.lastProcessID++ proc := process{ @@ -243,7 +234,7 @@ func (s *server) processPrepareNew(subject Subject) process { subject: subject, node: node(subject.Node), processID: s.lastProcessID, - errorCh: make(chan string), + errorCh: errCh, //messageCh: make(chan Message), } @@ -261,9 +252,6 @@ func (s *server) processSpawn(proc process) { s.processes[proc.subject.name()] = proc s.mu.Unlock() - // Loop creating one new message every second to simulate getting new - // messages to deliver. - // // TODO: I think it makes most sense that the messages would come to // here from some other message-pickup-process, and that process will // give the message to the correct publisher process. A channel that @@ -271,9 +259,7 @@ func (s *server) processSpawn(proc process) { // messages from the message-pickup-process. for { // Wait and read the next message on the message channel - fmt.Printf("* Before checking messageCh inside process\n") m := <-proc.subject.messageCh - fmt.Printf("* After checking messageCh inside process: %v\n", m) m.ID = s.processes[proc.subject.name()].messageID messageDeliver(proc, m, s.natsConn) @@ -284,9 +270,7 @@ func (s *server) processSpawn(proc process) { // NB: simulate that we get an error, and that we can send that // out of the process and receive it in another thread. - s.processes[proc.subject.name()].errorCh <- "received an error from process: " + fmt.Sprintf("ID=%v, subjectName=%v\n", proc.processID, proc.subject.name()) - - //fmt.Printf("%#v\n", s.processes[proc.node]) + s.errorCh <- "received an error from process: " + fmt.Sprintf("ID=%v, subjectName=%v\n", proc.processID, proc.subject.name()) } } @@ -332,7 +316,7 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) { // did not receive a reply, continuing from top again continue } - fmt.Printf("publisher: received: %s\n", msgReply.Data) + log.Printf("publisher: received ACK: %s\n", msgReply.Data) return } }