1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-20 22:52:13 +00:00

prepared encoding logic for chosing of serialization

This commit is contained in:
postmannen 2021-12-29 07:18:11 +01:00
parent ce053fc651
commit d20b4eb64c

View file

@ -478,16 +478,26 @@ func (p process) publishMessages(natsConn *nats.Conn) {
// various configuration options chosen. // various configuration options chosen.
natsMsgHeader := nats.Header{} natsMsgHeader := nats.Header{}
// The serialized value of the nats message payload
var natsMsgPayloadSerialized []byte
// encode the message structure into gob binary format before putting // encode the message structure into gob binary format before putting
// it into a nats message. // it into a nats message.
// Prepare a gob encoder with a buffer before we start the loop // Prepare a gob encoder with a buffer before we start the loop
var bufGob bytes.Buffer switch p.configuration.Serialization {
gobEnc := gob.NewEncoder(&bufGob) case "cbor":
err = gobEnc.Encode(m) //
if err != nil { default:
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) var bufGob bytes.Buffer
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er) gobEnc := gob.NewEncoder(&bufGob)
continue err = gobEnc.Encode(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er)
continue
}
natsMsgPayloadSerialized = bufGob.Bytes()
} }
// Get the process name so we can look up the process in the // Get the process name so we can look up the process in the
@ -495,7 +505,11 @@ func (p process) publishMessages(natsConn *nats.Conn) {
pn := processNameGet(p.subject.name(), processKindPublisher) pn := processNameGet(p.subject.name(), processKindPublisher)
m.ID = p.messageID m.ID = p.messageID
var natsMsgPayload []byte // The compressed value of the nats message payload. The content
// can either be compressed or in it's original form depening on
// the outcome of the switch below, and if compression were chosen
// or not.
var natsMsgPayloadCompressed []byte
// Compress the data payload if selected with configuration flag. // Compress the data payload if selected with configuration flag.
// The compression chosen is later set in the nats msg header when // The compression chosen is later set in the nats msg header when
@ -506,13 +520,12 @@ func (p process) publishMessages(natsConn *nats.Conn) {
if err != nil { if err != nil {
log.Printf("error: zstd write failed: %v\n", err) log.Printf("error: zstd write failed: %v\n", err)
} }
natsMsgPayload = enc.EncodeAll(bufGob.Bytes(), nil) natsMsgPayloadCompressed = enc.EncodeAll(natsMsgPayloadSerialized, nil)
fmt.Printf("* len of zstd mJson : %v\n", len(natsMsgPayload))
natsMsgHeader["cmp"] = []string{p.configuration.Compression} natsMsgHeader["cmp"] = []string{p.configuration.Compression}
case "": // no compression case "": // no compression
natsMsgPayload = bufGob.Bytes() natsMsgPayloadCompressed = natsMsgPayloadSerialized
default: // no compression default: // no compression
// Allways log the error to console. // Allways log the error to console.
@ -524,14 +537,12 @@ func (p process) publishMessages(natsConn *nats.Conn) {
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er) sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er)
}) })
natsMsgPayload = bufGob.Bytes() natsMsgPayloadCompressed = natsMsgPayloadSerialized
} }
// Create the Nats message with headers and payload, and do the // Create the Nats message with headers and payload, and do the
// sending of the message. // sending of the message.
p.messageDeliverNats(natsMsgPayload, natsMsgHeader, natsConn, m) p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
fmt.Printf(" * DEBUG 2: %v\n", len(bufGob.Bytes()))
// Signaling back to the ringbuffer that we are done with the // Signaling back to the ringbuffer that we are done with the
// current message, and it can remove it from the ringbuffer. // current message, and it can remove it from the ringbuffer.