1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

added same process structure to subscriber as publisher

This commit is contained in:
postmannen 2021-02-09 13:48:02 +01:00
parent 29254c04d0
commit f0f659f23c
2 changed files with 37 additions and 12 deletions

View file

@ -283,6 +283,23 @@ func (s *server) processSpawnWorker(proc process) {
} }
} }
} }
if proc.processKind == processKindSubscriber {
//subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand")
subject := string(proc.subject.name())
// Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received.
_, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// We start one handler per message received by using go routines here.
// This is for being able to reply back the current publisher who sent
// the message.
go handler(s.natsConn, s.nodeName, msg)
})
if err != nil {
log.Printf("error: Subscribe failed: %v\n", err)
}
}
} }
func messageDeliver(proc process, message Message, natsConn *nats.Conn) { func messageDeliver(proc process, message Message, natsConn *nats.Conn) {

View file

@ -14,20 +14,28 @@ import (
// TODO: Right now the only thing a subscriber can do is ro receive commands, // TODO: Right now the only thing a subscriber can do is ro receive commands,
// check if there are more things a subscriber should be able to do. // check if there are more things a subscriber should be able to do.
func (s *server) RunSubscriber() { func (s *server) RunSubscriber() {
subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") {
fmt.Printf("nodeName: %#v\n", s.nodeName)
// Subscribe will start up a Go routine under the hood calling the sub := newSubject(s.nodeName, "command", "shellcommand")
// callback function specified when a new message is received. proc := s.processPrepareNew(sub, s.errorCh, processKindSubscriber)
_, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) { // fmt.Printf("*** %#v\n", proc)
// We start one handler per message received by using go routines here. go s.processSpawnWorker(proc)
// This is for being able to reply back the current publisher who sent
// the message.
go handler(s.natsConn, s.nodeName, msg)
})
if err != nil {
log.Printf("error: Subscribe failed: %v\n", err)
} }
// subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand")
//
// // Subscribe will start up a Go routine under the hood calling the
// // callback function specified when a new message is received.
// _, err := s.natsConn.Subscribe(subject, func(msg *nats.Msg) {
// // We start one handler per message received by using go routines here.
// // This is for being able to reply back the current publisher who sent
// // the message.
// go handler(s.natsConn, s.nodeName, msg)
// })
// if err != nil {
// log.Printf("error: Subscribe failed: %v\n", err)
// }
select {} select {}
} }