1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

sending errors to central

This commit is contained in:
postmannen 2021-03-12 09:38:19 +01:00
parent 6400645283
commit ca295d97d8
6 changed files with 90 additions and 62 deletions

View file

@ -15,7 +15,7 @@ import (
// errorKernel is the structure that will hold all the error
// handling values and logic.
type errorKernel struct {
// TODO: The errorKernel should probably have a concept
// NOTE: The errorKernel should probably have a concept
// of error-state which is a map of all the processes,
// how many times a process have failed over the same
// message etc...
@ -34,14 +34,15 @@ func newErrorKernel() *errorKernel {
// startErrorKernel will start the error kernel and check if there
// have been reveived any errors from any of the processes, and
// handle them appropriately.
// TODO: Since a process will be locked while waiting to send the error
//
// NOTE: Since a process will be locked while waiting to send the error
// on the errorCh maybe it makes sense to have a channel inside the
// processes error handling with a select so we can send back to the
// process if it should continue or not based not based on how severe
// the error where. This should be right after sending the error
// sending in the process.
func (e *errorKernel) startErrorKernel(newMessagesCh chan<- []subjectAndMessage) {
// TODO: For now it will just print the error messages to the
// NOTE: For now it will just print the error messages to the
// console.
go func() {
@ -52,7 +53,7 @@ func (e *errorKernel) startErrorKernel(newMessagesCh chan<- []subjectAndMessage)
// also concurrently, so the handler is started in it's
// own go routine
go func() {
// TODO: Here we should check the severity of the error,
// NOTE: Here we should check the severity of the error,
// and also possibly the the error-state of the process
// that fails, so we can decide if we should stop and
// start a new process to replace to old one, or if we

Binary file not shown.

View file

@ -36,7 +36,6 @@ type Message struct {
// gobEncodePayload will encode the message structure along with its
// valued in gob binary format.
// TODO: Check if it adds value to compress with gzip.
func gobEncodeMessage(m Message) ([]byte, error) {
var buf bytes.Buffer
gobEnc := gob.NewEncoder(&buf)

View file

@ -32,8 +32,11 @@ type process struct {
node node
// The processID for the current process
processID int
// errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing
// errorCh is the same channel the errorKernel uses to
// read incomming errors. By having this channel available
// within a process we can send errors to the error kernel,
// the EK desided what to do, and sends the action about
// what to do back to the process where the error came from.
errorCh chan errProcess
processKind processKind
// Who are we allowed to receive from ?
@ -130,7 +133,9 @@ func (p process) spawnWorker(s *server) {
go func() {
err := p.procFunc()
if err != nil {
log.Printf("error: spawnWorker: procFunc failed: %v\n", err)
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
}
}()
}
@ -149,7 +154,9 @@ func (p process) spawnWorker(s *server) {
go func() {
err := p.procFunc()
if err != nil {
log.Printf("error: spawnWorker: procFunc failed: %v\n", err)
er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
}
}()
}
@ -168,7 +175,10 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
for {
dataPayload, err := gobEncodeMessage(message)
if err != nil {
log.Printf("error: createDataPayload: %v\n", err)
er := fmt.Errorf("error: createDataPayload: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
continue
}
msg := &nats.Msg{
@ -187,14 +197,18 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
// Create a subscriber for the reply message.
subReply, err := natsConn.SubscribeSync(msg.Reply)
if err != nil {
log.Printf("error: nc.SubscribeSync failed: failed to create reply message: %v\n", err)
er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
continue
}
// Publish message
err = natsConn.PublishMsg(msg)
if err != nil {
log.Printf("error: publish failed: %v\n", err)
er := fmt.Errorf("error: publish failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(p.newMessagesCh, node(p.node), err)
continue
}
@ -251,24 +265,22 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
gobDec := gob.NewDecoder(buf)
err := gobDec.Decode(&message)
if err != nil {
log.Printf("error: gob decoding failed: %v\n", err)
er := fmt.Errorf("error: gob decoding failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
}
// TODO: Maybe the handling of the errors within the subscriber
// should also involve the error-kernel to report back centrally
// that there was a problem like missing method to handle a specific
// method etc.
switch {
case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK:
// REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok {
// TODO: Check how errors should be handled here!!!
log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent)
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
}
out := []byte("not allowed from " + message.FromNode)
var err error
//var err error
// Check if we are allowed to receive from that host
_, arOK1 := p.allowedReceivers[message.FromNode]
@ -282,8 +294,9 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
out, err = mh.handler(p, message, thisNode)
if err != nil {
// TODO: Send to error kernel ?
log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
}
} else {
log.Printf("info: we don't allow receiving from: %v, %v\n", message.FromNode, p.subject)
@ -292,18 +305,13 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
// Send a confirmation message back to the publisher
natsConn.Publish(msg.Reply, out)
// TESTING: Simulate that we also want to send some error that occured
// to the errorCentral
{
err := fmt.Errorf("error: some testing error we want to send out from %v", p.node)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
}
case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK:
// REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name())
mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
if !ok {
// TODO: Check how errors should be handled here!!!
log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent)
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
}
// Start the method handler for that specific subject type.
@ -317,11 +325,15 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na
_, err := mf.handler(p, message, thisNode)
if err != nil {
// TODO: Send to error kernel ?
log.Printf("error: subscriberHandler: failed to execute event: %v\n", err)
er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
}
default:
log.Printf("info: did not find that specific type of command: %#v\n", p.subject.CommandOrEvent)
er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent)
log.Printf("%v\n", er)
sendErrorLogMessage(s.newMessagesCh, node(thisNode), err)
}
}
@ -346,6 +358,8 @@ func (p process) subscribeMessages(s *server) {
// process.
func (p process) publishMessages(natsConn *nats.Conn, processes *processes) {
for {
var err error
// Wait and read the next message on the message channel
m := <-p.subject.messageCh
@ -365,25 +379,35 @@ func (p process) publishMessages(natsConn *nats.Conn, processes *processes) {
processes.mu.Lock()
processes.active[pn] = p
processes.mu.Unlock()
// REMOVED: sleep
//time.Sleep(time.Second * 1)
// NB: simulate that we get an error, and that we can send that
// out of the process and receive it in another thread.
// REMOVED: Error simulation
// ep := errProcess{
// infoText: "process failed",
// process: p,
// message: m,
// errorActionCh: make(chan errorAction),
// }
// s.errorKernel.errorCh <- ep
// Handle the error.
//
// // Wait for the response action back from the error kernel, and
// // decide what to do. Should we continue, quit, or .... ?
// switch <-ep.errorActionCh {
// case errActionContinue:
// log.Printf("The errAction was continue...so we're continuing\n")
// }
// NOTE: None of the processes above generate an error, so the the
// if clause will never be triggered. But keeping it here as an example
// for now for how to handle errors.
if err != nil {
// Create an error type which also creates a channel which the
// errorKernel will send back the action about what to do.
ep := errProcess{
infoText: "process failed",
process: p,
message: m,
errorActionCh: make(chan errorAction),
}
p.errorCh <- ep
// Wait for the response action back from the error kernel, and
// decide what to do. Should we continue, quit, or .... ?
switch <-ep.errorActionCh {
case errActionContinue:
// Just log and continue
log.Printf("The errAction was continue...so we're continuing\n")
case errActionKill:
log.Printf("The errAction was kill...so we're killing\n")
// ....
default:
log.Printf("Info: publishMessages: The errAction was not defined, so we're doing nothing\n")
}
}
}
}

View file

@ -37,18 +37,22 @@ type ringBuffer struct {
totalMessagesIndex int
mu sync.Mutex
permStore chan string
nodeName node
newMessagesCh chan []subjectAndMessage
}
// newringBuffer is a push/pop storage for values.
func newringBuffer(size int, dbFileName string) *ringBuffer {
func newringBuffer(size int, dbFileName string, nodeName node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
db, err := bolt.Open(dbFileName, 0600, nil)
if err != nil {
log.Printf("error: failed to open db: %v\n", err)
}
return &ringBuffer{
bufData: make(chan samDBValue, size),
db: db,
permStore: make(chan string),
bufData: make(chan samDBValue, size),
db: db,
permStore: make(chan string),
nodeName: nodeName,
newMessagesCh: newMessagesCh,
}
}
@ -56,12 +60,10 @@ func newringBuffer(size int, dbFileName string) *ringBuffer {
// put the messages on a buffered channel
// and deliver messages out when requested on the outCh.
func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue, defaultMessageTimeout int, defaultMessageRetries int) {
// Starting both writing and reading in separate go routines so we
// can write and read concurrently.
// TODO: At startup, check if there are unprocessed messages in
// the K/V store, and process them.
const samValueBucket string = "samValueBucket"
const indexValueBucket string = "indexValueBucket"
@ -149,8 +151,10 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
// Store the incomming message in key/value store
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js)
if err != nil {
// TODO: Handle error
log.Printf("error: dbUpdate samValue failed: %v\n", err)
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
log.Printf("%v\n", er)
sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), err)
}
// Put the message on the inmemory buffer.

View file

@ -204,7 +204,7 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage {
func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) {
// Prepare and start a new ring buffer
const bufferSize int = 1000
rb := newringBuffer(bufferSize, dbFileName)
rb := newringBuffer(bufferSize, dbFileName, node(s.nodeName), s.newMessagesCh)
inCh := make(chan subjectAndMessage)
ringBufferOutCh := make(chan samDBValue)
// start the ringbuffer.