diff --git a/errorkernel.go b/errorkernel.go index c640dc3..289b300 100644 --- a/errorkernel.go +++ b/errorkernel.go @@ -15,7 +15,7 @@ import ( // errorKernel is the structure that will hold all the error // handling values and logic. type errorKernel struct { - // TODO: The errorKernel should probably have a concept + // NOTE: The errorKernel should probably have a concept // of error-state which is a map of all the processes, // how many times a process have failed over the same // message etc... @@ -34,14 +34,15 @@ func newErrorKernel() *errorKernel { // startErrorKernel will start the error kernel and check if there // have been reveived any errors from any of the processes, and // handle them appropriately. -// TODO: Since a process will be locked while waiting to send the error +// +// NOTE: Since a process will be locked while waiting to send the error // on the errorCh maybe it makes sense to have a channel inside the // processes error handling with a select so we can send back to the // process if it should continue or not based not based on how severe // the error where. This should be right after sending the error // sending in the process. func (e *errorKernel) startErrorKernel(newMessagesCh chan<- []subjectAndMessage) { - // TODO: For now it will just print the error messages to the + // NOTE: For now it will just print the error messages to the // console. go func() { @@ -52,7 +53,7 @@ func (e *errorKernel) startErrorKernel(newMessagesCh chan<- []subjectAndMessage) // also concurrently, so the handler is started in it's // own go routine go func() { - // TODO: Here we should check the severity of the error, + // NOTE: Here we should check the severity of the error, // and also possibly the the error-state of the process // that fails, so we can decide if we should stop and // start a new process to replace to old one, or if we diff --git a/incommmingBuffer.db b/incommmingBuffer.db index 200931f..e3a7762 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/message-and-subject.go b/message-and-subject.go index 692d41c..22848d0 100644 --- a/message-and-subject.go +++ b/message-and-subject.go @@ -36,7 +36,6 @@ type Message struct { // gobEncodePayload will encode the message structure along with its // valued in gob binary format. -// TODO: Check if it adds value to compress with gzip. func gobEncodeMessage(m Message) ([]byte, error) { var buf bytes.Buffer gobEnc := gob.NewEncoder(&buf) diff --git a/process.go b/process.go index 7b0421e..1bcd3b7 100644 --- a/process.go +++ b/process.go @@ -32,8 +32,11 @@ type process struct { node node // The processID for the current process processID int - // errorCh is used to report errors from a process - // NB: Implementing this as an int to report for testing + // errorCh is the same channel the errorKernel uses to + // read incomming errors. By having this channel available + // within a process we can send errors to the error kernel, + // the EK desided what to do, and sends the action about + // what to do back to the process where the error came from. errorCh chan errProcess processKind processKind // Who are we allowed to receive from ? @@ -130,7 +133,9 @@ func (p process) spawnWorker(s *server) { go func() { err := p.procFunc() if err != nil { - log.Printf("error: spawnWorker: procFunc failed: %v\n", err) + er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(p.newMessagesCh, node(p.node), err) } }() } @@ -149,7 +154,9 @@ func (p process) spawnWorker(s *server) { go func() { err := p.procFunc() if err != nil { - log.Printf("error: spawnWorker: procFunc failed: %v\n", err) + er := fmt.Errorf("error: spawnWorker: procFunc failed: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(p.newMessagesCh, node(p.node), err) } }() } @@ -168,7 +175,10 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { for { dataPayload, err := gobEncodeMessage(message) if err != nil { - log.Printf("error: createDataPayload: %v\n", err) + er := fmt.Errorf("error: createDataPayload: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(p.newMessagesCh, node(p.node), err) + continue } msg := &nats.Msg{ @@ -187,14 +197,18 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { // Create a subscriber for the reply message. subReply, err := natsConn.SubscribeSync(msg.Reply) if err != nil { - log.Printf("error: nc.SubscribeSync failed: failed to create reply message: %v\n", err) + er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(p.newMessagesCh, node(p.node), err) continue } // Publish message err = natsConn.PublishMsg(msg) if err != nil { - log.Printf("error: publish failed: %v\n", err) + er := fmt.Errorf("error: publish failed: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(p.newMessagesCh, node(p.node), err) continue } @@ -251,24 +265,22 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na gobDec := gob.NewDecoder(buf) err := gobDec.Decode(&message) if err != nil { - log.Printf("error: gob decoding failed: %v\n", err) + er := fmt.Errorf("error: gob decoding failed: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) } - // TODO: Maybe the handling of the errors within the subscriber - // should also involve the error-kernel to report back centrally - // that there was a problem like missing method to handle a specific - // method etc. switch { case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: - // REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) mh, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { - // TODO: Check how errors should be handled here!!! - log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent) + er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) + log.Printf("%v\n", er) + sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) } out := []byte("not allowed from " + message.FromNode) - var err error + //var err error // Check if we are allowed to receive from that host _, arOK1 := p.allowedReceivers[message.FromNode] @@ -282,8 +294,9 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na out, err = mh.handler(p, message, thisNode) if err != nil { - // TODO: Send to error kernel ? - log.Printf("error: subscriberHandler: failed to execute event: %v\n", err) + er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) } } else { log.Printf("info: we don't allow receiving from: %v, %v\n", message.FromNode, p.subject) @@ -292,18 +305,13 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // Send a confirmation message back to the publisher natsConn.Publish(msg.Reply, out) - // TESTING: Simulate that we also want to send some error that occured - // to the errorCentral - { - err := fmt.Errorf("error: some testing error we want to send out from %v", p.node) - sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) - } case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK: // REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { - // TODO: Check how errors should be handled here!!! - log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent) + er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.CommandOrEvent) + log.Printf("%v\n", er) + sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) } // Start the method handler for that specific subject type. @@ -317,11 +325,15 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na _, err := mf.handler(p, message, thisNode) if err != nil { - // TODO: Send to error kernel ? - log.Printf("error: subscriberHandler: failed to execute event: %v\n", err) + er := fmt.Errorf("error: subscriberHandler: failed to execute event: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) } default: - log.Printf("info: did not find that specific type of command: %#v\n", p.subject.CommandOrEvent) + er := fmt.Errorf("info: did not find that specific type of command: %#v", p.subject.CommandOrEvent) + log.Printf("%v\n", er) + sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) + } } @@ -346,6 +358,8 @@ func (p process) subscribeMessages(s *server) { // process. func (p process) publishMessages(natsConn *nats.Conn, processes *processes) { for { + var err error + // Wait and read the next message on the message channel m := <-p.subject.messageCh @@ -365,25 +379,35 @@ func (p process) publishMessages(natsConn *nats.Conn, processes *processes) { processes.mu.Lock() processes.active[pn] = p processes.mu.Unlock() - // REMOVED: sleep - //time.Sleep(time.Second * 1) - // NB: simulate that we get an error, and that we can send that - // out of the process and receive it in another thread. - // REMOVED: Error simulation - // ep := errProcess{ - // infoText: "process failed", - // process: p, - // message: m, - // errorActionCh: make(chan errorAction), - // } - // s.errorKernel.errorCh <- ep + // Handle the error. // - // // Wait for the response action back from the error kernel, and - // // decide what to do. Should we continue, quit, or .... ? - // switch <-ep.errorActionCh { - // case errActionContinue: - // log.Printf("The errAction was continue...so we're continuing\n") - // } + // NOTE: None of the processes above generate an error, so the the + // if clause will never be triggered. But keeping it here as an example + // for now for how to handle errors. + if err != nil { + // Create an error type which also creates a channel which the + // errorKernel will send back the action about what to do. + ep := errProcess{ + infoText: "process failed", + process: p, + message: m, + errorActionCh: make(chan errorAction), + } + p.errorCh <- ep + + // Wait for the response action back from the error kernel, and + // decide what to do. Should we continue, quit, or .... ? + switch <-ep.errorActionCh { + case errActionContinue: + // Just log and continue + log.Printf("The errAction was continue...so we're continuing\n") + case errActionKill: + log.Printf("The errAction was kill...so we're killing\n") + // .... + default: + log.Printf("Info: publishMessages: The errAction was not defined, so we're doing nothing\n") + } + } } } diff --git a/ringbuffer.go b/ringbuffer.go index 5ba87b2..b6b1105 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -37,18 +37,22 @@ type ringBuffer struct { totalMessagesIndex int mu sync.Mutex permStore chan string + nodeName node + newMessagesCh chan []subjectAndMessage } // newringBuffer is a push/pop storage for values. -func newringBuffer(size int, dbFileName string) *ringBuffer { +func newringBuffer(size int, dbFileName string, nodeName node, newMessagesCh chan []subjectAndMessage) *ringBuffer { db, err := bolt.Open(dbFileName, 0600, nil) if err != nil { log.Printf("error: failed to open db: %v\n", err) } return &ringBuffer{ - bufData: make(chan samDBValue, size), - db: db, - permStore: make(chan string), + bufData: make(chan samDBValue, size), + db: db, + permStore: make(chan string), + nodeName: nodeName, + newMessagesCh: newMessagesCh, } } @@ -56,12 +60,10 @@ func newringBuffer(size int, dbFileName string) *ringBuffer { // put the messages on a buffered channel // and deliver messages out when requested on the outCh. func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue, defaultMessageTimeout int, defaultMessageRetries int) { + // Starting both writing and reading in separate go routines so we // can write and read concurrently. - // TODO: At startup, check if there are unprocessed messages in - // the K/V store, and process them. - const samValueBucket string = "samValueBucket" const indexValueBucket string = "indexValueBucket" @@ -149,8 +151,10 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri // Store the incomming message in key/value store err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js) if err != nil { - // TODO: Handle error - log.Printf("error: dbUpdate samValue failed: %v\n", err) + er := fmt.Errorf("error: dbUpdate samValue failed: %v", err) + log.Printf("%v\n", er) + sendErrorLogMessage(r.newMessagesCh, node(r.nodeName), err) + } // Put the message on the inmemory buffer. diff --git a/server.go b/server.go index 03487f5..8e3ef6c 100644 --- a/server.go +++ b/server.go @@ -204,7 +204,7 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage { func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) { // Prepare and start a new ring buffer const bufferSize int = 1000 - rb := newringBuffer(bufferSize, dbFileName) + rb := newringBuffer(bufferSize, dbFileName, node(s.nodeName), s.newMessagesCh) inCh := make(chan subjectAndMessage) ringBufferOutCh := make(chan samDBValue) // start the ringbuffer.