1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

added functions for handling cbor serializing and zstd compression, and swapped out json marshaling of jetstream message data with cbor and zstd

This commit is contained in:
postmannen 2024-11-27 11:30:43 +01:00
parent 5fee84c18a
commit 10c468f6b7
2 changed files with 84 additions and 8 deletions

View file

@ -136,19 +136,20 @@ func (s *server) jetstreamPublish() {
// Publish messages.
for {
select {
case jsMSG := <-s.jetstreamPublishCh:
b, err := json.Marshal(jsMSG)
case msg := <-s.jetstreamPublishCh:
b, err := s.messageSerializeAndCompress(msg)
if err != nil {
log.Fatalf("error: jetstreamPublish: marshal of message failed: %v\n", err)
}
subject := string(fmt.Sprintf("NODES.%v", jsMSG.JetstreamToNode))
subject := string(fmt.Sprintf("NODES.%v", msg.JetstreamToNode))
_, err = js.Publish(s.ctx, subject, b)
if err != nil {
log.Fatalf("error: jetstreamPublish: publish failed: %v\n", err)
}
fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, jsMSG)
fmt.Printf("Published jetstream on subject: %q, message: %v\n", subject, msg)
case <-s.ctx.Done():
}
}
@ -199,10 +200,9 @@ func (s *server) jetstreamConsume() {
msg.Ack()
m := Message{}
err := json.Unmarshal(msg.Data(), &m)
m, err := s.messageDeserializeAndUncompress(msg)
if err != nil {
er := fmt.Errorf("error: jetstreamConsume: CreateOrUpdateConsumer failed: %v", err)
er := fmt.Errorf("jetstreamConsume: deserialize and uncompress failed: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
return
}

View file

@ -13,8 +13,11 @@ import (
"sync"
"time"
"github.com/fxamacker/cbor/v2"
"github.com/jinzhu/copier"
"github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/prometheus/client_golang/prometheus"
)
@ -74,7 +77,8 @@ type server struct {
// message ID
messageID messageID
// audit logging
auditLogCh chan []subjectAndMessage
auditLogCh chan []subjectAndMessage
zstdEncoder *zstd.Encoder
}
type messageID struct {
@ -212,6 +216,18 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
centralAuth := newCentralAuth(configuration, errorKernel)
//}
zstdEncoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
if err != nil {
log.Fatalf("error: zstd new encoder failed: %v", err)
}
defer func() {
go func() {
<-ctx.Done()
zstdEncoder.Close()
}()
}()
s := server{
ctx: ctx,
cancel: cancel,
@ -229,6 +245,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
helloRegister: newHelloRegister(),
centralAuth: centralAuth,
auditLogCh: make(chan []subjectAndMessage),
zstdEncoder: zstdEncoder,
}
s.processes = newProcesses(ctx, &s)
@ -629,3 +646,62 @@ func (s *server) exposeDataFolder() {
os.Exit(1)
}
// messageSerializeAndCompress will serialize and compress the Message, and
// return the result as a []byte.
func (s *server) messageSerializeAndCompress(msg Message) ([]byte, error) {
// encode the message structure into cbor
bSerialized, err := cbor.Marshal(msg)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
s.errorKernel.logDebug(er)
return nil, er
}
// Compress the data payload if selected with configuration flag.
// The compression chosen is later set in the nats msg header when
// calling p.messageDeliverNats below.
bCompressed := s.zstdEncoder.EncodeAll(bSerialized, nil)
return bCompressed, nil
}
// messageDeserializeAndUncompress will deserialize the ctrl message
func (s *server) messageDeserializeAndUncompress(msg jetstream.Msg) (Message, error) {
// Variable to hold a copy of the message data.
msgData := msg.Data()
// If debugging is enabled, print the source node name of the nats messages received.
headerFromNode := msg.Headers().Get("fromNode")
if headerFromNode != "" {
er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, msg.Subject())
s.errorKernel.logDebug(er)
}
zr, err := zstd.NewReader(nil)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: zstd NewReader failed: %v", err)
return Message{}, er
}
msgData, err = zr.DecodeAll(msgData, nil)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: zstd decoding failed: %v", err)
zr.Close()
return Message{}, er
}
zr.Close()
message := Message{}
err = cbor.Unmarshal(msgData, &message)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, subject: %v, error: %v", msg.Subject(), err)
return Message{}, er
}
return message, nil
}