1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 04:49:17 +00:00

implemented the concept of an error kernel

This commit is contained in:
postmannen 2021-02-05 10:47:07 +01:00
parent 7d5b2bccb9
commit 8db29c7e2f
3 changed files with 72 additions and 73 deletions

View file

@ -24,6 +24,7 @@ The idea is to build and use a pure message passing architecture for the control
│ │ │ │
└─────────────────┘ └─────────────────┘
``` ```
Why ? Why ?
With existing solutions there is often either a push or a pull kind of setup. With existing solutions there is often either a push or a pull kind of setup.
@ -74,3 +75,11 @@ For syslog of type event to a host named "ship1"
and for a shell command of type command to a host named "ship2" and for a shell command of type command to a host named "ship2"
`ship2.command.shellcommand.operatingsystem` `ship2.command.shellcommand.operatingsystem`
## TODO
- Timeouts. Does it makes sense to have a default timeout for all messages, and where that timeout can be overridden per message upon creation of the message.
- Check that there is a node for the specific message new incomming message, and the supervisor should create the process with the wanted subject on both the publishing and the receiving node. If there is no such node an error should be generated and processed by the error-kernel.
- Since a process will be locked while waiting to send the error on the errorCh maybe it makes sense to have a channel inside the processes error handling with a select so we can send back to the process if it should continue or not based not based on how severe the error where. This should be right after sending the error sending in the process.

View file

@ -26,14 +26,20 @@ func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh
log.Printf("error: reading file: %v", err) log.Printf("error: reading file: %v", err)
} }
// Start on top again if the file did not contain
// any data.
if len(b) == 0 {
continue
}
// unmarshal the JSON into a struct // unmarshal the JSON into a struct
js, err := jsonFromFileData(b) js, err := jsonFromFileData(b)
if err != nil { if err != nil {
log.Printf("%v\n", err) log.Printf("%v\n", err)
} }
for i, _ := range js { for i := range js {
fmt.Printf("messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageType, js[i].Subject.MessageType) fmt.Printf("*** Checking message found in file: messageType type: %T, messagetype contains: %#v\n", js[i].Subject.MessageType, js[i].Subject.MessageType)
} }
// Send the data back to be consumed // Send the data back to be consumed
@ -111,7 +117,7 @@ func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) {
select { select {
case event := <-watcher.Events: case event := <-watcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write { if event.Op&fsnotify.Write == fsnotify.Write {
log.Println("modified file:", event.Name) log.Println("info: infile updated, processing input: ", event.Name)
//testing with an update chan to get updates //testing with an update chan to get updates
fileUpdated <- true fileUpdated <- true
} }

View file

@ -60,6 +60,9 @@ type server struct {
// The channel where we receive new messages from the outside to // The channel where we receive new messages from the outside to
// insert into the system for being processed // insert into the system for being processed
newMessagesCh chan []jsonFromFile newMessagesCh chan []jsonFromFile
// errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing
errorCh chan string
} }
// newServer will prepare and return a server type // newServer will prepare and return a server type
@ -74,6 +77,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
natsConn: conn, natsConn: conn,
processes: make(map[subjectName]process), processes: make(map[subjectName]process),
newMessagesCh: make(chan []jsonFromFile), newMessagesCh: make(chan []jsonFromFile),
errorCh: make(chan string, 10),
} }
return s, nil return s, nil
@ -82,41 +86,15 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
func (s *server) PublisherStart() { func (s *server) PublisherStart() {
// Start the error handler // Start the error handler
// TODO: For now it will just print the error messages to the s.startErrorKernel()
// console.
go func() {
for { // Start the checking the input file for new messages from operator.
for k := range s.processes {
select {
case e := <-s.processes[k].errorCh:
fmt.Printf("*** %v\n", e)
default:
time.Sleep(time.Millisecond * 100)
}
}
}
}()
// start the checking of files for input messages
go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh) go getMessagesFromFile("./", "inmsg.txt", s.newMessagesCh)
// TODO: For now we just print content of the files read.
// Replace this with a broker function that will know how
// send it on to the correct publisher.
// go func() {
// for v := range s.newMessagesCh {
// // Check if there are new content read from file input
// fmt.Printf("received: %#v\n", v)
//
// }
// }()
// Prepare and start a single process // Prepare and start a single process
{ {
sub := newSubject("ship1", "command", "shellcommand", "shell") sub := newSubject("ship1", "command", "shellcommand", "shell")
proc := s.processPrepareNew(sub) proc := s.processPrepareNew(sub, s.errorCh)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc) go s.processSpawn(proc)
} }
@ -124,25 +102,62 @@ func (s *server) PublisherStart() {
// Prepare and start a single process // Prepare and start a single process
{ {
sub := newSubject("ship2", "command", "shellcommand", "shell") sub := newSubject("ship2", "command", "shellcommand", "shell")
proc := s.processPrepareNew(sub) proc := s.processPrepareNew(sub, s.errorCh)
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc) go s.processSpawn(proc)
} }
// Simulate generating some commands to be sent as messages to nodes. s.handleNewOperatorMessages()
select {}
}
// startErrorKernel will start the error kernel and check if there
// have been reveived any errors from any of the processes, and
// handle them appropriately.
// TODO: Since a process will be locked while waiting to send the error
// on the errorCh maybe it makes sense to have a channel inside the
// processes error handling with a select so we can send back to the
// process if it should continue or not based not based on how severe
// the error where. This should be right after sending the error
// sending in the process.
func (s *server) startErrorKernel() {
// TODO: For now it will just print the error messages to the
// console.
go func() {
for {
e := <-s.errorCh
log.Printf("*** ERROR_KERNEL: %#v, type=%T\n", e, e)
}
}()
}
// handleNewOperatorMessages will handle all the new operator messages
// given to the system, and route them to the correct subject queue.
func (s *server) handleNewOperatorMessages() {
// Process the messages that have been received on the incomming
// message pipe. Check and send if there are a specific subject
// for it, and no subject exist throw an error.
//
// TODO: Later on the only thing that should be checked here is
// that there is a node for the specific message, and the super-
// visor should create the process with the wanted subject on both
// the publishing and the receiving node. If there is no such node
// an error should be generated and processed by the error-kernel.
go func() { go func() {
for v := range s.newMessagesCh { for v := range s.newMessagesCh {
for _, vv := range v { for _, vv := range v {
m := vv.Message m := vv.Message
subjName := vv.Subject.name() subjName := vv.Subject.name()
fmt.Printf("** message: %v, ** subject: %v\n", m, vv.Subject) fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, vv.Subject)
_, ok := s.processes[subjName] _, ok := s.processes[subjName]
if ok { if ok {
log.Printf("info: found the specific subject: %v\n", subjName) log.Printf("info: found the specific subject: %v\n", subjName)
fmt.Printf("* Before Putting incomming message on subject.messageCh\n") // Put the message on the correct process's messageCh
s.processes[subjName].subject.messageCh <- m s.processes[subjName].subject.messageCh <- m
fmt.Printf("* After Putting incomming message on subject.messageCh\n")
} else { } else {
log.Printf("info: did not find that specific subject: %v\n", subjName) log.Printf("info: did not find that specific subject: %v\n", subjName)
time.Sleep(time.Millisecond * 500) time.Sleep(time.Millisecond * 500)
@ -151,27 +166,6 @@ func (s *server) PublisherStart() {
} }
} }
}() }()
// // Simulate generating some commands to be sent as messages to nodes.
// go func() {
// for {
// m := Message{
// Data: []string{"bash", "-c", "uname -a"},
// MessageType: Event,
// }
// subjName := subjectName("btship2.command.shellcommand.shell")
// _, ok := s.processes[subjName]
// if ok {
// s.processes[subjName].subject.messageCh <- m
// } else {
// time.Sleep(time.Millisecond * 500)
// continue
// }
// }
// }()
select {}
} }
type node string type node string
@ -228,14 +222,11 @@ type process struct {
// errorCh is used to report errors from a process // errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing // NB: Implementing this as an int to report for testing
errorCh chan string errorCh chan string
// messageCh are the channel where we put the message we want
// a process to send
//messageCh chan Message
} }
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
// values for a process. // values for a process.
func (s *server) processPrepareNew(subject Subject) process { func (s *server) processPrepareNew(subject Subject, errCh chan string) process {
// create the initial configuration for a sessions communicating with 1 host process. // create the initial configuration for a sessions communicating with 1 host process.
s.lastProcessID++ s.lastProcessID++
proc := process{ proc := process{
@ -243,7 +234,7 @@ func (s *server) processPrepareNew(subject Subject) process {
subject: subject, subject: subject,
node: node(subject.Node), node: node(subject.Node),
processID: s.lastProcessID, processID: s.lastProcessID,
errorCh: make(chan string), errorCh: errCh,
//messageCh: make(chan Message), //messageCh: make(chan Message),
} }
@ -261,9 +252,6 @@ func (s *server) processSpawn(proc process) {
s.processes[proc.subject.name()] = proc s.processes[proc.subject.name()] = proc
s.mu.Unlock() s.mu.Unlock()
// Loop creating one new message every second to simulate getting new
// messages to deliver.
//
// TODO: I think it makes most sense that the messages would come to // TODO: I think it makes most sense that the messages would come to
// here from some other message-pickup-process, and that process will // here from some other message-pickup-process, and that process will
// give the message to the correct publisher process. A channel that // give the message to the correct publisher process. A channel that
@ -271,9 +259,7 @@ func (s *server) processSpawn(proc process) {
// messages from the message-pickup-process. // messages from the message-pickup-process.
for { for {
// Wait and read the next message on the message channel // Wait and read the next message on the message channel
fmt.Printf("* Before checking messageCh inside process\n")
m := <-proc.subject.messageCh m := <-proc.subject.messageCh
fmt.Printf("* After checking messageCh inside process: %v\n", m)
m.ID = s.processes[proc.subject.name()].messageID m.ID = s.processes[proc.subject.name()].messageID
messageDeliver(proc, m, s.natsConn) messageDeliver(proc, m, s.natsConn)
@ -284,9 +270,7 @@ func (s *server) processSpawn(proc process) {
// NB: simulate that we get an error, and that we can send that // NB: simulate that we get an error, and that we can send that
// out of the process and receive it in another thread. // out of the process and receive it in another thread.
s.processes[proc.subject.name()].errorCh <- "received an error from process: " + fmt.Sprintf("ID=%v, subjectName=%v\n", proc.processID, proc.subject.name()) s.errorCh <- "received an error from process: " + fmt.Sprintf("ID=%v, subjectName=%v\n", proc.processID, proc.subject.name())
//fmt.Printf("%#v\n", s.processes[proc.node])
} }
} }
@ -332,7 +316,7 @@ func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
// did not receive a reply, continuing from top again // did not receive a reply, continuing from top again
continue continue
} }
fmt.Printf("publisher: received: %s\n", msgReply.Data) log.Printf("publisher: received ACK: %s\n", msgReply.Data)
return return
} }
} }