diff --git a/incommmingBuffer.db b/incommmingBuffer.db index 25f7e9c..af5436d 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/process.go b/process.go new file mode 100644 index 0000000..782a394 --- /dev/null +++ b/process.go @@ -0,0 +1,316 @@ +package steward + +import ( + "bytes" + "encoding/gob" + "fmt" + "log" + "time" + + "github.com/nats-io/nats.go" +) + +// processKind are either kindSubscriber or kindPublisher, and are +// used to distinguish the kind of process to spawn and to know +// the process kind put in the process map. +type processKind string + +const ( + processKindSubscriber processKind = "subscriber" + processKindPublisher processKind = "publisher" +) + +// process are represent the communication to one individual host +type process struct { + messageID int + // the subject used for the specific process. One process + // can contain only one sender on a message bus, hence + // also one subject + subject Subject + // Put a node here to be able know the node a process is at. + // NB: Might not be needed later on. + node node + // The processID for the current process + processID int + // errorCh is used to report errors from a process + // NB: Implementing this as an int to report for testing + errorCh chan errProcess + processKind processKind + // Who are we allowed to receive from ? + allowedReceivers map[node]struct{} +} + +// prepareNewProcess will set the the provided values and the default +// values for a process. +func newProcess(s *server, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node) process { + // create the initial configuration for a sessions communicating with 1 host process. + s.lastProcessID++ + + // make the slice of allowedReceivers into a map value for easy lookup. + m := make(map[node]struct{}) + for _, a := range allowedReceivers { + m[a] = struct{}{} + } + + proc := process{ + messageID: 0, + subject: subject, + node: node(subject.ToNode), + processID: s.lastProcessID, + errorCh: errCh, + processKind: processKind, + allowedReceivers: m, + } + + return proc +} + +// The purpose of this function is to check if we should start a +// publisher or subscriber process, where a process is a go routine +// that will handle either sending or receiving messages on one +// subject. +// +// It will give the process the next available ID, and also add the +// process to the processes map in the server structure. +func (p process) spawnWorker(s *server) { + // We use the full name of the subject to identify a unique + // process. We can do that since a process can only handle + // one message queue. + var pn processName + if p.processKind == processKindPublisher { + pn = processNameGet(p.subject.name(), processKindPublisher) + } + if p.processKind == processKindSubscriber { + pn = processNameGet(p.subject.name(), processKindSubscriber) + } + + // Add information about the new process to the started processes map. + s.mu.Lock() + s.processes[pn] = p + s.mu.Unlock() + + // Start a publisher worker, which will start a go routine (process) + // That will take care of all the messages for the subject it owns. + if p.processKind == processKindPublisher { + p.publishMessages(s) + } + + // Start a subscriber worker, which will start a go routine (process) + // That will take care of all the messages for the subject it owns. + if p.processKind == processKindSubscriber { + p.subscribeMessages(s) + } +} + +// messageDeliverNats will take care of the delivering the message +// as converted to gob format as a nats.Message. It will also take +// care of checking timeouts and retries specified for the message. +func (s *server) messageDeliverNats(proc process, message Message) { + retryAttempts := 0 + + for { + dataPayload, err := gobEncodeMessage(message) + if err != nil { + log.Printf("error: createDataPayload: %v\n", err) + } + + msg := &nats.Msg{ + Subject: string(proc.subject.name()), + // Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommand"), + // Structure of the reply message are: + // reply... + Reply: fmt.Sprintf("reply.%s", proc.subject.name()), + Data: dataPayload, + } + + // The SubscribeSync used in the subscriber, will get messages that + // are sent after it started subscribing, so we start a publisher + // that sends out a message every second. + // + // Create a subscriber for the reply message. + subReply, err := s.natsConn.SubscribeSync(msg.Reply) + if err != nil { + log.Printf("error: nc.SubscribeSync failed: failed to create reply message: %v\n", err) + continue + } + + // Publish message + err = s.natsConn.PublishMsg(msg) + if err != nil { + log.Printf("error: publish failed: %v\n", err) + continue + } + + // If the message is an ACK type of message we must check that a + // reply, and if it is not we don't wait here at all. + fmt.Printf("info: messageDeliverNats: preparing to send message: %v\n", message) + if proc.subject.CommandOrEvent == CommandACK || proc.subject.CommandOrEvent == EventACK { + // Wait up until timeout specified for a reply, + // continue and resend if noo reply received, + // or exit if max retries for the message reached. + msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout)) + if err != nil { + log.Printf("error: subReply.NextMsg failed for node=%v, subject=%v: %v\n", proc.node, proc.subject.name(), err) + + // did not receive a reply, decide what to do.. + retryAttempts++ + fmt.Printf("Retry attempts:%v, retries: %v, timeout: %v\n", retryAttempts, message.Retries, message.Timeout) + switch { + case message.Retries == 0: + // 0 indicates unlimited retries + continue + case retryAttempts >= message.Retries: + // max retries reached + log.Printf("info: max retries for message reached, breaking out: %v", retryAttempts) + return + default: + // none of the above matched, so we've not reached max retries yet + continue + } + } + log.Printf("<--- publisher: received ACK for message: %s\n", msgReply.Data) + } + return + } +} + +// subscriberHandler will deserialize the message when a new message is +// received, check the MessageType field in the message to decide what +// kind of message it is and then it will check how to handle that message type, +// and then call the correct method handler for it. +// +// This handler function should be started in it's own go routine,so +// one individual handler is started per message received so we can keep +// the state of the message being processed, and then reply back to the +// correct sending process's reply, meaning so we ACK back to the correct +// publisher. +func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, s *server) { + + message := Message{} + + // Create a buffer to decode the gob encoded binary data back + // to it's original structure. + buf := bytes.NewBuffer(msg.Data) + gobDec := gob.NewDecoder(buf) + err := gobDec.Decode(&message) + if err != nil { + log.Printf("error: gob decoding failed: %v\n", err) + } + + // TODO: Maybe the handling of the errors within the subscriber + // should also involve the error-kernel to report back centrally + // that there was a problem like missing method to handle a specific + // method etc. + switch { + case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: + log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) + mf, ok := s.methodsAvailable.CheckIfExists(message.Method) + if !ok { + // TODO: Check how errors should be handled here!!! + log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent) + } + + out := []byte("not allowed from " + message.FromNode) + var err error + + // Check if we are allowed to receive from that host + _, arOK1 := p.allowedReceivers[message.FromNode] + _, arOK2 := p.allowedReceivers["*"] + + if arOK1 || arOK2 { + // Start the method handler for that specific subject type. + // The handler started here is what actually doing the action + // that executed a CLI command, or writes to a log file on + // the node who received the message. + out, err = mf.handler(s, p, message, thisNode) + + if err != nil { + // TODO: Send to error kernel ? + log.Printf("error: subscriberHandler: failed to execute event: %v\n", err) + } + } else { + log.Printf("info: we don't allow receiving from: %v, %v\n", message.FromNode, p.subject) + } + + // Send a confirmation message back to the publisher + natsConn.Publish(msg.Reply, out) + + // TESTING: Simulate that we also want to send some error that occured + // to the errorCentral + { + err := fmt.Errorf("error: some testing error we want to send out") + sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) + } + case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK: + log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) + mf, ok := s.methodsAvailable.CheckIfExists(message.Method) + if !ok { + // TODO: Check how errors should be handled here!!! + log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent) + } + + // Start the method handler for that specific subject type. + // The handler started here is what actually doing the action + // that executed a CLI command, or writes to a log file on + // the node who received the message. + // + // since we don't send a reply for a NACK message, we don't care about the + // out return when calling mf.handler + _, err := mf.handler(s, p, message, thisNode) + + if err != nil { + // TODO: Send to error kernel ? + log.Printf("error: subscriberHandler: failed to execute event: %v\n", err) + } + default: + log.Printf("info: did not find that specific type of command: %#v\n", p.subject.CommandOrEvent) + } +} + +// Subscribe will start up a Go routine under the hood calling the +// callback function specified when a new message is received. +func (p process) subscribeMessages(s *server) { + subject := string(p.subject.name()) + _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { + // We start one handler per message received by using go routines here. + // This is for being able to reply back the current publisher who sent + // the message. + go p.subscriberHandler(s.natsConn, s.nodeName, msg, s) + }) + if err != nil { + log.Printf("error: Subscribe failed: %v\n", err) + } +} + +func (p process) publishMessages(s *server) { + for { + // Wait and read the next message on the message channel + m := <-p.subject.messageCh + pn := processNameGet(p.subject.name(), processKindPublisher) + m.ID = s.processes[pn].messageID + s.messageDeliverNats(p, m) + m.done <- struct{}{} + + // Increment the counter for the next message to be sent. + p.messageID++ + s.processes[pn] = p + time.Sleep(time.Second * 1) + + // NB: simulate that we get an error, and that we can send that + // out of the process and receive it in another thread. + ep := errProcess{ + infoText: "process failed", + process: p, + message: m, + errorActionCh: make(chan errorAction), + } + s.errorKernel.errorCh <- ep + + // Wait for the response action back from the error kernel, and + // decide what to do. Should we continue, quit, or .... ? + switch <-ep.errorActionCh { + case errActionContinue: + log.Printf("The errAction was continue...so we're continuing\n") + } + } +} diff --git a/publisher.go b/publisher.go index a4b3c13..d08bd60 100644 --- a/publisher.go +++ b/publisher.go @@ -80,9 +80,9 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode) - proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindPublisher, nil) + proc := newProcess(s, sub, s.errorKernel.errorCh, processKindPublisher, nil) // fmt.Printf("*** %#v\n", proc) - go s.spawnWorkerProcess(proc) + go proc.spawnWorker(s) time.Sleep(time.Millisecond * 500) s.printProcessesMap() @@ -93,36 +93,3 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM } }() } - -func (s *server) publishMessages(proc process) { - for { - // Wait and read the next message on the message channel - m := <-proc.subject.messageCh - pn := processNameGet(proc.subject.name(), processKindPublisher) - m.ID = s.processes[pn].messageID - s.messageDeliverNats(proc, m) - m.done <- struct{}{} - - // Increment the counter for the next message to be sent. - proc.messageID++ - s.processes[pn] = proc - time.Sleep(time.Second * 1) - - // NB: simulate that we get an error, and that we can send that - // out of the process and receive it in another thread. - ep := errProcess{ - infoText: "process failed", - process: proc, - message: m, - errorActionCh: make(chan errorAction), - } - s.errorKernel.errorCh <- ep - - // Wait for the response action back from the error kernel, and - // decide what to do. Should we continue, quit, or .... ? - switch <-ep.errorActionCh { - case errActionContinue: - log.Printf("The errAction was continue...so we're continuing\n") - } - } -} diff --git a/server.go b/server.go index 943236e..41ca4cf 100644 --- a/server.go +++ b/server.go @@ -2,8 +2,6 @@ package steward import ( - "bytes" - "encoding/gob" "fmt" "log" "os" @@ -168,263 +166,6 @@ func (s *server) printProcessesMap() { fmt.Println("--------------------------------------------------------------------------------------------") } -// processKind are either kindSubscriber or kindPublisher, and are -// used to distinguish the kind of process to spawn and to know -// the process kind put in the process map. -type processKind string - -const ( - processKindSubscriber processKind = "subscriber" - processKindPublisher processKind = "publisher" -) - -// process are represent the communication to one individual host -type process struct { - messageID int - // the subject used for the specific process. One process - // can contain only one sender on a message bus, hence - // also one subject - subject Subject - // Put a node here to be able know the node a process is at. - // NB: Might not be needed later on. - node node - // The processID for the current process - processID int - // errorCh is used to report errors from a process - // NB: Implementing this as an int to report for testing - errorCh chan errProcess - processKind processKind - // Who are we allowed to receive from ? - allowedReceivers map[node]struct{} -} - -// prepareNewProcess will set the the provided values and the default -// values for a process. -func (s *server) processPrepareNew(subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node) process { - // create the initial configuration for a sessions communicating with 1 host process. - s.lastProcessID++ - - // make the slice of allowedReceivers into a map value for easy lookup. - m := make(map[node]struct{}) - for _, a := range allowedReceivers { - m[a] = struct{}{} - } - - proc := process{ - messageID: 0, - subject: subject, - node: node(subject.ToNode), - processID: s.lastProcessID, - errorCh: errCh, - processKind: processKind, - allowedReceivers: m, - } - - return proc -} - -// The purpose of this function is to check if we should start a -// publisher or subscriber process, where a process is a go routine -// that will handle either sending or receiving messages on one -// subject. -// -// It will give the process the next available ID, and also add the -// process to the processes map in the server structure. -func (s *server) spawnWorkerProcess(proc process) { - s.mu.Lock() - // We use the full name of the subject to identify a unique - // process. We can do that since a process can only handle - // one message queue. - var pn processName - if proc.processKind == processKindPublisher { - pn = processNameGet(proc.subject.name(), processKindPublisher) - } - if proc.processKind == processKindSubscriber { - pn = processNameGet(proc.subject.name(), processKindSubscriber) - } - - // Add information about the new process to the started processes map. - s.processes[pn] = proc - s.mu.Unlock() - - // Start a publisher worker, which will start a go routine (process) - // That will take care of all the messages for the subject it owns. - if proc.processKind == processKindPublisher { - s.publishMessages(proc) - } - - // Start a subscriber worker, which will start a go routine (process) - // That will take care of all the messages for the subject it owns. - if proc.processKind == processKindSubscriber { - s.subscribeMessages(proc) - } -} - -// messageDeliverNats will take care of the delivering the message -// as converted to gob format as a nats.Message. It will also take -// care of checking timeouts and retries specified for the message. -func (s *server) messageDeliverNats(proc process, message Message) { - retryAttempts := 0 - - for { - dataPayload, err := gobEncodeMessage(message) - if err != nil { - log.Printf("error: createDataPayload: %v\n", err) - } - - msg := &nats.Msg{ - Subject: string(proc.subject.name()), - // Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommand"), - // Structure of the reply message are: - // reply... - Reply: fmt.Sprintf("reply.%s", proc.subject.name()), - Data: dataPayload, - } - - // The SubscribeSync used in the subscriber, will get messages that - // are sent after it started subscribing, so we start a publisher - // that sends out a message every second. - // - // Create a subscriber for the reply message. - subReply, err := s.natsConn.SubscribeSync(msg.Reply) - if err != nil { - log.Printf("error: nc.SubscribeSync failed: failed to create reply message: %v\n", err) - continue - } - - // Publish message - err = s.natsConn.PublishMsg(msg) - if err != nil { - log.Printf("error: publish failed: %v\n", err) - continue - } - - // If the message is an ACK type of message we must check that a - // reply, and if it is not we don't wait here at all. - fmt.Printf("info: messageDeliverNats: preparing to send message: %v\n", message) - if proc.subject.CommandOrEvent == CommandACK || proc.subject.CommandOrEvent == EventACK { - // Wait up until timeout specified for a reply, - // continue and resend if noo reply received, - // or exit if max retries for the message reached. - msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout)) - if err != nil { - log.Printf("error: subReply.NextMsg failed for node=%v, subject=%v: %v\n", proc.node, proc.subject.name(), err) - - // did not receive a reply, decide what to do.. - retryAttempts++ - fmt.Printf("Retry attempts:%v, retries: %v, timeout: %v\n", retryAttempts, message.Retries, message.Timeout) - switch { - case message.Retries == 0: - // 0 indicates unlimited retries - continue - case retryAttempts >= message.Retries: - // max retries reached - log.Printf("info: max retries for message reached, breaking out: %v", retryAttempts) - return - default: - // none of the above matched, so we've not reached max retries yet - continue - } - } - log.Printf("<--- publisher: received ACK for message: %s\n", msgReply.Data) - } - return - } -} - -// subscriberHandler will deserialize the message when a new message is -// received, check the MessageType field in the message to decide what -// kind of message it is and then it will check how to handle that message type, -// and then call the correct method handler for it. -// -// This handler function should be started in it's own go routine,so -// one individual handler is started per message received so we can keep -// the state of the message being processed, and then reply back to the -// correct sending process's reply, meaning so we ACK back to the correct -// publisher. -func (s *server) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, proc process) { - - message := Message{} - - // Create a buffer to decode the gob encoded binary data back - // to it's original structure. - buf := bytes.NewBuffer(msg.Data) - gobDec := gob.NewDecoder(buf) - err := gobDec.Decode(&message) - if err != nil { - log.Printf("error: gob decoding failed: %v\n", err) - } - - // TODO: Maybe the handling of the errors within the subscriber - // should also involve the error-kernel to report back centrally - // that there was a problem like missing method to handle a specific - // method etc. - switch { - case proc.subject.CommandOrEvent == CommandACK || proc.subject.CommandOrEvent == EventACK: - log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", proc.subject.name()) - mf, ok := s.methodsAvailable.CheckIfExists(message.Method) - if !ok { - // TODO: Check how errors should be handled here!!! - log.Printf("error: subscriberHandler: method type not available: %v\n", proc.subject.CommandOrEvent) - } - - out := []byte("not allowed from " + message.FromNode) - var err error - - // Check if we are allowed to receive from that host - _, arOK1 := proc.allowedReceivers[message.FromNode] - _, arOK2 := proc.allowedReceivers["*"] - - if arOK1 || arOK2 { - // Start the method handler for that specific subject type. - // The handler started here is what actually doing the action - // that executed a CLI command, or writes to a log file on - // the node who received the message. - out, err = mf.handler(s, proc, message, thisNode) - - if err != nil { - // TODO: Send to error kernel ? - log.Printf("error: subscriberHandler: failed to execute event: %v\n", err) - } - } else { - log.Printf("info: we don't allow receiving from: %v, %v\n", message.FromNode, proc.subject) - } - - // Send a confirmation message back to the publisher - natsConn.Publish(msg.Reply, out) - - // TESTING: Simulate that we also want to send some error that occured - // to the errorCentral - { - err := fmt.Errorf("error: some testing error we want to send out") - sendErrorLogMessage(s.newMessagesCh, node(thisNode), err) - } - case proc.subject.CommandOrEvent == CommandNACK || proc.subject.CommandOrEvent == EventNACK: - log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", proc.subject.name()) - mf, ok := s.methodsAvailable.CheckIfExists(message.Method) - if !ok { - // TODO: Check how errors should be handled here!!! - log.Printf("error: subscriberHandler: method type not available: %v\n", proc.subject.CommandOrEvent) - } - - // Start the method handler for that specific subject type. - // The handler started here is what actually doing the action - // that executed a CLI command, or writes to a log file on - // the node who received the message. - // - // since we don't send a reply for a NACK message, we don't care about the - // out return when calling mf.handler - _, err := mf.handler(s, proc, message, thisNode) - - if err != nil { - // TODO: Send to error kernel ? - log.Printf("error: subscriberHandler: failed to execute event: %v\n", err) - } - default: - log.Printf("info: did not find that specific type of command: %#v\n", proc.subject.CommandOrEvent) - } -} - // sendErrorMessage will put the error message directly on the channel that is // read by the nats publishing functions. func sendErrorLogMessage(newMessagesCh chan<- []subjectAndMessage, FromNode node, theError error) { diff --git a/subscribers.go b/subscribers.go index 92a434c..f840b7c 100644 --- a/subscribers.go +++ b/subscribers.go @@ -2,53 +2,34 @@ package steward import ( "fmt" - "log" - - "github.com/nats-io/nats.go" ) -func (s *server) subscribeMessages(proc process) { - subject := string(proc.subject.name()) - - // Subscribe will start up a Go routine under the hood calling the - // callback function specified when a new message is received. - _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { - // We start one handler per message received by using go routines here. - // This is for being able to reply back the current publisher who sent - // the message. - go s.subscriberHandler(s.natsConn, s.nodeName, msg, proc) - }) - if err != nil { - log.Printf("error: Subscribe failed: %v\n", err) - } -} - func (s *server) subscribersStart() { // Start a subscriber for CLICommand messages { fmt.Printf("Starting CLICommand subscriber: %#v\n", s.nodeName) sub := newSubject(CLICommand, CommandACK, s.nodeName) - proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"}) + proc := newProcess(s, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"central", "ship2"}) // fmt.Printf("*** %#v\n", proc) - go s.spawnWorkerProcess(proc) + go proc.spawnWorker(s) } // Start a subscriber for textLogging messages { fmt.Printf("Starting textlogging subscriber: %#v\n", s.nodeName) sub := newSubject(TextLogging, EventACK, s.nodeName) - proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}) + proc := newProcess(s, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}) // fmt.Printf("*** %#v\n", proc) - go s.spawnWorkerProcess(proc) + go proc.spawnWorker(s) } // Start a subscriber for SayHello messages { fmt.Printf("Starting SayHello subscriber: %#v\n", s.nodeName) sub := newSubject(SayHello, EventNACK, s.nodeName) - proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}) + proc := newProcess(s, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}) // fmt.Printf("*** %#v\n", proc) - go s.spawnWorkerProcess(proc) + go proc.spawnWorker(s) } if s.centralErrorLogger { @@ -56,9 +37,9 @@ func (s *server) subscribersStart() { { fmt.Printf("Starting ErrorLog subscriber: %#v\n", s.nodeName) sub := newSubject(ErrorLog, EventNACK, "errorCentral") - proc := s.processPrepareNew(sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}) + proc := newProcess(s, sub, s.errorKernel.errorCh, processKindSubscriber, []node{"*"}) // fmt.Printf("*** %#v\n", proc) - go s.spawnWorkerProcess(proc) + go proc.spawnWorker(s) } } }