From ec76bd36cd651090b666f207b39ef338368ed031 Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 1 Feb 2021 13:41:04 +0100 Subject: [PATCH] Moved out the error handling from the process run --- publisher.go | 44 +++++++++++++++++++++++++------------------- subscriber.go | 4 ++-- 2 files changed, 27 insertions(+), 21 deletions(-) diff --git a/publisher.go b/publisher.go index 9acc008..96492de 100644 --- a/publisher.go +++ b/publisher.go @@ -56,7 +56,7 @@ type server struct { processes map[node]process // The last processID created lastProcessID int - thisNodeName string + nodeName string } // newServer will prepare and return a server type @@ -66,23 +66,12 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { log.Printf("error: nats.Connect failed: %v\n", err) } - return &server{ - thisNodeName: nodeName, - natsConn: conn, - processes: make(map[node]process), - }, nil -} + s := &server{ + nodeName: nodeName, + natsConn: conn, + processes: make(map[node]process), + } -func (s *server) RunPublisher() { - proc := s.prepareNewProcess("btship1") - // fmt.Printf("*** %#v\n", proc) - go s.spawnProcess(proc) - - proc = s.prepareNewProcess("btship2") - // fmt.Printf("*** %#v\n", proc) - go s.spawnProcess(proc) - - // start the error handling go func() { for { @@ -98,6 +87,19 @@ func (s *server) RunPublisher() { } }() + return s, nil + +} + +func (s *server) RunPublisher() { + proc := s.prepareNewProcess("btship1") + // fmt.Printf("*** %#v\n", proc) + go s.spawnProcess(proc) + + proc = s.prepareNewProcess("btship2") + // fmt.Printf("*** %#v\n", proc) + go s.spawnProcess(proc) + select {} } @@ -118,6 +120,8 @@ type process struct { errorCh chan string } +// prepareNewProcess will set the the provided values and the default +// values for a process. func (s *server) prepareNewProcess(nodeName string) process { // create the initial configuration for a sessions communicating with 1 host. s.lastProcessID++ @@ -131,7 +135,9 @@ func (s *server) prepareNewProcess(nodeName string) process { return proc } -// spawnProcess will spawn a new process +// spawnProcess will spawn a new process. It will give the process +// the next available ID, and also add the process to the processes +// map. func (s *server) spawnProcess(proc process) { mu.Lock() s.processes[proc.node] = proc @@ -151,7 +157,7 @@ func (s *server) spawnProcess(proc process) { // simulate that we get an error, and that we can send that // out of the process and receive it in another thread. - // s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID) + s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID) //fmt.Printf("%#v\n", s.processes[proc.node]) } diff --git a/subscriber.go b/subscriber.go index dba33c2..56d5f1e 100644 --- a/subscriber.go +++ b/subscriber.go @@ -18,8 +18,8 @@ func (s *server) RunSubscriber() { // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. - subject := fmt.Sprintf("%s.%s.%s", s.thisNodeName, "command", "shellcommand") - _, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.thisNodeName)) + subject := fmt.Sprintf("%s.%s.%s", s.nodeName, "command", "shellcommand") + _, err := s.natsConn.Subscribe(subject, listenForMessage(s.natsConn, reqMsgCh, s.nodeName)) if err != nil { fmt.Printf("error: Subscribe failed: %v\n", err) }