1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 06:46:48 +00:00

all messaging are using zstd for compression, and cbor for serializing

This commit is contained in:
postmannen 2024-11-27 12:54:06 +01:00
parent 10c468f6b7
commit 84a731f18a
3 changed files with 25 additions and 196 deletions

View file

@ -200,7 +200,7 @@ func (s *server) jetstreamConsume() {
msg.Ack()
m, err := s.messageDeserializeAndUncompress(msg)
m, err := s.messageDeserializeAndUncompress(msg.Data())
if err != nil {
er := fmt.Errorf("jetstreamConsume: deserialize and uncompress failed: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logError)

View file

@ -1,19 +1,14 @@
package ctrl
import (
"bytes"
"compress/gzip"
"context"
"crypto/ed25519"
"encoding/gob"
"errors"
"fmt"
"io"
"log"
"os"
"sync"
"time"
"github.com/fxamacker/cbor/v2"
"github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
@ -462,90 +457,11 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
p.errorKernel.logDebug(er)
}
// If compression is used, decompress it to get the gob data. If
// compression is not used it is the gob encoded data we already
// got in msgData so we do nothing with it.
if val, ok := msg.Header["cmp"]; ok {
switch val[0] {
case "z":
zr, err := zstd.NewReader(nil)
if err != nil {
er := fmt.Errorf("error: zstd NewReader failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logWarning)
return
}
msgData, err = zr.DecodeAll(msg.Data, nil)
if err != nil {
er := fmt.Errorf("error: zstd decoding failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logWarning)
zr.Close()
return
}
zr.Close()
case "g":
r := bytes.NewReader(msgData)
gr, err := gzip.NewReader(r)
if err != nil {
er := fmt.Errorf("error: gzip NewReader failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logError)
return
}
b, err := io.ReadAll(gr)
if err != nil {
er := fmt.Errorf("error: gzip ReadAll failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logWarning)
return
}
gr.Close()
msgData = b
}
}
message := Message{}
// TODO: Jetstream
// Use CBOR and Compression for all messages, and drop the use of the header fields.
// Check if serialization is specified.
// Will default to gob serialization if nothing or non existing value is specified.
if val, ok := msg.Header["serial"]; ok {
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
switch val[0] {
case "cbor":
err := cbor.Unmarshal(msgData, &message)
if err != nil {
er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.errorKernel.errSend(p, message, er, logError)
return
}
default: // Deaults to gob if no match was found.
r := bytes.NewReader(msgData)
gobDec := gob.NewDecoder(r)
err := gobDec.Decode(&message)
if err != nil {
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.errorKernel.errSend(p, message, er, logError)
return
}
}
} else {
// Default to gob if serialization flag was not specified.
r := bytes.NewReader(msgData)
gobDec := gob.NewDecoder(r)
err := gobDec.Decode(&message)
if err != nil {
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
p.errorKernel.errSend(p, message, er, logError)
return
}
message, err := p.server.messageDeserializeAndUncompress(msgData)
if err != nil {
er := fmt.Errorf("error: messageSubscriberHandler: deserialize and uncompress failed: %v", err)
// p.errorKernel.logDebug(er)
log.Fatalf("%v\n", er)
}
// Check if it is an ACK or NACK message, and do the appropriate action accordingly.
@ -800,7 +716,6 @@ func (p process) subscribeMessages() *nats.Subscription {
// process. The function should be run as a goroutine, and will run
// as long as the process it belongs to is running.
func (p process) publishMessages(natsConn *nats.Conn) {
var once sync.Once
var zEnc *zstd.Encoder
// Prepare a zstd encoder if enabled. By enabling it here before
@ -862,7 +777,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
m.ArgSignature = p.addMethodArgSignature(m)
// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
go p.publishAMessage(m, zEnc, &once, natsConn)
go p.publishAMessage(m, natsConn)
case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
@ -879,108 +794,26 @@ func (p process) addMethodArgSignature(m Message) []byte {
return sign
}
func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, natsConn *nats.Conn) {
func (p process) publishAMessage(m Message, natsConn *nats.Conn) {
// Create the initial header, and set values below depending on the
// various configuration options chosen.
natsMsgHeader := make(nats.Header)
natsMsgHeader["fromNode"] = []string{string(p.node)}
// The serialized value of the nats message payload
var natsMsgPayloadSerialized []byte
// encode the message structure into gob binary format before putting
// it into a nats message.
// Prepare a gob encoder with a buffer before we start the loop
switch p.configuration.Serialization {
case "cbor":
b, err := cbor.Marshal(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
p.errorKernel.logDebug(er)
return
}
natsMsgPayloadSerialized = b
natsMsgHeader["serial"] = []string{p.configuration.Serialization}
default:
var bufGob bytes.Buffer
gobEnc := gob.NewEncoder(&bufGob)
err := gobEnc.Encode(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
p.errorKernel.logDebug(er)
return
}
natsMsgPayloadSerialized = bufGob.Bytes()
natsMsgHeader["serial"] = []string{"gob"}
}
// Get the process name so we can look up the process in the
// processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher)
// The compressed value of the nats message payload. The content
// can either be compressed or in it's original form depening on
// the outcome of the switch below, and if compression were chosen
// or not.
var natsMsgPayloadCompressed []byte
// Compress the data payload if selected with configuration flag.
// The compression chosen is later set in the nats msg header when
// calling p.messageDeliverNats below.
switch p.configuration.Compression {
case "z": // zstd
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
// p.zEncMutex.Lock()
// zEnc.Reset(nil)
// p.zEncMutex.Unlock()
case "g": // gzip
var buf bytes.Buffer
func() {
gzipW := gzip.NewWriter(&buf)
defer gzipW.Close()
defer gzipW.Flush()
_, err := gzipW.Write(natsMsgPayloadSerialized)
if err != nil {
er := fmt.Errorf("error: failed to write gzip: %v", err)
p.errorKernel.logDebug(er)
return
}
}()
natsMsgPayloadCompressed = buf.Bytes()
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
case "": // no compression
natsMsgPayloadCompressed = natsMsgPayloadSerialized
natsMsgHeader["cmp"] = []string{"none"}
default: // no compression
// Allways log the error to console.
er := fmt.Errorf("error: publishing: compression type not defined, setting default to no compression")
b, err := p.server.messageSerializeAndCompress(m)
if err != nil {
er := fmt.Errorf("error: publishAMessage: serialize and compress failed: %v", err)
p.errorKernel.logDebug(er)
// We only wan't to send the error message to errorCentral once.
once.Do(func() {
p.errorKernel.logDebug(er)
})
// No compression, so we just assign the value of the serialized
// data directly to the variable used with messageDeliverNats.
natsMsgPayloadCompressed = natsMsgPayloadSerialized
natsMsgHeader["cmp"] = []string{"none"}
return
}
// Create the Nats message with headers and payload, and do the
// sending of the message.
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
p.messageDeliverNats(b, natsMsgHeader, natsConn, m)
// Get the process name so we can look up the process in the
// processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisher)
// Increment the counter for the next message to be sent.
p.messageID++

View file

@ -17,7 +17,6 @@ import (
"github.com/jinzhu/copier"
"github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/jetstream"
"github.com/prometheus/client_golang/prometheus"
)
@ -669,17 +668,14 @@ func (s *server) messageSerializeAndCompress(msg Message) ([]byte, error) {
}
// messageDeserializeAndUncompress will deserialize the ctrl message
func (s *server) messageDeserializeAndUncompress(msg jetstream.Msg) (Message, error) {
func (s *server) messageDeserializeAndUncompress(msgData []byte) (Message, error) {
// Variable to hold a copy of the message data.
msgData := msg.Data()
// If debugging is enabled, print the source node name of the nats messages received.
headerFromNode := msg.Headers().Get("fromNode")
if headerFromNode != "" {
er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, msg.Subject())
s.errorKernel.logDebug(er)
}
// // If debugging is enabled, print the source node name of the nats messages received.
// headerFromNode := msg.Headers().Get("fromNode")
// if headerFromNode != "" {
// er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, msg.Subject())
// s.errorKernel.logDebug(er)
// }
zr, err := zstd.NewReader(nil)
if err != nil {
@ -699,7 +695,7 @@ func (s *server) messageDeserializeAndUncompress(msg jetstream.Msg) (Message, er
err = cbor.Unmarshal(msgData, &message)
if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, subject: %v, error: %v", msg.Subject(), err)
er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, error: %v", err)
return Message{}, er
}