mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-15 10:57:42 +00:00
moved gob encoding to messageDeliverNats
This commit is contained in:
parent
909915a77b
commit
6d237ffcde
2 changed files with 19 additions and 26 deletions
|
@ -1,8 +1,6 @@
|
||||||
package steward
|
package steward
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"encoding/gob"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
@ -96,21 +94,6 @@ type Message struct {
|
||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---
|
|
||||||
|
|
||||||
// gobEncodePayload will encode the message structure into gob
|
|
||||||
// binary format before putting it into a nats message.
|
|
||||||
func gobEncodeMessage(m Message) ([]byte, error) {
|
|
||||||
var buf bytes.Buffer
|
|
||||||
gobEnc := gob.NewEncoder(&buf)
|
|
||||||
err := gobEnc.Encode(m)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error: gob.Encode failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return buf.Bytes(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- Subject
|
// --- Subject
|
||||||
|
|
||||||
// Node is the type definition for the node who receive or send a message.
|
// Node is the type definition for the node who receive or send a message.
|
||||||
|
|
28
process.go
28
process.go
|
@ -193,7 +193,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) {
|
||||||
// that is converted to gob format as a nats.Message. It will also
|
// that is converted to gob format as a nats.Message. It will also
|
||||||
// take care of checking timeouts and retries specified for the
|
// take care of checking timeouts and retries specified for the
|
||||||
// message.
|
// message.
|
||||||
func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
func (p process) messageDeliverNats(bufGob *bytes.Buffer, natsConn *nats.Conn, message Message) {
|
||||||
retryAttempts := 0
|
retryAttempts := 0
|
||||||
|
|
||||||
const publishTimer time.Duration = 5
|
const publishTimer time.Duration = 5
|
||||||
|
@ -202,12 +202,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
|
||||||
// The for loop will run until the message is delivered successfully,
|
// The for loop will run until the message is delivered successfully,
|
||||||
// or that retries are reached.
|
// or that retries are reached.
|
||||||
for {
|
for {
|
||||||
dataPayload, err := gobEncodeMessage(message)
|
dataPayload := bufGob.Bytes()
|
||||||
if err != nil {
|
|
||||||
er := fmt.Errorf("error: messageDeliverNats: createDataPayload failed: %v", err)
|
|
||||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
msg := &nats.Msg{
|
msg := &nats.Msg{
|
||||||
Subject: string(p.subject.name()),
|
Subject: string(p.subject.name()),
|
||||||
|
@ -426,7 +421,8 @@ func (p process) subscribeMessages() *nats.Subscription {
|
||||||
}
|
}
|
||||||
|
|
||||||
// publishMessages will do the publishing of messages for one single
|
// publishMessages will do the publishing of messages for one single
|
||||||
// process.
|
// process. The function should be run as a goroutine, and will run
|
||||||
|
// as long as the process it belongs to is running.
|
||||||
func (p process) publishMessages(natsConn *nats.Conn) {
|
func (p process) publishMessages(natsConn *nats.Conn) {
|
||||||
for {
|
for {
|
||||||
var err error
|
var err error
|
||||||
|
@ -443,12 +439,26 @@ func (p process) publishMessages(natsConn *nats.Conn) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// encode the message structure into gob binary format before putting
|
||||||
|
// it into a nats message.
|
||||||
|
// Prepare a gob encoder with a buffer before we start the loop
|
||||||
|
var bufGob bytes.Buffer
|
||||||
|
gobEnc := gob.NewEncoder(&bufGob)
|
||||||
|
err = gobEnc.Encode(m)
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
|
||||||
|
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// Get the process name so we can look up the process in the
|
// Get the process name so we can look up the process in the
|
||||||
// processes map, and increment the message counter.
|
// processes map, and increment the message counter.
|
||||||
pn := processNameGet(p.subject.name(), processKindPublisher)
|
pn := processNameGet(p.subject.name(), processKindPublisher)
|
||||||
m.ID = p.messageID
|
m.ID = p.messageID
|
||||||
|
|
||||||
p.messageDeliverNats(natsConn, m)
|
p.messageDeliverNats(&bufGob, natsConn, m)
|
||||||
|
|
||||||
|
fmt.Printf(" * DEBUG 2: %v\n", len(bufGob.Bytes()))
|
||||||
|
|
||||||
// Signaling back to the ringbuffer that we are done with the
|
// Signaling back to the ringbuffer that we are done with the
|
||||||
// current message, and it can remove it from the ringbuffer.
|
// current message, and it can remove it from the ringbuffer.
|
||||||
|
|
Loading…
Add table
Reference in a new issue