From 10c468f6b757391826be8f73245ea7768c2ea017 Mon Sep 17 00:00:00 2001
From: postmannen <postmannen@gmail.com>
Date: Wed, 27 Nov 2024 11:30:43 +0100
Subject: [PATCH] added functions for handling cbor serializing and zstd
 compression, and swapped out json marshaling of jetstream message data with
 cbor and zstd

---
 message_readers.go | 14 ++++-----
 server.go          | 78 +++++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 84 insertions(+), 8 deletions(-)

diff --git a/message_readers.go b/message_readers.go
index 9f9df95..8a8c888 100644
--- a/message_readers.go
+++ b/message_readers.go
@@ -136,19 +136,20 @@ func (s *server) jetstreamPublish() {
 	// Publish messages.
 	for {
 		select {
-		case jsMSG := <-s.jetstreamPublishCh:
-			b, err := json.Marshal(jsMSG)
+		case msg := <-s.jetstreamPublishCh:
+
+			b, err := s.messageSerializeAndCompress(msg)
 			if err != nil {
 				log.Fatalf("error: jetstreamPublish: marshal of message failed: %v\n", err)
 			}
 
-			subject := string(fmt.Sprintf("NODES.%v", jsMSG.JetstreamToNode))
+			subject := string(fmt.Sprintf("NODES.%v", msg.JetstreamToNode))
 			_, err = js.Publish(s.ctx, subject, b)
 			if err != nil {
 				log.Fatalf("error: jetstreamPublish: publish failed: %v\n", err)
 			}
 
-			fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, jsMSG)
+			fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, msg)
 		case <-s.ctx.Done():
 		}
 	}
@@ -199,10 +200,9 @@ func (s *server) jetstreamConsume() {
 
 		msg.Ack()
 
-		m := Message{}
-		err := json.Unmarshal(msg.Data(), &m)
+		m, err := s.messageDeserializeAndUncompress(msg)
 		if err != nil {
-			er := fmt.Errorf("error: jetstreamConsume: CreateOrUpdateConsumer failed: %v", err)
+			er := fmt.Errorf("jetstreamConsume: deserialize and uncompress failed: %v", err)
 			s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
 			return
 		}
diff --git a/server.go b/server.go
index 437149e..e32fb10 100644
--- a/server.go
+++ b/server.go
@@ -13,8 +13,11 @@ import (
 	"sync"
 	"time"
 
+	"github.com/fxamacker/cbor/v2"
 	"github.com/jinzhu/copier"
+	"github.com/klauspost/compress/zstd"
 	"github.com/nats-io/nats.go"
+	"github.com/nats-io/nats.go/jetstream"
 	"github.com/prometheus/client_golang/prometheus"
 )
 
@@ -74,7 +77,8 @@ type server struct {
 	// message ID
 	messageID messageID
 	// audit logging
-	auditLogCh chan []subjectAndMessage
+	auditLogCh  chan []subjectAndMessage
+	zstdEncoder *zstd.Encoder
 }
 
 type messageID struct {
@@ -212,6 +216,18 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
 	centralAuth := newCentralAuth(configuration, errorKernel)
 	//}
 
+	zstdEncoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
+	if err != nil {
+		log.Fatalf("error: zstd new encoder failed: %v", err)
+	}
+
+	defer func() {
+		go func() {
+			<-ctx.Done()
+			zstdEncoder.Close()
+		}()
+	}()
+
 	s := server{
 		ctx:                ctx,
 		cancel:             cancel,
@@ -229,6 +245,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
 		helloRegister:      newHelloRegister(),
 		centralAuth:        centralAuth,
 		auditLogCh:         make(chan []subjectAndMessage),
+		zstdEncoder:        zstdEncoder,
 	}
 
 	s.processes = newProcesses(ctx, &s)
@@ -629,3 +646,62 @@ func (s *server) exposeDataFolder() {
 	os.Exit(1)
 
 }
+
+// messageSerializeAndCompress will serialize and compress the Message, and
+// return the result as a []byte.
+func (s *server) messageSerializeAndCompress(msg Message) ([]byte, error) {
+
+	// encode the message structure into cbor
+	bSerialized, err := cbor.Marshal(msg)
+	if err != nil {
+		er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
+		s.errorKernel.logDebug(er)
+		return nil, er
+	}
+
+	// Compress the data payload if selected with configuration flag.
+	// The compression chosen is later set in the nats msg header when
+	// calling p.messageDeliverNats below.
+
+	bCompressed := s.zstdEncoder.EncodeAll(bSerialized, nil)
+
+	return bCompressed, nil
+}
+
+// messageDeserializeAndUncompress will deserialize the ctrl message
+func (s *server) messageDeserializeAndUncompress(msg jetstream.Msg) (Message, error) {
+
+	// Variable to hold a copy of the message data.
+	msgData := msg.Data()
+
+	// If debugging is enabled, print the source node name of the nats messages received.
+	headerFromNode := msg.Headers().Get("fromNode")
+	if headerFromNode != "" {
+		er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, msg.Subject())
+		s.errorKernel.logDebug(er)
+	}
+
+	zr, err := zstd.NewReader(nil)
+	if err != nil {
+		er := fmt.Errorf("error: subscriberHandlerJetstream: zstd NewReader failed: %v", err)
+		return Message{}, er
+	}
+	msgData, err = zr.DecodeAll(msgData, nil)
+	if err != nil {
+		er := fmt.Errorf("error: subscriberHandlerJetstream: zstd decoding failed: %v", err)
+		zr.Close()
+		return Message{}, er
+	}
+
+	zr.Close()
+
+	message := Message{}
+
+	err = cbor.Unmarshal(msgData, &message)
+	if err != nil {
+		er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, subject: %v, error: %v", msg.Subject(), err)
+		return Message{}, er
+	}
+
+	return message, nil
+}