diff --git a/message_readers.go b/message_readers.go index 8a8c888..241ac4a 100644 --- a/message_readers.go +++ b/message_readers.go @@ -200,7 +200,7 @@ func (s *server) jetstreamConsume() { msg.Ack() - m, err := s.messageDeserializeAndUncompress(msg) + m, err := s.messageDeserializeAndUncompress(msg.Data()) if err != nil { er := fmt.Errorf("jetstreamConsume: deserialize and uncompress failed: %v", err) s.errorKernel.errSend(s.processInitial, Message{}, er, logError) diff --git a/process.go b/process.go index 7dd04c9..6f26689 100644 --- a/process.go +++ b/process.go @@ -1,19 +1,14 @@ package ctrl import ( - "bytes" - "compress/gzip" "context" "crypto/ed25519" - "encoding/gob" "errors" "fmt" - "io" + "log" "os" - "sync" "time" - "github.com/fxamacker/cbor/v2" "github.com/klauspost/compress/zstd" "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" @@ -462,90 +457,11 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, p.errorKernel.logDebug(er) } - // If compression is used, decompress it to get the gob data. If - // compression is not used it is the gob encoded data we already - // got in msgData so we do nothing with it. - if val, ok := msg.Header["cmp"]; ok { - switch val[0] { - case "z": - zr, err := zstd.NewReader(nil) - if err != nil { - er := fmt.Errorf("error: zstd NewReader failed: %v", err) - p.errorKernel.errSend(p, Message{}, er, logWarning) - return - } - msgData, err = zr.DecodeAll(msg.Data, nil) - if err != nil { - er := fmt.Errorf("error: zstd decoding failed: %v", err) - p.errorKernel.errSend(p, Message{}, er, logWarning) - zr.Close() - return - } - - zr.Close() - - case "g": - r := bytes.NewReader(msgData) - gr, err := gzip.NewReader(r) - if err != nil { - er := fmt.Errorf("error: gzip NewReader failed: %v", err) - p.errorKernel.errSend(p, Message{}, er, logError) - return - } - - b, err := io.ReadAll(gr) - if err != nil { - er := fmt.Errorf("error: gzip ReadAll failed: %v", err) - p.errorKernel.errSend(p, Message{}, er, logWarning) - return - } - - gr.Close() - - msgData = b - } - } - - message := Message{} - - // TODO: Jetstream - // Use CBOR and Compression for all messages, and drop the use of the header fields. - - // Check if serialization is specified. - // Will default to gob serialization if nothing or non existing value is specified. - if val, ok := msg.Header["serial"]; ok { - // fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val)) - switch val[0] { - case "cbor": - err := cbor.Unmarshal(msgData, &message) - if err != nil { - er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.errorKernel.errSend(p, message, er, logError) - return - } - default: // Deaults to gob if no match was found. - r := bytes.NewReader(msgData) - gobDec := gob.NewDecoder(r) - - err := gobDec.Decode(&message) - if err != nil { - er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.errorKernel.errSend(p, message, er, logError) - return - } - } - - } else { - // Default to gob if serialization flag was not specified. - r := bytes.NewReader(msgData) - gobDec := gob.NewDecoder(r) - - err := gobDec.Decode(&message) - if err != nil { - er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err) - p.errorKernel.errSend(p, message, er, logError) - return - } + message, err := p.server.messageDeserializeAndUncompress(msgData) + if err != nil { + er := fmt.Errorf("error: messageSubscriberHandler: deserialize and uncompress failed: %v", err) + // p.errorKernel.logDebug(er) + log.Fatalf("%v\n", er) } // Check if it is an ACK or NACK message, and do the appropriate action accordingly. @@ -800,7 +716,6 @@ func (p process) subscribeMessages() *nats.Subscription { // process. The function should be run as a goroutine, and will run // as long as the process it belongs to is running. func (p process) publishMessages(natsConn *nats.Conn) { - var once sync.Once var zEnc *zstd.Encoder // Prepare a zstd encoder if enabled. By enabling it here before @@ -862,7 +777,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { m.ArgSignature = p.addMethodArgSignature(m) // fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature)) - go p.publishAMessage(m, zEnc, &once, natsConn) + go p.publishAMessage(m, natsConn) case <-p.ctx.Done(): er := fmt.Errorf("info: canceling publisher: %v", p.processName) //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) @@ -879,108 +794,26 @@ func (p process) addMethodArgSignature(m Message) []byte { return sign } -func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, natsConn *nats.Conn) { +func (p process) publishAMessage(m Message, natsConn *nats.Conn) { // Create the initial header, and set values below depending on the // various configuration options chosen. natsMsgHeader := make(nats.Header) natsMsgHeader["fromNode"] = []string{string(p.node)} - // The serialized value of the nats message payload - var natsMsgPayloadSerialized []byte - - // encode the message structure into gob binary format before putting - // it into a nats message. - // Prepare a gob encoder with a buffer before we start the loop - switch p.configuration.Serialization { - case "cbor": - b, err := cbor.Marshal(m) - if err != nil { - er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err) - p.errorKernel.logDebug(er) - return - } - - natsMsgPayloadSerialized = b - natsMsgHeader["serial"] = []string{p.configuration.Serialization} - - default: - var bufGob bytes.Buffer - gobEnc := gob.NewEncoder(&bufGob) - err := gobEnc.Encode(m) - if err != nil { - er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err) - p.errorKernel.logDebug(er) - return - } - - natsMsgPayloadSerialized = bufGob.Bytes() - natsMsgHeader["serial"] = []string{"gob"} - } - - // Get the process name so we can look up the process in the - // processes map, and increment the message counter. - pn := processNameGet(p.subject.name(), processKindPublisher) - - // The compressed value of the nats message payload. The content - // can either be compressed or in it's original form depening on - // the outcome of the switch below, and if compression were chosen - // or not. - var natsMsgPayloadCompressed []byte - - // Compress the data payload if selected with configuration flag. - // The compression chosen is later set in the nats msg header when - // calling p.messageDeliverNats below. - switch p.configuration.Compression { - case "z": // zstd - natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil) - natsMsgHeader["cmp"] = []string{p.configuration.Compression} - - // p.zEncMutex.Lock() - // zEnc.Reset(nil) - // p.zEncMutex.Unlock() - - case "g": // gzip - var buf bytes.Buffer - func() { - gzipW := gzip.NewWriter(&buf) - defer gzipW.Close() - defer gzipW.Flush() - _, err := gzipW.Write(natsMsgPayloadSerialized) - if err != nil { - er := fmt.Errorf("error: failed to write gzip: %v", err) - p.errorKernel.logDebug(er) - return - } - - }() - - natsMsgPayloadCompressed = buf.Bytes() - natsMsgHeader["cmp"] = []string{p.configuration.Compression} - - case "": // no compression - natsMsgPayloadCompressed = natsMsgPayloadSerialized - natsMsgHeader["cmp"] = []string{"none"} - - default: // no compression - // Allways log the error to console. - er := fmt.Errorf("error: publishing: compression type not defined, setting default to no compression") + b, err := p.server.messageSerializeAndCompress(m) + if err != nil { + er := fmt.Errorf("error: publishAMessage: serialize and compress failed: %v", err) p.errorKernel.logDebug(er) - - // We only wan't to send the error message to errorCentral once. - once.Do(func() { - p.errorKernel.logDebug(er) - }) - - // No compression, so we just assign the value of the serialized - // data directly to the variable used with messageDeliverNats. - natsMsgPayloadCompressed = natsMsgPayloadSerialized - natsMsgHeader["cmp"] = []string{"none"} + return } // Create the Nats message with headers and payload, and do the // sending of the message. - p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m) + p.messageDeliverNats(b, natsMsgHeader, natsConn, m) + // Get the process name so we can look up the process in the + // processes map, and increment the message counter. + pn := processNameGet(p.subject.name(), processKindPublisher) // Increment the counter for the next message to be sent. p.messageID++ diff --git a/server.go b/server.go index e32fb10..bc4d744 100644 --- a/server.go +++ b/server.go @@ -17,7 +17,6 @@ import ( "github.com/jinzhu/copier" "github.com/klauspost/compress/zstd" "github.com/nats-io/nats.go" - "github.com/nats-io/nats.go/jetstream" "github.com/prometheus/client_golang/prometheus" ) @@ -669,17 +668,14 @@ func (s *server) messageSerializeAndCompress(msg Message) ([]byte, error) { } // messageDeserializeAndUncompress will deserialize the ctrl message -func (s *server) messageDeserializeAndUncompress(msg jetstream.Msg) (Message, error) { +func (s *server) messageDeserializeAndUncompress(msgData []byte) (Message, error) { - // Variable to hold a copy of the message data. - msgData := msg.Data() - - // If debugging is enabled, print the source node name of the nats messages received. - headerFromNode := msg.Headers().Get("fromNode") - if headerFromNode != "" { - er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, msg.Subject()) - s.errorKernel.logDebug(er) - } + // // If debugging is enabled, print the source node name of the nats messages received. + // headerFromNode := msg.Headers().Get("fromNode") + // if headerFromNode != "" { + // er := fmt.Errorf("info: subscriberHandlerJetstream: nats message received from %v, with subject %v ", headerFromNode, msg.Subject()) + // s.errorKernel.logDebug(er) + // } zr, err := zstd.NewReader(nil) if err != nil { @@ -699,7 +695,7 @@ func (s *server) messageDeserializeAndUncompress(msg jetstream.Msg) (Message, er err = cbor.Unmarshal(msgData, &message) if err != nil { - er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, subject: %v, error: %v", msg.Subject(), err) + er := fmt.Errorf("error: subscriberHandlerJetstream: cbor decoding failed, error: %v", err) return Message{}, er }