From 37733f59745e23051803b84ac9f60664b71ff9cf Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 28 Jan 2021 14:58:16 +0100 Subject: [PATCH] updating for concurrency --- central/main.go | 70 +++++++++++++++++++++++++++++++++++-------------- edge/main.go | 8 +++--- 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/central/main.go b/central/main.go index 6ee68aa..7be0a78 100644 --- a/central/main.go +++ b/central/main.go @@ -7,11 +7,14 @@ import ( "fmt" "log" "os" + "sync" "time" "github.com/nats-io/nats.go" ) +var mu sync.Mutex + type messageType int // TODO: Figure it makes sense to have these types at all. @@ -46,22 +49,8 @@ type Message struct { MessageType messageType } -type node string - -// process are represent the communication to one individual host -type process struct { - messageID int - subject string - // Put a node here to be able know the node a process is at. - // NB: Might not be needed later on. - node node - // The processID for the current process - processID int -} - // server is the structure that will hold the state about spawned // processes on a local instance. - type server struct { natsConn *nats.Conn // TODO: sessions should probably hold a slice/map of processes ? @@ -79,9 +68,43 @@ func newServer(brokerAddress string) (*server, error) { func (s *server) Run() { proc := s.prepareNewProcess("btship1") + // fmt.Printf("*** %#v\n", proc) go s.spawnProcess(proc) + // start the error handling + go func() { + + for { + for k := range s.processes { + select { + case e := <-s.processes[k].errorCh: + fmt.Printf("*** %v\n", e) + default: + time.Sleep(time.Millisecond * 100) + } + + } + } + }() + select {} + +} + +type node string + +// process are represent the communication to one individual host +type process struct { + messageID int + subject string + // Put a node here to be able know the node a process is at. + // NB: Might not be needed later on. + node node + // The processID for the current process + processID int + // errorCh is used to report errors from a process + // NB: Implementing this as an int to report for testing + errorCh chan string } func (s *server) prepareNewProcess(nodeName string) process { @@ -91,6 +114,7 @@ func (s *server) prepareNewProcess(nodeName string) process { messageID: 0, node: node(nodeName), processID: s.lastProcessID, + errorCh: make(chan string), } return proc @@ -98,19 +122,27 @@ func (s *server) prepareNewProcess(nodeName string) process { // spawnProcess will spawn a new process func (s *server) spawnProcess(proc process) { + mu.Lock() s.processes[proc.node] = proc + mu.Unlock() // Loop creating one new message every second to simulate getting new // messages to deliver. for { m := getMessageToDeliver() - m.ID = s.processes["btship1"].messageID - messageDeliver("btship1", m, s.natsConn) + m.ID = s.processes[proc.node].messageID + messageDeliver(string(proc.node), m, s.natsConn) // Increment the counter for the next message to be sent. proc.messageID++ - s.processes["btship1"] = proc + s.processes[proc.node] = proc time.Sleep(time.Second * 1) + + // simulate that we get an error, and that we can send that + // out of the process and receive it in another thread. + s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID) + + //fmt.Printf("%#v\n", s.processes[proc.node]) } } @@ -123,7 +155,7 @@ func getMessageToDeliver() Message { } } -func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) { +func messageDeliver(node string, message Message, natsConn *nats.Conn) { for { dataPayload, err := gobEncodePayload(message) if err != nil { @@ -131,7 +163,7 @@ func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) { } msg := &nats.Msg{ - Subject: edgeID, + Subject: node, Reply: "subjectReply", Data: dataPayload, } diff --git a/edge/main.go b/edge/main.go index 3c4c337..eb3df51 100644 --- a/edge/main.go +++ b/edge/main.go @@ -42,7 +42,7 @@ type Message struct { } func main() { - edgeID := flag.String("edgeID", "0", "some unique string to identify this Edge unit") + node := flag.String("node", "0", "some unique string to identify this Edge unit") flag.Parse() // Create a connection to nats server, and publish a message. @@ -58,7 +58,7 @@ func main() { // Subscribe will start up a Go routine under the hood calling the // callback function specified when a new message is received. - _, err = natsConn.Subscribe(*edgeID, listenForMessage(natsConn, reqMsgCh)) + _, err = natsConn.Subscribe(*node, listenForMessage(natsConn, reqMsgCh, *node)) if err != nil { fmt.Printf("error: Subscribe failed: %v\n", err) } @@ -86,7 +86,7 @@ func main() { // Listen for message will send an ACK message back to the sender, // and put the received incomming message on the reqMsg channel // for further processing. -func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats.Msg) { +func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message, node string) func(req *nats.Msg) { return func(req *nats.Msg) { message := Message{} @@ -103,6 +103,6 @@ func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats reqMsgCh <- message // Send a confirmation message back to the publisher - natsConn.Publish(req.Reply, []byte("confirmed: "+fmt.Sprint(message.ID))) + natsConn.Publish(req.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID))) } }