// The error kernel shall handle errors for a given process. // This will be cases where the process itself were unable // to handle the error on it's own, and we might need to // restart the process, or send a message back to the operator // that the action which the message where supposed to trigger // failed, or that an event where unable to be processed. package steward import ( "context" "fmt" "log" ) // errorKernel is the structure that will hold all the error // handling values and logic. type errorKernel struct { // NOTE: The errorKernel should probably have a concept // of error-state which is a map of all the processes, // how many times a process have failed over the same // message etc... // errorCh is used to report errors from a process errorCh chan errProcess ctx context.Context cancel context.CancelFunc } // newErrorKernel will initialize and return a new error kernel func newErrorKernel(ctx context.Context) *errorKernel { ctxC, cancel := context.WithCancel(ctx) return &errorKernel{ errorCh: make(chan errProcess, 2), ctx: ctxC, cancel: cancel, } } // startErrorKernel will start the error kernel and check if there // have been reveived any errors from any of the processes, and // handle them appropriately. // // NOTE: Since a process will be locked while waiting to send the error // on the errorCh maybe it makes sense to have a channel inside the // processes error handling with a select so we can send back to the // process if it should continue or not based not based on how severe // the error where. This should be right after sending the error // sending in the process. func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error { // NOTE: For now it will just print the error messages to the // console. for { var er errProcess select { case er = <-e.errorCh: case <-e.ctx.Done(): return fmt.Errorf("info: stopping errorKernel") } // We should be able to handle each error individually and // also concurrently, so the handler is started in it's // own go routine go func() { // NOTE: Here we should check the severity of the error, // and also possibly the the error-state of the process // that fails, so we can decide if we should stop and // start a new process to replace to old one, or if we // should just kill the process and send message back to // the operator....or other ? // // Just print the error, and tell the process to continue // log.Printf("*** error_kernel: %#v, type=%T\n", er, er) log.Printf("TESTING, we received and error from the process, but we're telling the process back to continue\n") // // TESTING: Creating an error message to send to errorCentral // fmt.Printf(" --- Sending error message to central !!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") // sam := subjectAndMessage{ // Subject: Subject{ // ToNode: "errorCentral", // CommandOrEvent: EventNACK, // Method: ErrorLog, // }, // Message: Message{ // ToNode: "errorCentral", // Data: []string{"some tull here .............."}, // CommandOrEvent: EventNACK, // Method: ErrorLog, // }, // } // newMessagesCh <- []subjectAndMessage{sam} select { case er.errorActionCh <- errActionContinue: case <-e.ctx.Done(): log.Printf("info: errorKernel: got ctx.Done, will stop waiting for errAction\n") return } }() } } func (e *errorKernel) stop() { e.cancel() } type errorAction int const ( // errActionJustPrint should just print the error, // and the worker process should continue. errActionContinue errorAction = iota // errActionKillAndSpawnNew should log the error, // stop the current worker process, and spawn a new. errActionKill errorAction = iota // errActionKillAndDie should log the error, stop the // current worker process, and send a message back to // the master supervisor that it was unable to complete // the action of the current message. The error message // should contain a copy of the original message. ) type errProcess struct { // Channel for communicating the action to take back to // to the process who triggered the error errorActionCh chan errorAction // Some informational text infoText string // The process structure that belongs to a given process process process // The message that where in progress when error occured message Message } func (e errProcess) Error() string { return fmt.Sprintf("worker error: proc = %#v, message = %#v", e.process, e.message) }