From f0f659f23c3697207640865cd1eeb480119f467c Mon Sep 17 00:00:00 2001 From: postmannen Date: Tue, 9 Feb 2021 13:48:02 +0100 Subject: [PATCH] added same process structure to subscriber as publisher --- publisher.go | 17 +++++++++++++++++ subscriber.go | 32 ++++++++++++++++++++------------ 2 files changed, 37 insertions(+), 12 deletions(-) diff --git a/publisher.go b/publisher.go index 7ee602d..4bb8093 100644 --- a/publisher.go +++ b/publisher.go @@ -283,6 +283,23 @@ func (s *server) processSpawnWorker(proc process) { } } } + + if proc.processKind == processKindSubscriber { + //subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") + subject := string(proc.subject.name()) + + // Subscribe will start up a Go routine under the hood calling the + // callback function specified when a new message is received. + _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { + // We start one handler per message received by using go routines here. + // This is for being able to reply back the current publisher who sent + // the message. + go handler(s.natsConn, s.nodeName, msg) + }) + if err != nil { + log.Printf("error: Subscribe failed: %v\n", err) + } + } } func messageDeliver(proc process, message Message, natsConn *nats.Conn) { diff --git a/subscriber.go b/subscriber.go index dc15b74..b14beaa 100644 --- a/subscriber.go +++ b/subscriber.go @@ -14,20 +14,28 @@ import ( // TODO: Right now the only thing a subscriber can do is ro receive commands, // check if there are more things a subscriber should be able to do. func (s *server) RunSubscriber() { - subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") - - // Subscribe will start up a Go routine under the hood calling the - // callback function specified when a new message is received. - _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { - // We start one handler per message received by using go routines here. - // This is for being able to reply back the current publisher who sent - // the message. - go handler(s.natsConn, s.nodeName, msg) - }) - if err != nil { - log.Printf("error: Subscribe failed: %v\n", err) + { + fmt.Printf("nodeName: %#v\n", s.nodeName) + sub := newSubject(s.nodeName, "command", "shellcommand") + proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber) + // fmt.Printf("*** %#v\n", proc) + go s.processSpawnWorker(proc) } + // subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") + // + // // Subscribe will start up a Go routine under the hood calling the + // // callback function specified when a new message is received. + // _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { + // // We start one handler per message received by using go routines here. + // // This is for being able to reply back the current publisher who sent + // // the message. + // go handler(s.natsConn, s.nodeName, msg) + // }) + // if err != nil { + // log.Printf("error: Subscribe failed: %v\n", err) + // } + select {} }