1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-04-08 09:54:33 +00:00

errkrnl initial logic setup

This commit is contained in:
postmannen 2022-01-18 19:26:36 +01:00
parent f50250661d
commit 09a42d9442
2 changed files with 77 additions and 45 deletions

View file

@ -11,6 +11,7 @@ import (
"context"
"fmt"
"log"
"time"
)
// errorKernel is the structure that will hold all the error
@ -22,7 +23,7 @@ type errorKernel struct {
// message etc...
// errorCh is used to report errors from a process
errorCh chan errProcess
errorCh chan errorEvent
ctx context.Context
cancel context.CancelFunc
@ -33,7 +34,7 @@ func newErrorKernel(ctx context.Context) *errorKernel {
ctxC, cancel := context.WithCancel(ctx)
return &errorKernel{
errorCh: make(chan errProcess, 2),
errorCh: make(chan errorEvent, 2),
ctx: ctxC,
cancel: cancel,
}
@ -54,53 +55,72 @@ func (e *errorKernel) start(newMessagesCh chan<- []subjectAndMessage) error {
// console.
for {
var er errProcess
var errEvent errorEvent
select {
case er = <-e.errorCh:
case errEvent = <-e.errorCh:
case <-e.ctx.Done():
return fmt.Errorf("info: stopping errorKernel")
}
// Check the type of the error to decide what to do.
//
// We should be able to handle each error individually and
// also concurrently, so the handler is started in it's
// own go routine
go func() {
// NOTE: Here we should check the severity of the error,
// and also possibly the the error-state of the process
// that fails, so we can decide if we should stop and
// start a new process to replace to old one, or if we
// should just kill the process and send message back to
// the operator....or other ?
//
// Just print the error, and tell the process to continue
//
// Here we should check the severity of the error,
// and also possibly the the error-state of the process
// that fails, so we can decide if we should stop and
// start a new process to replace to old one, or if we
// should just kill the process and send message back to
// the operator....or other ?
switch errEvent.errorType {
// log.Printf("*** error_kernel: %#v, type=%T\n", er, er)
log.Printf("TESTING, we received and error from the process, but we're telling the process back to continue\n")
case sendError:
// Just log the error, and don't use the errorAction channel
// so the process who sent the error don't have to wait for
// the error message to be sent before it can continue.
go func() {
// Add time stamp
er := fmt.Sprintf("%v, node: %v, %v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), errEvent.process.node, errEvent.err)
// // TESTING: Creating an error message to send to errorCentral
// fmt.Printf(" --- Sending error message to central !!!!!!!!!!!!!!!!!!!!!!!!!!!!\n")
// sam := subjectAndMessage{
// Subject: Subject{
// ToNode: "errorCentral",
// CommandOrEvent: EventNACK,
// Method: ErrorLog,
// },
// Message: Message{
// ToNode: "errorCentral",
// Data: []string{"some tull here .............."},
// CommandOrEvent: EventNACK,
// Method: ErrorLog,
// },
// }
// newMessagesCh <- []subjectAndMessage{sam}
sam := subjectAndMessage{
Subject: newSubject(REQErrorLog, "errorCentral"),
Message: Message{
Directory: "errorLog",
ToNode: "errorCentral",
FromNode: "apekatt",
FileName: "error.log",
Data: []string{er},
Method: REQErrorLog,
ACKTimeout: errEvent.process.configuration.ErrorMessageTimeout,
Retries: errEvent.process.configuration.ErrorMessageRetries,
},
}
select {
case er.errorActionCh <- errActionContinue:
case <-e.ctx.Done():
log.Printf("info: errorKernel: got ctx.Done, will stop waiting for errAction\n")
return
}
}()
newMessagesCh <- []subjectAndMessage{sam}
//metrics.promErrorMessagesSentTotal.Inc()
}()
default:
// Just print the error, and tell the process to continue. The
// process who sent the error should block andwait for receiving
// an errActionContinue message.
go func() {
// log.Printf("*** error_kernel: %#v, type=%T\n", er, er)
log.Printf("TESTING, we received and error from the process, but we're telling the process back to continue\n")
select {
case errEvent.errorActionCh <- errActionContinue:
case <-e.ctx.Done():
log.Printf("info: errorKernel: got ctx.Done, will stop waiting for errAction\n")
return
}
}()
}
}
}
@ -108,6 +128,9 @@ func (e *errorKernel) stop() {
e.cancel()
}
// errorAction is used to tell the process who sent the error
// what it shall do. The process who sends the error will
// have to block and wait for the response on the errorActionCh.
type errorAction int
const (
@ -124,18 +147,27 @@ const (
// should contain a copy of the original message.
)
type errProcess struct {
// errorType
type errorType int
const (
sendError errorType = iota
)
type errorEvent struct {
// The actual error
err error
// Channel for communicating the action to take back to
// to the process who triggered the error
errorActionCh chan errorAction
// Some informational text
infoText string
errorType errorType
// The process structure that belongs to a given process
process process
// The message that where in progress when error occured
message Message
}
func (e errProcess) Error() string {
func (e errorEvent) Error() string {
return fmt.Sprintf("worker error: proc = %#v, message = %#v", e.process, e.message)
}

View file

@ -46,7 +46,7 @@ type process struct {
// within a process we can send errors to the error kernel,
// the EK desided what to do, and sends the action about
// what to do back to the process where the error came from.
errorCh chan errProcess
errorCh chan errorEvent
processKind processKind
// Who are we allowed to receive from ?
// allowedReceivers map[Node]struct{}
@ -86,7 +86,7 @@ type process struct {
// prepareNewProcess will set the the provided values and the default
// values for a process.
func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errProcess, processKind processKind, procFunc func() error) process {
func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errorEvent, processKind processKind, procFunc func() error) process {
// create the initial configuration for a sessions communicating with 1 host process.
processes.lastProcessID++
@ -713,8 +713,8 @@ func (p process) publishMessages(natsConn *nats.Conn) {
if err != nil {
// Create an error type which also creates a channel which the
// errorKernel will send back the action about what to do.
ep := errProcess{
infoText: "process failed",
ep := errorEvent{
//errorType: logOnly,
process: p,
message: m,
errorActionCh: make(chan errorAction),