1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

updating for concurrency

This commit is contained in:
postmannen 2021-01-28 14:58:16 +01:00
parent 02b6aff3a8
commit 37733f5974
2 changed files with 55 additions and 23 deletions

View file

@ -7,11 +7,14 @@ import (
"fmt" "fmt"
"log" "log"
"os" "os"
"sync"
"time" "time"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
) )
var mu sync.Mutex
type messageType int type messageType int
// TODO: Figure it makes sense to have these types at all. // TODO: Figure it makes sense to have these types at all.
@ -46,22 +49,8 @@ type Message struct {
MessageType messageType MessageType messageType
} }
type node string
// process are represent the communication to one individual host
type process struct {
messageID int
subject string
// Put a node here to be able know the node a process is at.
// NB: Might not be needed later on.
node node
// The processID for the current process
processID int
}
// server is the structure that will hold the state about spawned // server is the structure that will hold the state about spawned
// processes on a local instance. // processes on a local instance.
type server struct { type server struct {
natsConn *nats.Conn natsConn *nats.Conn
// TODO: sessions should probably hold a slice/map of processes ? // TODO: sessions should probably hold a slice/map of processes ?
@ -79,9 +68,43 @@ func newServer(brokerAddress string) (*server, error) {
func (s *server) Run() { func (s *server) Run() {
proc := s.prepareNewProcess("btship1") proc := s.prepareNewProcess("btship1")
// fmt.Printf("*** %#v\n", proc)
go s.spawnProcess(proc) go s.spawnProcess(proc)
// start the error handling
go func() {
for {
for k := range s.processes {
select {
case e := <-s.processes[k].errorCh:
fmt.Printf("*** %v\n", e)
default:
time.Sleep(time.Millisecond * 100)
}
}
}
}()
select {} select {}
}
type node string
// process are represent the communication to one individual host
type process struct {
messageID int
subject string
// Put a node here to be able know the node a process is at.
// NB: Might not be needed later on.
node node
// The processID for the current process
processID int
// errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing
errorCh chan string
} }
func (s *server) prepareNewProcess(nodeName string) process { func (s *server) prepareNewProcess(nodeName string) process {
@ -91,6 +114,7 @@ func (s *server) prepareNewProcess(nodeName string) process {
messageID: 0, messageID: 0,
node: node(nodeName), node: node(nodeName),
processID: s.lastProcessID, processID: s.lastProcessID,
errorCh: make(chan string),
} }
return proc return proc
@ -98,19 +122,27 @@ func (s *server) prepareNewProcess(nodeName string) process {
// spawnProcess will spawn a new process // spawnProcess will spawn a new process
func (s *server) spawnProcess(proc process) { func (s *server) spawnProcess(proc process) {
mu.Lock()
s.processes[proc.node] = proc s.processes[proc.node] = proc
mu.Unlock()
// Loop creating one new message every second to simulate getting new // Loop creating one new message every second to simulate getting new
// messages to deliver. // messages to deliver.
for { for {
m := getMessageToDeliver() m := getMessageToDeliver()
m.ID = s.processes["btship1"].messageID m.ID = s.processes[proc.node].messageID
messageDeliver("btship1", m, s.natsConn) messageDeliver(string(proc.node), m, s.natsConn)
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
proc.messageID++ proc.messageID++
s.processes["btship1"] = proc s.processes[proc.node] = proc
time.Sleep(time.Second * 1) time.Sleep(time.Second * 1)
// simulate that we get an error, and that we can send that
// out of the process and receive it in another thread.
s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID)
//fmt.Printf("%#v\n", s.processes[proc.node])
} }
} }
@ -123,7 +155,7 @@ func getMessageToDeliver() Message {
} }
} }
func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) { func messageDeliver(node string, message Message, natsConn *nats.Conn) {
for { for {
dataPayload, err := gobEncodePayload(message) dataPayload, err := gobEncodePayload(message)
if err != nil { if err != nil {
@ -131,7 +163,7 @@ func messageDeliver(edgeID string, message Message, natsConn *nats.Conn) {
} }
msg := &nats.Msg{ msg := &nats.Msg{
Subject: edgeID, Subject: node,
Reply: "subjectReply", Reply: "subjectReply",
Data: dataPayload, Data: dataPayload,
} }

View file

@ -42,7 +42,7 @@ type Message struct {
} }
func main() { func main() {
edgeID := flag.String("edgeID", "0", "some unique string to identify this Edge unit") node := flag.String("node", "0", "some unique string to identify this Edge unit")
flag.Parse() flag.Parse()
// Create a connection to nats server, and publish a message. // Create a connection to nats server, and publish a message.
@ -58,7 +58,7 @@ func main() {
// Subscribe will start up a Go routine under the hood calling the // Subscribe will start up a Go routine under the hood calling the
// callback function specified when a new message is received. // callback function specified when a new message is received.
_, err = natsConn.Subscribe(*edgeID, listenForMessage(natsConn, reqMsgCh)) _, err = natsConn.Subscribe(*node, listenForMessage(natsConn, reqMsgCh, *node))
if err != nil { if err != nil {
fmt.Printf("error: Subscribe failed: %v\n", err) fmt.Printf("error: Subscribe failed: %v\n", err)
} }
@ -86,7 +86,7 @@ func main() {
// Listen for message will send an ACK message back to the sender, // Listen for message will send an ACK message back to the sender,
// and put the received incomming message on the reqMsg channel // and put the received incomming message on the reqMsg channel
// for further processing. // for further processing.
func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats.Msg) { func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message, node string) func(req *nats.Msg) {
return func(req *nats.Msg) { return func(req *nats.Msg) {
message := Message{} message := Message{}
@ -103,6 +103,6 @@ func listenForMessage(natsConn *nats.Conn, reqMsgCh chan Message) func(req *nats
reqMsgCh <- message reqMsgCh <- message
// Send a confirmation message back to the publisher // Send a confirmation message back to the publisher
natsConn.Publish(req.Reply, []byte("confirmed: "+fmt.Sprint(message.ID))) natsConn.Publish(req.Reply, []byte("confirmed from: "+node+": "+fmt.Sprint(message.ID)))
} }
} }