1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

added serializing and compression functions for jetstream

This commit is contained in:
postmannen 2024-11-22 20:36:55 +01:00
parent 20172a3806
commit 8065eb248b
6 changed files with 72 additions and 76 deletions

View file

@ -105,7 +105,7 @@ type StartProcesses struct {
EnableAclUpdates bool `comment:"Enable the updates of acl's"` EnableAclUpdates bool `comment:"Enable the updates of acl's"`
IsCentralErrorLogger bool `comment:"Start the central error logger."` IsCentralErrorLogger bool `comment:"Start the central error logger"`
StartSubHello bool `comment:"Start subscriber for hello messages"` StartSubHello bool `comment:"Start subscriber for hello messages"`
StartSubFileAppend bool `comment:"Start subscriber for text logging"` StartSubFileAppend bool `comment:"Start subscriber for text logging"`
StartSubFile bool `comment:"Start subscriber for writing to file"` StartSubFile bool `comment:"Start subscriber for writing to file"`

View file

@ -86,7 +86,7 @@ type Message struct {
Schedule []int `json:"schedule" yaml:"schedule"` Schedule []int `json:"schedule" yaml:"schedule"`
// Is to be used with the stream subject to tell what nodes // Is to be used with the stream subject to tell what nodes
// the the message is for. // the the message is for.
JetstreamToNode string JetstreamToNode string `json:"jetstreamToNode" yaml:"jetstreamToNode"`
} }
// --- Subject // --- Subject

View file

@ -8,11 +8,8 @@ import (
"github.com/nats-io/nats.go/jetstream" "github.com/nats-io/nats.go/jetstream"
) )
// messageSubscriberHandlerJetstream will deserialize the message when a new message is // messageDeserializeAndUncompress will deserialize the ctrl message
// received, check the MessageType field in the message to decide what func (p *process) messageDeserializeAndUncompress(msg jetstream.Msg) (Message, error) {
// kind of message it is and then it will check how to handle that message type,
// and then call the correct method handler for it.
func (p process) messageSubscriberHandlerJetstream(thisNode string, msg jetstream.Msg, subject string) {
// Variable to hold a copy of the message data. // Variable to hold a copy of the message data.
msgData := msg.Data() msgData := msg.Data()
@ -20,22 +17,20 @@ func (p process) messageSubscriberHandlerJetstream(thisNode string, msg jetstrea
// If debugging is enabled, print the source node name of the nats messages received. // If debugging is enabled, print the source node name of the nats messages received.
headerFromNode := msg.Headers().Get("fromNode") headerFromNode := msg.Headers().Get("fromNode")
if headerFromNode != "" { if headerFromNode != "" {
er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, subject) er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, msg.Subject())
p.errorKernel.logDebug(er) p.errorKernel.logDebug(er)
} }
zr, err := zstd.NewReader(nil) zr, err := zstd.NewReader(nil)
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: zstd NewReader failed: %v", err) er := fmt.Errorf("error: subscriberHandlerJetstream: zstd NewReader failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logWarning) return Message{}, er
return
} }
msgData, err = zr.DecodeAll(msgData, nil) msgData, err = zr.DecodeAll(msgData, nil)
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: zstd decoding failed: %v", err) er := fmt.Errorf("error: subscriberHandlerJetstream: zstd decoding failed: %v", err)
p.errorKernel.errSend(p, Message{}, er, logWarning)
zr.Close() zr.Close()
return return Message{}, er
} }
zr.Close() zr.Close()
@ -44,28 +39,30 @@ func (p process) messageSubscriberHandlerJetstream(thisNode string, msg jetstrea
err = cbor.Unmarshal(msgData, &message) err = cbor.Unmarshal(msgData, &message)
if err != nil { if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, subject: %v, error: %v", subject, err) er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, subject: %v, error: %v", msg.Subject(), err)
p.errorKernel.errSend(p, message, er, logError) return Message{}, er
return
} }
er := fmt.Errorf("subscriberHandlerJetstream: received message: %v, from: %v, id:%v", message.Method, message.FromNode, message.ID) return message, nil
p.errorKernel.logDebug(er) }
// When spawning sub processes we can directly assign handlers to the process upon
// creation. We here check if a handler is already assigned, and if it is nil, we // messageSerializeAndCompress will serialize and compress the Message, and
// lookup and find the correct handler to use if available. // return the result as a []byte.
if p.handler == nil { func (p *process) messageSerializeAndCompress(msg Message) ([]byte, error) {
// Look up the method handler for the specified method.
mh, ok := p.methodsAvailable.CheckIfExists(message.Method) // encode the message structure into cbor
p.handler = mh bSerialized, err := cbor.Marshal(msg)
if !ok { if err != nil {
er := fmt.Errorf("error: subscriberHandlerJetstream: no such method type: %v", p.subject.Method) er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
p.errorKernel.errSend(p, message, er, logWarning) p.errorKernel.logDebug(er)
return return nil, er
} }
}
// Compress the data payload if selected with configuration flag.
_ = p.callHandler(message, thisNode) // The compression chosen is later set in the nats msg header when
// calling p.messageDeliverNats below.
msg.Ack()
bCompressed := p.server.zstdEncoder.EncodeAll(bSerialized, nil)
return bCompressed, nil
} }

View file

@ -6,7 +6,6 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"os"
"time" "time"
"github.com/fxamacker/cbor/v2" "github.com/fxamacker/cbor/v2"
@ -125,6 +124,8 @@ type process struct {
metrics *metrics metrics *metrics
// jetstream // jetstream
js jetstream.JetStream js jetstream.JetStream
// zstd encoder
zstdEncoder *zstd.Encoder
} }
// prepareNewProcess will set the the provided values and the default // prepareNewProcess will set the the provided values and the default
@ -166,6 +167,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, stream str
errorKernel: server.errorKernel, errorKernel: server.errorKernel,
metrics: server.metrics, metrics: server.metrics,
js: js, js: js,
zstdEncoder: server.zstdEncoder,
} }
// We use the name of the subject to identify a unique process. // We use the name of the subject to identify a unique process.
@ -750,18 +752,6 @@ func (p process) subscribeMessagesNats() *nats.Subscription {
// as long as the process it belongs to is running. // as long as the process it belongs to is running.
func (p process) publishMessagesNats(natsConn *nats.Conn) { func (p process) publishMessagesNats(natsConn *nats.Conn) {
var zEnc *zstd.Encoder
// Prepare a zstd encoder so we can reuse the zstd encoder for all messages.
enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
if err != nil {
er := fmt.Errorf("error: zstd new encoder failed: %v", err)
p.errorKernel.logError(er)
os.Exit(1)
}
zEnc = enc
defer zEnc.Close()
// Adding a timer that will be used for when to remove the sub process // Adding a timer that will be used for when to remove the sub process
// publisher. The timer is reset each time a message is published with // publisher. The timer is reset each time a message is published with
// the process, so the sub process publisher will not be removed until // the process, so the sub process publisher will not be removed until
@ -804,7 +794,7 @@ func (p process) publishMessagesNats(natsConn *nats.Conn) {
m.ArgSignature = p.addMethodArgSignature(m) m.ArgSignature = p.addMethodArgSignature(m)
// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature)) // fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
go p.publishAMessageNats(m, zEnc, natsConn) go p.publishAMessageNats(m, natsConn)
case <-p.ctx.Done(): case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.processName) er := fmt.Errorf("info: canceling publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
@ -821,38 +811,24 @@ func (p process) addMethodArgSignature(m Message) []byte {
return sign return sign
} }
func (p process) publishAMessageNats(m Message, zEnc *zstd.Encoder, natsConn *nats.Conn) { func (p process) publishAMessageNats(m Message, natsConn *nats.Conn) {
// Create the initial header, and set values below depending on the // Create the initial header, and set values below depending on the
// various configuration options chosen. // various configuration options chosen.
natsMsgHeader := make(nats.Header) natsMsgHeader := make(nats.Header)
natsMsgHeader["fromNode"] = []string{string(p.node)} natsMsgHeader["fromNode"] = []string{string(p.node)}
// The serialized value of the nats message payload
var natsMsgPayloadSerialized []byte
// encode the message structure into cbor
b, err := cbor.Marshal(m)
if err != nil {
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
p.errorKernel.logDebug(er)
return
}
natsMsgPayloadSerialized = b
// Get the process name so we can look up the process in the // Get the process name so we can look up the process in the
// processes map, and increment the message counter. // processes map, and increment the message counter.
pn := processNameGet(p.subject.name(), processKindPublisherNats) pn := processNameGet(p.subject.name(), processKindPublisherNats)
// Compress the data payload if selected with configuration flag. serCmp, err := p.messageSerializeAndCompress(m)
// The compression chosen is later set in the nats msg header when if err != nil {
// calling p.messageDeliverNats below. log.Fatalf("messageSerializeAndCompress: error: %v\n", err)
}
natsMsgPayloadCompressed := zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
// Create the Nats message with headers and payload, and do the // Create the Nats message with headers and payload, and do the
// sending of the message. // sending of the message.
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m) p.messageDeliverNats(serCmp, natsMsgHeader, natsConn, m)
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
p.messageID++ p.messageID++

View file

@ -2,7 +2,6 @@ package ctrl
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"log" "log"
"strings" "strings"
@ -367,7 +366,7 @@ func (p *processes) Start(proc process) {
// TODO: // TODO:
select { select {
case msg := <-proc.jetstreamOut: case msg := <-proc.jetstreamOut:
b, err := json.Marshal(msg) b, err := proc.messageSerializeAndCompress(msg)
if err != nil { if err != nil {
log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err) log.Fatalf("error: pfJetstreamPublishers: js failed to marshal message: %v\n", err)
} }
@ -408,10 +407,15 @@ func (p *processes) Start(proc process) {
// Check for more subjects via flags to listen to, and if defined prefix all // Check for more subjects via flags to listen to, and if defined prefix all
// the values with "nodes." // the values with "nodes."
filterSubjectValues := []string{fmt.Sprintf("nodes.%v", proc.server.nodeName), "nodes.all"} filterSubjectValues := []string{
fmt.Sprintf("nodes.%v", proc.server.nodeName),
"nodes.all",
}
// Check if there are more to consume defined in flags/env.
if proc.configuration.JetstreamsConsume != "" { if proc.configuration.JetstreamsConsume != "" {
filterSubjectValues = strings.Split(proc.configuration.JetstreamsConsume, ",") splitValues := strings.Split(proc.configuration.JetstreamsConsume, ",")
for i, v := range filterSubjectValues { for i, v := range splitValues {
filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v) filterSubjectValues[i] = fmt.Sprintf("nodes.%v", v)
} }
} }
@ -427,7 +431,7 @@ func (p *processes) Start(proc process) {
cctx, err := consumer.Consume(func(msg jetstream.Msg) { cctx, err := consumer.Consume(func(msg jetstream.Msg) {
stewardMessage := Message{} stewardMessage := Message{}
err := json.Unmarshal(msg.Data(), &stewardMessage) stewardMessage, err := proc.messageDeserializeAndUncompress(msg)
if err != nil { if err != nil {
log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err) log.Fatalf("error: pfJetstreamConsumers: json.Unmarshal failed: %v\n", err)
} }

View file

@ -14,6 +14,7 @@ import (
"time" "time"
"github.com/jinzhu/copier" "github.com/jinzhu/copier"
"github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@ -75,6 +76,8 @@ type server struct {
messageID messageID messageID messageID
// audit logging // audit logging
auditLogCh chan []subjectAndMessage auditLogCh chan []subjectAndMessage
// zstd encoder
zstdEncoder *zstd.Encoder
} }
type messageID struct { type messageID struct {
@ -212,6 +215,21 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
centralAuth := newCentralAuth(configuration, errorKernel) centralAuth := newCentralAuth(configuration, errorKernel)
//} //}
// Prepare the zstd encoder
// Prepare the zstd encoder to put into processInitial
var zEnc *zstd.Encoder
// Prepare a zstd encoder so we can reuse the zstd encoder for all messages.
zstdEncoder, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
if err != nil {
log.Fatalf("error: zstd new encoder failed: %v", err)
}
go func() {
<-ctx.Done()
zEnc.Close()
}()
s := server{ s := server{
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
@ -229,6 +247,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
helloRegister: newHelloRegister(), helloRegister: newHelloRegister(),
centralAuth: centralAuth, centralAuth: centralAuth,
auditLogCh: make(chan []subjectAndMessage), auditLogCh: make(chan []subjectAndMessage),
zstdEncoder: zstdEncoder,
} }
s.processes = newProcesses(ctx, &s) s.processes = newProcesses(ctx, &s)
@ -387,14 +406,14 @@ func (s *server) startAuditLog(ctx context.Context) {
js, err := json.Marshal(msgForPermStore) js, err := json.Marshal(msgForPermStore)
if err != nil { if err != nil {
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err) er := fmt.Errorf("error: startAuditLog: fillBuffer: json marshaling: %v", err)
s.errorKernel.errSend(s.processInitial, Message{}, er, logError) s.errorKernel.errSend(s.processInitial, Message{}, er, logError)
} }
d := time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n" d := time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n"
_, err = f.WriteString(d) _, err = f.WriteString(d)
if err != nil { if err != nil {
log.Printf("error:failed to write entry: %v\n", err) log.Printf("error: startAuditLog:failed to write entry: %v\n", err)
} }
} }
case <-ctx.Done(): case <-ctx.Done():