From d20b4eb64c6e31e17d9d481d12d83f984082378e Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 29 Dec 2021 07:18:11 +0100 Subject: [PATCH] prepared encoding logic for chosing of serialization --- process.go | 41 ++++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/process.go b/process.go index 063c0e1..1937d1e 100644 --- a/process.go +++ b/process.go @@ -478,16 +478,26 @@ func (p process) publishMessages(natsConn *nats.Conn) { // various configuration options chosen. natsMsgHeader := nats.Header{} + // The serialized value of the nats message payload + var natsMsgPayloadSerialized []byte + // encode the message structure into gob binary format before putting // it into a nats message. // Prepare a gob encoder with a buffer before we start the loop - var bufGob bytes.Buffer - gobEnc := gob.NewEncoder(&bufGob) - err = gobEnc.Encode(m) - if err != nil { - er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) - sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er) - continue + switch p.configuration.Serialization { + case "cbor": + // + default: + var bufGob bytes.Buffer + gobEnc := gob.NewEncoder(&bufGob) + err = gobEnc.Encode(m) + if err != nil { + er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) + sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er) + continue + } + + natsMsgPayloadSerialized = bufGob.Bytes() } // Get the process name so we can look up the process in the @@ -495,7 +505,11 @@ func (p process) publishMessages(natsConn *nats.Conn) { pn := processNameGet(p.subject.name(), processKindPublisher) m.ID = p.messageID - var natsMsgPayload []byte + // The compressed value of the nats message payload. The content + // can either be compressed or in it's original form depening on + // the outcome of the switch below, and if compression were chosen + // or not. + var natsMsgPayloadCompressed []byte // Compress the data payload if selected with configuration flag. // The compression chosen is later set in the nats msg header when @@ -506,13 +520,12 @@ func (p process) publishMessages(natsConn *nats.Conn) { if err != nil { log.Printf("error: zstd write failed: %v\n", err) } - natsMsgPayload = enc.EncodeAll(bufGob.Bytes(), nil) - fmt.Printf("* len of zstd mJson : %v\n", len(natsMsgPayload)) + natsMsgPayloadCompressed = enc.EncodeAll(natsMsgPayloadSerialized, nil) natsMsgHeader["cmp"] = []string{p.configuration.Compression} case "": // no compression - natsMsgPayload = bufGob.Bytes() + natsMsgPayloadCompressed = natsMsgPayloadSerialized default: // no compression // Allways log the error to console. @@ -524,14 +537,12 @@ func (p process) publishMessages(natsConn *nats.Conn) { sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(p.node), er) }) - natsMsgPayload = bufGob.Bytes() + natsMsgPayloadCompressed = natsMsgPayloadSerialized } // Create the Nats message with headers and payload, and do the // sending of the message. - p.messageDeliverNats(natsMsgPayload, natsMsgHeader, natsConn, m) - - fmt.Printf(" * DEBUG 2: %v\n", len(bufGob.Bytes())) + p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m) // Signaling back to the ringbuffer that we are done with the // current message, and it can remove it from the ringbuffer.