diff --git a/process.go b/process.go index 782a394..2a60ca6 100644 --- a/process.go +++ b/process.go @@ -38,13 +38,15 @@ type process struct { processKind processKind // Who are we allowed to receive from ? allowedReceivers map[node]struct{} + // methodsAvailable + methodsAvailable MethodsAvailable } // prepareNewProcess will set the the provided values and the default // values for a process. func newProcess(s *server, subject Subject, errCh chan errProcess, processKind processKind, allowedReceivers []node) process { // create the initial configuration for a sessions communicating with 1 host process. - s.lastProcessID++ + s.processes.lastProcessID++ // make the slice of allowedReceivers into a map value for easy lookup. m := make(map[node]struct{}) @@ -52,14 +54,17 @@ func newProcess(s *server, subject Subject, errCh chan errProcess, processKind p m[a] = struct{}{} } + var method Method + proc := process{ messageID: 0, subject: subject, node: node(subject.ToNode), - processID: s.lastProcessID, + processID: s.processes.lastProcessID, errorCh: errCh, processKind: processKind, allowedReceivers: m, + methodsAvailable: method.GetMethodsAvailable(), } return proc @@ -85,9 +90,9 @@ func (p process) spawnWorker(s *server) { } // Add information about the new process to the started processes map. - s.mu.Lock() - s.processes[pn] = p - s.mu.Unlock() + s.processes.mu.Lock() + s.processes.active[pn] = p + s.processes.mu.Unlock() // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. @@ -204,7 +209,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na switch { case p.subject.CommandOrEvent == CommandACK || p.subject.CommandOrEvent == EventACK: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) - mf, ok := s.methodsAvailable.CheckIfExists(message.Method) + mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { // TODO: Check how errors should be handled here!!! log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent) @@ -243,7 +248,7 @@ func (p process) subscriberHandler(natsConn *nats.Conn, thisNode string, msg *na } case p.subject.CommandOrEvent == CommandNACK || p.subject.CommandOrEvent == EventNACK: log.Printf("info: subscriberHandler: ACK Message received received, preparing to call handler: %v\n", p.subject.name()) - mf, ok := s.methodsAvailable.CheckIfExists(message.Method) + mf, ok := p.methodsAvailable.CheckIfExists(message.Method) if !ok { // TODO: Check how errors should be handled here!!! log.Printf("error: subscriberHandler: method type not available: %v\n", p.subject.CommandOrEvent) @@ -287,13 +292,13 @@ func (p process) publishMessages(s *server) { // Wait and read the next message on the message channel m := <-p.subject.messageCh pn := processNameGet(p.subject.name(), processKindPublisher) - m.ID = s.processes[pn].messageID + m.ID = s.processes.active[pn].messageID s.messageDeliverNats(p, m) m.done <- struct{}{} // Increment the counter for the next message to be sent. p.messageID++ - s.processes[pn] = p + s.processes.active[pn] = p time.Sleep(time.Second * 1) // NB: simulate that we get an error, and that we can send that diff --git a/publisher.go b/publisher.go index d08bd60..95599e2 100644 --- a/publisher.go +++ b/publisher.go @@ -22,7 +22,7 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM inCh := make(chan subjectAndMessage) ringBufferOutCh := make(chan samDBValue) // start the ringbuffer. - rb.start(inCh, ringBufferOutCh, s.defaultMessageTimeout, s.defaultMessageRetries) + rb.start(inCh, ringBufferOutCh, s.configuration.DefaultMessageTimeout, s.configuration.DefaultMessageRetries) // Start reading new fresh messages received on the incomming message // pipe/file requested, and fill them into the buffer. @@ -38,6 +38,13 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM // Process the messages that are in the ring buffer. Check and // send if there are a specific subject for it, and if no subject // exist throw an error. + + var coe CommandOrEvent + coeAvailable := coe.GetCommandOrEventAvailable() + + var method Method + methodsAvailable := method.GetMethodsAvailable() + go func() { for samTmp := range ringBufferOutCh { sam := samTmp.Data @@ -45,11 +52,11 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM // TODO: Send a message to the error kernel here that // it was unable to process the message with the reason // why ? - if _, ok := s.methodsAvailable.CheckIfExists(sam.Message.Method); !ok { + if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { log.Printf("error: the method do not exist, message dropped: %v\n", sam.Message.Method) continue } - if !s.commandOrEventAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { + if !coeAvailable.CheckIfExists(sam.Subject.CommandOrEvent, sam.Subject) { log.Printf("error: the command or event do not exist, message dropped: %v\n", sam.Subject.CommandOrEvent) continue } @@ -64,13 +71,13 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM subjName := sam.Subject.name() // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) pn := processNameGet(subjName, processKindPublisher) - _, ok := s.processes[pn] + _, ok := s.processes.active[pn] // Are there already a process for that subject, put the // message on that processes incomming message channel. if ok { log.Printf("info: processNewMessages: found the specific subject: %v\n", subjName) - s.processes[pn].subject.messageCh <- m + s.processes.active[pn].subject.messageCh <- m // If no process to handle the specific subject exist, // the we create and spawn one. diff --git a/server.go b/server.go index 7f04708..95cce0e 100644 --- a/server.go +++ b/server.go @@ -19,6 +19,25 @@ func processNameGet(sn subjectName, pk processKind) processName { return processName(pn) } +// processes holds all the information about running processes +type processes struct { + // The active spawned processes + active map[processName]process + // mutex to lock the map + mu sync.Mutex + // The last processID created + lastProcessID int +} + +// newProcesses will prepare and return a *processes +func newProcesses() *processes { + p := processes{ + active: make(map[processName]process), + } + + return &p +} + // server is the structure that will hold the state about spawned // processes on a local instance. type server struct { @@ -26,28 +45,16 @@ type server struct { configuration *Configuration // The nats connection to the broker natsConn *nats.Conn - // TODO: sessions should probably hold a slice/map of processes ? - processes map[processName]process - // The last processID created - lastProcessID int + // processes holds all the information about running processes + processes *processes // The name of the node nodeName string // Mutex for locking when writing to the process map - mu sync.Mutex - // The channel where we put new messages read from file, - // or some other process who wants to send something via the - // system - // We can than range this channel for new messages to process. newMessagesCh chan []subjectAndMessage // errorKernel is doing all the error handling like what to do if // an error occurs. // TODO: Will also send error messages to cental error subscriber. errorKernel *errorKernel - // used to check if the methods specified in message is valid - methodsAvailable MethodsAvailable - // Map who holds the command and event types available. - // Used to check if the commandOrEvent specified in message is valid - commandOrEventAvailable CommandOrEventAvailable // metric exporter metrics *metrics // subscriberServices are where we find the services and the API to @@ -60,10 +67,6 @@ type server struct { // collection of the publisher services and the types to control them publisherServices *publisherServices centralErrorLogger bool - // default message timeout in seconds. This can be overridden on the message level - defaultMessageTimeout int - // default amount of retries that will be done before a message is thrown away, and out of the system - defaultMessageRetries int } // newServer will prepare and return a server type @@ -73,23 +76,16 @@ func NewServer(c *Configuration) (*server, error) { log.Printf("error: nats.Connect failed: %v\n", err) } - var m Method - var coe CommandOrEvent - s := &server{ - configuration: c, - nodeName: c.NodeName, - natsConn: conn, - processes: make(map[processName]process), - newMessagesCh: make(chan []subjectAndMessage), - methodsAvailable: m.GetMethodsAvailable(), - commandOrEventAvailable: coe.GetCommandOrEventAvailable(), - metrics: newMetrics(c.PromHostAndPort), - subscriberServices: newSubscriberServices(), - publisherServices: newPublisherServices(c.PublisherServiceSayhello), - centralErrorLogger: c.CentralErrorLogger, - defaultMessageTimeout: c.DefaultMessageTimeout, - defaultMessageRetries: c.DefaultMessageRetries, + configuration: c, + nodeName: c.NodeName, + natsConn: conn, + processes: newProcesses(), + newMessagesCh: make(chan []subjectAndMessage), + metrics: newMetrics(c.PromHostAndPort), + subscriberServices: newSubscriberServices(), + publisherServices: newPublisherServices(c.PublisherServiceSayhello), + centralErrorLogger: c.CentralErrorLogger, } // Create the default data folder for where subscribers should @@ -151,18 +147,18 @@ func (s *server) Start() { func (s *server) printProcessesMap() { fmt.Println("--------------------------------------------------------------------------------------------") fmt.Printf("*** Output of processes map :\n") - s.mu.Lock() - for _, v := range s.processes { + s.processes.mu.Lock() + for _, v := range s.processes.active { fmt.Printf("*** - : %v\n", v) } - s.mu.Unlock() + s.processes.mu.Unlock() s.metrics.metricsCh <- metricType{ metric: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "total_running_processes", Help: "The current number of total running processes", }), - value: float64(len(s.processes)), + value: float64(len(s.processes.active)), } fmt.Println("--------------------------------------------------------------------------------------------")