1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 04:49:17 +00:00

concurrent processes initially seems to work

This commit is contained in:
postmannen 2021-01-29 06:09:48 +01:00
parent 37733f5974
commit b543ec589a

View file

@ -71,6 +71,10 @@ func (s *server) Run() {
// fmt.Printf("*** %#v\n", proc) // fmt.Printf("*** %#v\n", proc)
go s.spawnProcess(proc) go s.spawnProcess(proc)
proc = s.prepareNewProcess("btship2")
// fmt.Printf("*** %#v\n", proc)
go s.spawnProcess(proc)
// start the error handling // start the error handling
go func() { go func() {
@ -131,7 +135,7 @@ func (s *server) spawnProcess(proc process) {
for { for {
m := getMessageToDeliver() m := getMessageToDeliver()
m.ID = s.processes[proc.node].messageID m.ID = s.processes[proc.node].messageID
messageDeliver(string(proc.node), m, s.natsConn) messageDeliver(proc, m, s.natsConn)
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
proc.messageID++ proc.messageID++
@ -140,7 +144,7 @@ func (s *server) spawnProcess(proc process) {
// simulate that we get an error, and that we can send that // simulate that we get an error, and that we can send that
// out of the process and receive it in another thread. // out of the process and receive it in another thread.
s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID) // s.processes[proc.node].errorCh <- "received an error from process: " + fmt.Sprintf("%v\n", proc.processID)
//fmt.Printf("%#v\n", s.processes[proc.node]) //fmt.Printf("%#v\n", s.processes[proc.node])
} }
@ -155,7 +159,7 @@ func getMessageToDeliver() Message {
} }
} }
func messageDeliver(node string, message Message, natsConn *nats.Conn) { func messageDeliver(proc process, message Message, natsConn *nats.Conn) {
for { for {
dataPayload, err := gobEncodePayload(message) dataPayload, err := gobEncodePayload(message)
if err != nil { if err != nil {
@ -163,9 +167,11 @@ func messageDeliver(node string, message Message, natsConn *nats.Conn) {
} }
msg := &nats.Msg{ msg := &nats.Msg{
Subject: node, Subject: string(proc.node),
Reply: "subjectReply", // Structure of the reply message are:
Data: dataPayload, // reply-<node name>-pid<pid nr>
Reply: "reply-" + string(proc.node) + "-pid" + fmt.Sprint(proc.processID),
Data: dataPayload,
} }
// The SubscribeSync used in the subscriber, will get messages that // The SubscribeSync used in the subscriber, will get messages that
@ -190,7 +196,7 @@ func messageDeliver(node string, message Message, natsConn *nats.Conn) {
// continue and resend if to reply received. // continue and resend if to reply received.
msgReply, err := subReply.NextMsg(time.Second * 10) msgReply, err := subReply.NextMsg(time.Second * 10)
if err != nil { if err != nil {
log.Printf("error: subRepl.NextMsg failed: %v\n", err) log.Printf("error: subRepl.NextMsg failed for node=%v pid=%v: %v\n", proc.node, proc.processID, err)
// did not receive a reply, continuing from top again // did not receive a reply, continuing from top again
continue continue
} }