diff --git a/message-and-subject.go b/message-and-subject.go index ed63289..b367e97 100644 --- a/message-and-subject.go +++ b/message-and-subject.go @@ -52,7 +52,10 @@ type Subject struct { CommandOrEvent CommandOrEvent `json:"commandOrEvent" yaml:"commandOrEvent"` // method, what is this message doing, etc. CLICommand, Syslog, etc. Method Method `json:"method" yaml:"method"` - // messageCh is the channel for receiving new content to be sent + // messageCh is used by publisher kind processes to read new messages + // to be published. The content on this channel have been routed here + // from routeMessagesToPublish in *server. + // This channel is only used for publishing processes. messageCh chan Message } diff --git a/process.go b/process.go index 980e72c..3bff036 100644 --- a/process.go +++ b/process.go @@ -113,7 +113,20 @@ func (p process) spawnWorker(s *server) { // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. if p.processKind == processKindPublisher { - go p.publishMessages(s) + // If there is a procFunc for the process, start it. + if p.procFunc != nil { + // REMOVED: p.procFuncCh = make(chan Message) + // Start the procFunc in it's own anonymous func so we are able + // to get the return error. + go func() { + err := p.procFunc() + if err != nil { + log.Printf("error: spawnWorker: procFunc failed: %v\n", err) + } + }() + } + + go p.publishMessages(s.natsConn, s.processes) } // Start a subscriber worker, which will start a go routine (process) @@ -140,7 +153,7 @@ func (p process) spawnWorker(s *server) { // messageDeliverNats will take care of the delivering the message // as converted to gob format as a nats.Message. It will also take // care of checking timeouts and retries specified for the message. 
-func (s *server) messageDeliverNats(proc process, message Message) { +func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { retryAttempts := 0 for { @@ -150,11 +163,11 @@ func (s *server) messageDeliverNats(proc process, message Message) { } msg := &nats.Msg{ - Subject: string(proc.subject.name()), + Subject: string(p.subject.name()), // Subject: fmt.Sprintf("%s.%s.%s", proc.node, "command", "CLICommand"), // Structure of the reply message are: // reply... - Reply: fmt.Sprintf("reply.%s", proc.subject.name()), + Reply: fmt.Sprintf("reply.%s", p.subject.name()), Data: dataPayload, } @@ -163,14 +176,14 @@ func (s *server) messageDeliverNats(proc process, message Message) { // that sends out a message every second. // // Create a subscriber for the reply message. - subReply, err := s.natsConn.SubscribeSync(msg.Reply) + subReply, err := natsConn.SubscribeSync(msg.Reply) if err != nil { log.Printf("error: nc.SubscribeSync failed: failed to create reply message: %v\n", err) continue } // Publish message - err = s.natsConn.PublishMsg(msg) + err = natsConn.PublishMsg(msg) if err != nil { log.Printf("error: publish failed: %v\n", err) continue @@ -179,13 +192,13 @@ func (s *server) messageDeliverNats(proc process, message Message) { // If the message is an ACK type of message we must check that a // reply, and if it is not we don't wait here at all. fmt.Printf("info: messageDeliverNats: preparing to send message: %v\n", message) - if proc.subject.CommandOrEvent == CommandACK || proc.subject.CommandOrEvent == EventACK { + if p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK { // Wait up until timeout specified for a reply, // continue and resend if noo reply received, // or exit if max retries for the message reached. 
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout)) if err != nil { - log.Printf("error: subReply.NextMsg failed for node=%v, subject=%v: %v\n", proc.node, proc.subject.name(), err) + log.Printf("error: subReply.NextMsg failed for node=%v, subject=%v: %v\n", p.node, p.subject.name(), err) // did not receive a reply, decide what to do.. retryAttempts++ @@ -239,7 +252,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na switch { case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: // REMOVED: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) - mf, ok := p.methodsAvailable.CheckIfExists(message.Method) + mh, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { // TODO: Check how errors should be handled here!!! log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent) @@ -257,7 +270,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na // The handler started here is what actually doing the action // that executed a CLI command, or writes to a log file on // the node who received the message. - out, err = mf.handler(p, message, thisNode) + out, err = mh.handler(p, message, thisNode) if err != nil { // TODO: Send to error kernel ? @@ -320,20 +333,29 @@ func (p process) subscribeMessages(s *server) { } } -func (p process) publishMessages(s *server) { +// publishMessages will do the publishing of messages for one single +// process. +func (p process) publishMessages(natsConn *nats.Conn, processes *processes) { for { // Wait and read the next message on the message channel m := <-p.subject.messageCh + + // Get the process name so we can look up the process in the + // processes map, and increment the message counter. 
pn := processNameGet(p.subject.name(), processKindPublisher) - m.ID = s.processes.active[pn].messageID - s.messageDeliverNats(p, m) + m.ID = processes.active[pn].messageID + + p.messageDeliverNats(natsConn, m) + + // Signaling back to the ringbuffer that we are done with the + // current message, and it can remove it from the ringbuffer. m.done <- struct{}{} // Increment the counter for the next message to be sent. p.messageID++ - s.processes.mu.Lock() - s.processes.active[pn] = p - s.processes.mu.Unlock() + processes.mu.Lock() + processes.active[pn] = p + processes.mu.Unlock() // REMOVED: sleep //time.Sleep(time.Second * 1) diff --git a/publisher.go b/publisher.go deleted file mode 100644 index 9474579..0000000 --- a/publisher.go +++ /dev/null @@ -1,102 +0,0 @@ -package steward - -import ( - "log" -) - -// processNewMessages takes a database name and an input channel as -// it's input arguments. -// The database will be used as the persistent store for the work queue -// which is implemented as a ring buffer. -// The input channel are where we read new messages to publish. -// Incomming messages will be routed to the correct subject process, where -// the handling of each nats subject is handled within it's own separate -// worker process. -// It will also handle the process of spawning more worker processes -// for publisher subjects if it does not exist. -func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndMessage) { - // Prepare and start a new ring buffer - const bufferSize int = 1000 - rb := newringBuffer(bufferSize, dbFileName) - inCh := make(chan subjectAndMessage) - ringBufferOutCh := make(chan samDBValue) - // start the ringbuffer. - rb.start(inCh, ringBufferOutCh, s.configuration.DefaultMessageTimeout, s.configuration.DefaultMessageRetries) - - // Start reading new fresh messages received on the incomming message - // pipe/file requested, and fill them into the buffer. 
- go func() { - for samSlice := range newSAM { - for _, sam := range samSlice { - inCh <- sam - } - } - close(inCh) - }() - - // Process the messages that are in the ring buffer. Check and - // send if there are a specific subject for it, and if no subject - // exist throw an error. - - var coe CommandOrEvent - coeAvailable := coe.GetCommandOrEventAvailable() - - var method Method - methodsAvailable := method.GetMethodsAvailable() - - go func() { - for samTmp := range ringBufferOutCh { - sam := samTmp.Data - // Check if the format of the message is correct. - // TODO: Send a message to the error kernel here that - // it was unable to process the message with the reason - // why ? - if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { - log.Printf("error: the method do not exist, message dropped: %v\n", sam.Message.Method) - continue - } - if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { - log.Printf("error: the command or event do not exist, message dropped: %v\n", sam.Subject.CommandOrEvent) - continue - } - - redo: - // Adding a label here so we are able to redo the sending - // of the last message if a process with specified subject - // is not present. The process will then be created, and - // the code will loop back to the redo: label. - - m := sam.Message - subjName := sam.Subject.name() - // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) - pn := processNameGet(subjName, processKindPublisher) - _, ok := s.processes.active[pn] - - // Are there already a process for that subject, put the - // message on that processes incomming message channel. - if ok { - log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName) - s.processes.active[pn].subject.messageCh <- m - - // If no process to handle the specific subject exist, - // the we create and spawn one. 
- } else { - // If a publisher process do not exist for the given subject, create it, and - // by using the goto at the end redo the process for this specific message. - log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) - - sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode) - proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) - // fmt.Printf("*** %#v\n", proc) - proc.spawnWorker(s) - - // REMOVED: - //time.Sleep(time.Millisecond * 500) - s.printProcessesMap() - // Now when the process is spawned we jump back to the redo: label, - // and send the message to that new process. - goto redo - } - } - }() -} diff --git a/server.go b/server.go index 9a13f70..f5310e2 100644 --- a/server.go +++ b/server.go @@ -25,7 +25,7 @@ type processes struct { // The active spawned processes active map[processName]process // mutex to lock the map - mu sync.Mutex + mu sync.RWMutex // The last processID created lastProcessID int } @@ -131,8 +131,8 @@ func (s *server) Start() { time.Sleep(time.Second * 1) s.printProcessesMap() - // Start the processing of new messaging from an input channel. - s.processNewMessages("./incommmingBuffer.db", s.newMessagesCh) + // Start the processing of new messages from an input channel. + s.routeMessagesToPublish("./incommmingBuffer.db", s.newMessagesCh) select {} @@ -187,3 +187,100 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage { return sam } + +// routeMessagesToPublish takes a database name and an input channel as +// it's input arguments. +// The database will be used as the persistent store for the work queue +// which is implemented as a ring buffer. +// The input channel are where we read new messages to publish. 
+// Incoming messages will be routed to the correct subject process, where +// the handling of each nats subject is handled within its own separate +// worker process. +// It will also handle the process of spawning more worker processes +// for publisher subjects if they do not exist. +func (s *server) routeMessagesToPublish(dbFileName string, newSAM chan []subjectAndMessage) { + // Prepare and start a new ring buffer + const bufferSize int = 1000 + rb := newringBuffer(bufferSize, dbFileName) + inCh := make(chan subjectAndMessage) + ringBufferOutCh := make(chan samDBValue) + // start the ringbuffer. + rb.start(inCh, ringBufferOutCh, s.configuration.DefaultMessageTimeout, s.configuration.DefaultMessageRetries) + + // Start reading new fresh messages received on the incoming message + // pipe/file requested, and fill them into the buffer. + go func() { + for samSlice := range newSAM { + for _, sam := range samSlice { + inCh <- sam + } + } + close(inCh) + }() + + // Process the messages that are in the ring buffer. Check and + // send if there is a specific subject for it, and if no subject + // exists throw an error. + + var coe CommandOrEvent + coeAvailable := coe.GetCommandOrEventAvailable() + + var method Method + methodsAvailable := method.GetMethodsAvailable() + + go func() { + for samTmp := range ringBufferOutCh { + sam := samTmp.Data + // Check if the format of the message is correct. + // TODO: Send a message to the error kernel here that + // it was unable to process the message with the reason + // why ?
+ if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { + log.Printf("error: the method do not exist, message dropped: %v\n", sam.Message.Method) + continue + } + if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { + log.Printf("error: the command or event do not exist, message dropped: %v\n", sam.Subject.CommandOrEvent) + continue + } + + redo: + // Adding a label here so we are able to redo the sending + // of the last message if a process with the specified subject + // is not present. The process will then be created, and + // the code will loop back to the redo: label. + + m := sam.Message + subjName := sam.Subject.name() + // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) + pn := processNameGet(subjName, processKindPublisher) + _, ok := s.processes.active[pn] + + // If there already is a process for that subject, put the + // message on that process's incoming message channel. + if ok { + log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName) + s.processes.active[pn].subject.messageCh <- m + + // If no process to handle the specific subject exists, + // then we create and spawn one. + } else { + // If a publisher process does not exist for the given subject, create it, and + // by using the goto at the end redo the process for this specific message. + log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) + + sub := newSubject(sam.Subject.Method, sam.Subject.CommandOrEvent, sam.Subject.ToNode) + proc := newProcess(s.processes, s.newMessagesCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, nil) + // fmt.Printf("*** %#v\n", proc) + proc.spawnWorker(s) + + // REMOVED: + //time.Sleep(time.Millisecond * 500) + s.printProcessesMap() + // Now when the process is spawned we jump back to the redo: label, + // and send the message to that new process.
+ goto redo + } + } + }() +}