mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
141 lines
4.5 KiB
Go
141 lines
4.5 KiB
Go
// The error kernel shall handle errors for a given process.
|
|
// This will be cases where the process itself were unable
|
|
// to handle the error on it's own, and we might need to
|
|
// restart the process, or send a message back to the operator
|
|
// that the action which the message where supposed to trigger
|
|
// failed, or that an event where unable to be processed.
|
|
|
|
package steward
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
)
|
|
|
|
// errorKernel is the structure that will hold all the error
|
|
// handling values and logic.
|
|
type errorKernel struct {
|
|
// NOTE: The errorKernel should probably have a concept
|
|
// of error-state which is a map of all the processes,
|
|
// how many times a process have failed over the same
|
|
// message etc...
|
|
|
|
// errorCh is used to report errors from a process
|
|
errorCh chan errProcess
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// newErrorKernel will initialize and return a new error kernel
|
|
func newErrorKernel(ctx context.Context) *errorKernel {
|
|
ctxC, cancel := context.WithCancel(ctx)
|
|
|
|
return &errorKernel{
|
|
errorCh: make(chan errProcess, 2),
|
|
ctx: ctxC,
|
|
cancel: cancel,
|
|
}
|
|
}
|
|
|
|
// startErrorKernel will start the error kernel and check if there
|
|
// have been reveived any errors from any of the processes, and
|
|
// handle them appropriately.
|
|
//
|
|
// NOTE: Since a process will be locked while waiting to send the error
|
|
// on the errorCh maybe it makes sense to have a channel inside the
|
|
// processes error handling with a select so we can send back to the
|
|
// process if it should continue or not based not based on how severe
|
|
// the error where. This should be right after sending the error
|
|
// sending in the process.
|
|
func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error {
|
|
// NOTE: For now it will just print the error messages to the
|
|
// console.
|
|
|
|
for {
|
|
var er errProcess
|
|
select {
|
|
case er = <-e.errorCh:
|
|
case <-e.ctx.Done():
|
|
return fmt.Errorf("info: stopping errorKernel")
|
|
}
|
|
|
|
// We should be able to handle each error individually and
|
|
// also concurrently, so the handler is started in it's
|
|
// own go routine
|
|
go func() {
|
|
// NOTE: Here we should check the severity of the error,
|
|
// and also possibly the the error-state of the process
|
|
// that fails, so we can decide if we should stop and
|
|
// start a new process to replace to old one, or if we
|
|
// should just kill the process and send message back to
|
|
// the operator....or other ?
|
|
//
|
|
// Just print the error, and tell the process to continue
|
|
|
|
// log.Printf("*** error_kernel: %#v, type=%T\n", er, er)
|
|
log.Printf("TESTING, we received and error from the process, but we're telling the process back to continue\n")
|
|
|
|
// // TESTING: Creating an error message to send to errorCentral
|
|
// fmt.Printf(" --- Sending error message to central !!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")
|
|
// sam := subjectAndMessage{
|
|
// Subject: Subject{
|
|
// ToNode: "errorCentral",
|
|
// CommandOrEvent: EventNACK,
|
|
// Method: ErrorLog,
|
|
// },
|
|
// Message: Message{
|
|
// ToNode: "errorCentral",
|
|
// Data: []string{"some tull here .............."},
|
|
// CommandOrEvent: EventNACK,
|
|
// Method: ErrorLog,
|
|
// },
|
|
// }
|
|
// newMessagesCh <- []subjectAndMessage{sam}
|
|
|
|
select {
|
|
case er.errorActionCh <- errActionContinue:
|
|
case <-e.ctx.Done():
|
|
log.Printf("info: errorKernel: got ctx.Done, will stop waiting for errAction\n")
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (e *errorKernel) stop() {
|
|
e.cancel()
|
|
}
|
|
|
|
type errorAction int
|
|
|
|
const (
|
|
// errActionJustPrint should just print the error,
|
|
// and the worker process should continue.
|
|
errActionContinue errorAction = iota
|
|
// errActionKillAndSpawnNew should log the error,
|
|
// stop the current worker process, and spawn a new.
|
|
errActionKill errorAction = iota
|
|
// errActionKillAndDie should log the error, stop the
|
|
// current worker process, and send a message back to
|
|
// the master supervisor that it was unable to complete
|
|
// the action of the current message. The error message
|
|
// should contain a copy of the original message.
|
|
)
|
|
|
|
type errProcess struct {
|
|
// Channel for communicating the action to take back to
|
|
// to the process who triggered the error
|
|
errorActionCh chan errorAction
|
|
// Some informational text
|
|
infoText string
|
|
// The process structure that belongs to a given process
|
|
process process
|
|
// The message that where in progress when error occured
|
|
message Message
|
|
}
|
|
|
|
func (e errProcess) Error() string {
|
|
return fmt.Sprintf("worker error: proc = %#v, message = %#v", e.process, e.message)
|
|
}
|