1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

implemented error kernel process actions

This commit is contained in:
postmannen 2021-02-08 06:02:54 +01:00
parent f4d7f40c86
commit f01995cab5
3 changed files with 98 additions and 42 deletions

View file

@ -83,3 +83,5 @@ and for a shell command of type command to a host named "ship2"
- Check that there is a node for the specific message new incomming message, and the supervisor should create the process with the wanted subject on both the publishing and the receiving node. If there is no such node an error should be generated and processed by the error-kernel.
- Since a process will be locked while waiting to send the error on the errorCh maybe it makes sense to have a channel inside the processes error handling with a select so we can send back to the process if it should continue or not based not based on how severe the error where. This should be right after sending the error sending in the process.
- Look into adding a channel to the error messages sent from a worker process, so the error kernel can send f.ex. a shutdown instruction back to the worker.

75
errorkernel.go Normal file
View file

@ -0,0 +1,75 @@
package steward
import (
"fmt"
"log"
)
// errorKernel is the structure that will hold all the error
// handling values and logic.
type errorKernel struct {
// ringBuffer *ringBuffer
}
// newErrorKernel will initialize and return a new error kernel
func newErrorKernel() *errorKernel {
return &errorKernel{
// ringBuffer: newringBuffer(),
}
}
// startErrorKernel will start the error kernel and check if there
// have been reveived any errors from any of the processes, and
// handle them appropriately.
// TODO: Since a process will be locked while waiting to send the error
// on the errorCh maybe it makes sense to have a channel inside the
// processes error handling with a select so we can send back to the
// process if it should continue or not based not based on how severe
// the error where. This should be right after sending the error
// sending in the process.
func (e *errorKernel) startErrorKernel(errorCh chan errProcess) {
// TODO: For now it will just print the error messages to the
// console.
go func() {
for {
er := <-errorCh
// We should be able to handle each error individually and
// also concurrently, so the handler is start in it's own
// go routine
go func() {
// Just print the error, and tell the process to continue
log.Printf("*** error_kernel: %#v, type=%T\n", er, er)
er.errorActionCh <- errActionContinue
}()
}
}()
}
type errorAction int
const (
// errActionJustPrint should just print the error,
// and the worker process should continue.
errActionContinue errorAction = iota
// errActionKillAndSpawnNew should log the error,
// stop the current worker process, and spawn a new.
errActionKill errorAction = iota
// errActionKillAndDie should log the error, stop the
// current worker process, and send a message back to
// the master supervisor that it was unable to complete
// the action of the current message. The error message
// should contain a copy of the original message.
)
type errProcess struct {
errorActionCh chan errorAction
infoText string
process process
message Message
}
func (e errProcess) Error() string {
return fmt.Sprintf("worker error: proc = %#v, message = %#v", e.process, e.message)
}

View file

@ -62,7 +62,7 @@ type server struct {
newMessagesCh chan []jsonFromFile
// errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing
errorCh chan string
errorCh chan errProcess
// errorKernel
errorKernel *errorKernel
}
@ -79,7 +79,7 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
natsConn: conn,
processes: make(map[subjectName]process),
newMessagesCh: make(chan []jsonFromFile),
errorCh: make(chan string, 10),
errorCh: make(chan errProcess, 2),
}
// Start the error kernel that will do all the error handling
@ -100,7 +100,7 @@ func (s *server) PublisherStart() {
sub := newSubject("ship1", "command", "shellcommand", "shell")
proc := s.processPrepareNew(sub, s.errorCh)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc)
go s.processSpawnWorker(proc)
}
// Prepare and start a single process
@ -108,7 +108,7 @@ func (s *server) PublisherStart() {
sub := newSubject("ship2", "command", "shellcommand", "shell")
proc := s.processPrepareNew(sub, s.errorCh)
// fmt.Printf("*** %#v\n", proc)
go s.processSpawn(proc)
go s.processSpawnWorker(proc)
}
s.handleNewOperatorMessages()
@ -117,40 +117,6 @@ func (s *server) PublisherStart() {
}
// errorKernel is the structure that will hold all the error
// handling values and logic.
type errorKernel struct {
// ringBuffer *ringBuffer
}
// newErrorKernel will initialize and return a new error kernel
func newErrorKernel() *errorKernel {
return &errorKernel{
// ringBuffer: newringBuffer(),
}
}
// startErrorKernel will start the error kernel and check if there
// have been reveived any errors from any of the processes, and
// handle them appropriately.
// TODO: Since a process will be locked while waiting to send the error
// on the errorCh maybe it makes sense to have a channel inside the
// processes error handling with a select so we can send back to the
// process if it should continue or not based not based on how severe
// the error where. This should be right after sending the error
// sending in the process.
func (e *errorKernel) startErrorKernel(errorCh chan string) {
// TODO: For now it will just print the error messages to the
// console.
go func() {
for {
er := <-errorCh
log.Printf("*** ERROR_KERNEL: %#v, type=%T\n", er, er)
}
}()
}
// handleNewOperatorMessages will handle all the new operator messages
// given to the system, and route them to the correct subject queue.
func (s *server) handleNewOperatorMessages() {
@ -238,12 +204,12 @@ type process struct {
processID int
// errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing
errorCh chan string
errorCh chan errProcess
}
// prepareNewProcess will set the the provided values and the default
// values for a process.
func (s *server) processPrepareNew(subject Subject, errCh chan string) process {
func (s *server) processPrepareNew(subject Subject, errCh chan errProcess) process {
// create the initial configuration for a sessions communicating with 1 host process.
s.lastProcessID++
proc := process{
@ -261,7 +227,7 @@ func (s *server) processPrepareNew(subject Subject, errCh chan string) process {
// spawnProcess will spawn a new process. It will give the process
// the next available ID, and also add the process to the processes
// map.
func (s *server) processSpawn(proc process) {
func (s *server) processSpawnWorker(proc process) {
s.mu.Lock()
// We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle
@ -287,7 +253,20 @@ func (s *server) processSpawn(proc process) {
// NB: simulate that we get an error, and that we can send that
// out of the process and receive it in another thread.
s.errorCh <- "received an error from process: " + fmt.Sprintf("ID=%v, subjectName=%v\n", proc.processID, proc.subject.name())
ep := errProcess{
infoText: "process failed",
process: proc,
message: m,
errorActionCh: make(chan errorAction),
}
s.errorCh <- ep
// Wait for the response action back from the error kernel, and
// decide what to do. Should we continue, quit, or .... ?
switch <-ep.errorActionCh {
case errActionContinue:
log.Printf("The errAction was continue...so we're continuing\n")
}
}
}