diff --git a/.gitignore b/.gitignore index afb9373..f44921a 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ ship2/ tmp/ incommmingBuffer.db store.log +changes.md diff --git a/incommmingBuffer.db b/incommmingBuffer.db index d7dca75..436f3fd 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/publisher.go b/publisher.go index 91858d5..7ee04d0 100644 --- a/publisher.go +++ b/publisher.go @@ -63,13 +63,14 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM m := sam.Message subjName := sam.Subject.name() // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) - _, ok := s.processes[subjName] + pn := processNameGet(subjName, processKindPublisher) + _, ok := s.processes[pn] // Are there already a process for that subject, put the // message on that processes incomming message channel. if ok { log.Printf("info: found the specific subject: %v\n", subjName) - s.processes[subjName].subject.messageCh <- m + s.processes[pn].subject.messageCh <- m // If no process to handle the specific subject exist, // the we create and spawn one. @@ -97,13 +98,14 @@ func (s *server) publishMessages(proc process) { for { // Wait and read the next message on the message channel m := <-proc.subject.messageCh - m.ID = s.processes[proc.subject.name()].messageID + pn := processNameGet(proc.subject.name(), processKindPublisher) + m.ID = s.processes[pn].messageID s.messageDeliverNats(proc, m) m.done <- struct{}{} // Increment the counter for the next message to be sent. proc.messageID++ - s.processes[proc.subject.name()] = proc + s.processes[pn] = proc time.Sleep(time.Second * 1) // NB: simulate that we get an error, and that we can send that diff --git a/server.go b/server.go index 650f93a..6b28e94 100644 --- a/server.go +++ b/server.go @@ -13,12 +13,19 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +type processName string + +func processNameGet(sn subjectName, pk processKind) processName { + pn := fmt.Sprintf("%s_%s", sn, pk) + return processName(pn) +} + // server is the structure that will hold the state about spawned // processes on a local instance. type server struct { natsConn *nats.Conn // TODO: sessions should probably hold a slice/map of processes ? - processes map[subjectName]process + processes map[processName]process // The last processID created lastProcessID int // The name of the node @@ -68,7 +75,7 @@ func NewServer(brokerAddress string, nodeName string, promHostAndPort string, ce s := &server{ nodeName: nodeName, natsConn: conn, - processes: make(map[subjectName]process), + processes: make(map[processName]process), newMessagesCh: make(chan []subjectAndMessage), methodsAvailable: m.GetMethodsAvailable(), commandOrEventAvailable: coe.GetCommandOrEventAvailable(), @@ -192,7 +199,15 @@ func (s *server) spawnWorkerProcess(proc process) { // We use the full name of the subject to identify a unique // process. We can do that since a process can only handle // one message queue. - s.processes[proc.subject.name()] = proc + var pn processName + if proc.processKind == processKindPublisher { + pn = processNameGet(proc.subject.name(), processKindPublisher) + } + if proc.processKind == processKindSubscriber { + pn = processNameGet(proc.subject.name(), processKindSubscriber) + } + + s.processes[pn] = proc s.mu.Unlock() // TODO: I think it makes most sense that the messages would come to diff --git a/subscriberMethodTypes.go b/subscriberMethodTypes.go index 7a8f73c..964632a 100644 --- a/subscriberMethodTypes.go +++ b/subscriberMethodTypes.go @@ -107,7 +107,7 @@ func (m methodEventTextLogging) handler(s *server, message Message, node string) type methodEventSayHello struct{} func (m methodEventSayHello) handler(s *server, message Message, node string) ([]byte, error) { - log.Printf("################## Received hello from %v ##################\n", message.FromNode) + log.Printf("<--- Received hello from %v \n", message.FromNode) // Since the handler is only called to handle a specific type of message we need // to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes s.subscriberServices.sayHelloNodes[message.FromNode] = struct{}{}