From 6d237ffcde3504d493be74e75fbfbfd6b1ff6f04 Mon Sep 17 00:00:00 2001 From: postmannen Date: Sat, 25 Dec 2021 19:21:10 +0100 Subject: [PATCH] moved gob encoding to messageDeliverNats --- message_and_subject.go | 17 ----------------- process.go | 28 +++++++++++++++++++--------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/message_and_subject.go b/message_and_subject.go index 3c09290..58a5a2c 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -1,8 +1,6 @@ package steward import ( - "bytes" - "encoding/gob" "fmt" "log" "os" @@ -96,21 +94,6 @@ type Message struct { done chan struct{} } -// --- - -// gobEncodePayload will encode the message structure into gob -// binary format before putting it into a nats message. -func gobEncodeMessage(m Message) ([]byte, error) { - var buf bytes.Buffer - gobEnc := gob.NewEncoder(&buf) - err := gobEnc.Encode(m) - if err != nil { - return nil, fmt.Errorf("error: gob.Encode failed: %v", err) - } - - return buf.Bytes(), nil -} - // --- Subject // Node is the type definition for the node who receive or send a message. diff --git a/process.go b/process.go index d477ee9..9ad3d48 100644 --- a/process.go +++ b/process.go @@ -193,7 +193,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { // that is converted to gob format as a nats.Message. It will also // take care of checking timeouts and retries specified for the // message. -func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { +func (p process) messageDeliverNats(bufGob *bytes.Buffer, natsConn *nats.Conn, message Message) { retryAttempts := 0 const publishTimer time.Duration = 5 @@ -202,12 +202,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { // The for loop will run until the message is delivered successfully, // or that retries are reached. for { - dataPayload, err := gobEncodeMessage(message) - if err != nil { - er := fmt.Errorf("error: messageDeliverNats: createDataPayload failed: %v", err) - sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er) - continue - } + dataPayload := bufGob.Bytes() msg := &nats.Msg{ Subject: string(p.subject.name()), @@ -426,7 +421,8 @@ func (p process) subscribeMessages() *nats.Subscription { } // publishMessages will do the publishing of messages for one single -// process. +// process. The function should be run as a goroutine, and will run +// as long as the process it belongs to is running. func (p process) publishMessages(natsConn *nats.Conn) { for { var err error @@ -443,12 +439,26 @@ func (p process) publishMessages(natsConn *nats.Conn) { return } + // encode the message structure into gob binary format before putting + // it into a nats message. + // Prepare a gob encoder with a buffer before we start the loop + var bufGob bytes.Buffer + gobEnc := gob.NewEncoder(&bufGob) + err = gobEnc.Encode(m) + if err != nil { + er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) + sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er) + continue + } + // Get the process name so we can look up the process in the // processes map, and increment the message counter. pn := processNameGet(p.subject.name(), processKindPublisher) m.ID = p.messageID - p.messageDeliverNats(natsConn, m) + p.messageDeliverNats(&bufGob, natsConn, m) + + fmt.Printf(" * DEBUG 2: %v\n", len(bufGob.Bytes())) // Signaling back to the ringbuffer that we are done with the // current message, and it can remove it from the ringbuffer.