1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-15 17:51:15 +00:00

fixed send to yourself

This commit is contained in:
postmannen 2021-02-26 07:55:28 +01:00
parent 77df07533c
commit f736212307
5 changed files with 26 additions and 8 deletions

1
.gitignore vendored
View file

@ -3,3 +3,4 @@ ship2/
tmp/ tmp/
incommmingBuffer.db incommmingBuffer.db
store.log store.log
changes.md

Binary file not shown.

View file

@ -63,13 +63,14 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
m := sam.Message m := sam.Message
subjName := sam.Subject.name() subjName := sam.Subject.name()
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject) // DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
_, ok := s.processes[subjName] pn := processNameGet(subjName, processKindPublisher)
_, ok := s.processes[pn]
// Are there already a process for that subject, put the // Are there already a process for that subject, put the
// message on that processes incomming message channel. // message on that processes incomming message channel.
if ok { if ok {
log.Printf("info: found the specific subject: %v\n", subjName) log.Printf("info: found the specific subject: %v\n", subjName)
s.processes[subjName].subject.messageCh <- m s.processes[pn].subject.messageCh <- m
// If no process to handle the specific subject exist, // If no process to handle the specific subject exist,
// the we create and spawn one. // the we create and spawn one.
@ -97,13 +98,14 @@ func (s *server) publishMessages(proc process) {
for { for {
// Wait and read the next message on the message channel // Wait and read the next message on the message channel
m := <-proc.subject.messageCh m := <-proc.subject.messageCh
m.ID = s.processes[proc.subject.name()].messageID pn := processNameGet(proc.subject.name(), processKindPublisher)
m.ID = s.processes[pn].messageID
s.messageDeliverNats(proc, m) s.messageDeliverNats(proc, m)
m.done <- struct{}{} m.done <- struct{}{}
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
proc.messageID++ proc.messageID++
s.processes[proc.subject.name()] = proc s.processes[pn] = proc
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
// NB: simulate that we get an error, and that we can send that // NB: simulate that we get an error, and that we can send that

View file

@ -13,12 +13,19 @@ import (
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
type processName string
func processNameGet(sn subjectName, pk processKind) processName {
pn := fmt.Sprintf("%s_%s", sn, pk)
return processName(pn)
}
// server is the structure that will hold the state about spawned // server is the structure that will hold the state about spawned
// processes on a local instance. // processes on a local instance.
type server struct { type server struct {
natsConn *nats.Conn natsConn *nats.Conn
// TODO: sessions should probably hold a slice/map of processes ? // TODO: sessions should probably hold a slice/map of processes ?
processes map[subjectName]process processes map[processName]process
// The last processID created // The last processID created
lastProcessID int lastProcessID int
// The name of the node // The name of the node
@ -68,7 +75,7 @@ func NewServer(brokerAddress string, nodeName string, promHostAndPort string, ce
s := &server{ s := &server{
nodeName: nodeName, nodeName: nodeName,
natsConn: conn, natsConn: conn,
processes: make(map[subjectName]process), processes: make(map[processName]process),
newMessagesCh: make(chan []subjectAndMessage), newMessagesCh: make(chan []subjectAndMessage),
methodsAvailable: m.GetMethodsAvailable(), methodsAvailable: m.GetMethodsAvailable(),
commandOrEventAvailable: coe.GetCommandOrEventAvailable(), commandOrEventAvailable: coe.GetCommandOrEventAvailable(),
@ -192,7 +199,15 @@ func (s *server) spawnWorkerProcess(proc process) {
// We use the full name of the subject to identify a unique // We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle // process. We can do that since a process can only handle
// one message queue. // one message queue.
s.processes[proc.subject.name()] = proc var pn processName
if proc.processKind == processKindPublisher {
pn = processNameGet(proc.subject.name(), processKindPublisher)
}
if proc.processKind == processKindSubscriber {
pn = processNameGet(proc.subject.name(), processKindSubscriber)
}
s.processes[pn] = proc
s.mu.Unlock() s.mu.Unlock()
// TODO: I think it makes most sense that the messages would come to // TODO: I think it makes most sense that the messages would come to

View file

@ -107,7 +107,7 @@ func (m methodEventTextLogging) handler(s *server, message Message, node string)
type methodEventSayHello struct{} type methodEventSayHello struct{}
func (m methodEventSayHello) handler(s *server, message Message, node string) ([]byte, error) { func (m methodEventSayHello) handler(s *server, message Message, node string) ([]byte, error) {
log.Printf("################## Received hello from %v ##################\n", message.FromNode) log.Printf("<--- Received hello from %v \n", message.FromNode)
// Since the handler is only called to handle a specific type of message we need // Since the handler is only called to handle a specific type of message we need
// to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes // to store it elsewhere, and choice for now is under s.metrics.sayHelloNodes
s.subscriberServices.sayHelloNodes[message.FromNode] = struct{}{} s.subscriberServices.sayHelloNodes[message.FromNode] = struct{}{}