mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
only using zstd compression, and cbor serialization
This commit is contained in:
parent
6c615591a6
commit
233a727ac4
4 changed files with 58 additions and 225 deletions
|
@ -76,10 +76,6 @@ type Configuration struct {
|
|||
ErrorMessageTimeout int `comment:"Timeout in seconds for error messages"`
|
||||
// Retries for error messages
|
||||
ErrorMessageRetries int `comment:"Retries for error messages"`
|
||||
// Compression z for zstd or g for gzip
|
||||
Compression string `comment:"Compression z for zstd or g for gzip"`
|
||||
// Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor
|
||||
Serialization string `comment:"Serialization, supports cbor or gob,default is gob. Enable cbor by setting the string value cbor"`
|
||||
// SetBlockProfileRate for block profiling
|
||||
SetBlockProfileRate int `comment:"SetBlockProfileRate for block profiling"`
|
||||
// EnableSocket for enabling the creation of a ctrl.sock file
|
||||
|
@ -177,8 +173,6 @@ func NewConfiguration() *Configuration {
|
|||
flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", CheckEnv("EXPOSE_DATA_FOLDER", c.ExposeDataFolder).(string), "If set the data folder will be exposed on the given host:port. Default value is not exposed at all")
|
||||
flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", CheckEnv("ERROR_MESSAGE_TIMEOUT", c.ErrorMessageTimeout).(int), "The number of seconds to wait for an error message to time out")
|
||||
flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", CheckEnv("ERROR_MESSAGE_RETRIES", c.ErrorMessageRetries).(int), "The number of if times to retry an error message before we drop it")
|
||||
flag.StringVar(&c.Compression, "compression", CheckEnv("COMPRESSION", c.Compression).(string), "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression")
|
||||
flag.StringVar(&c.Serialization, "serialization", CheckEnv("SERIALIZATION", c.Serialization).(string), "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob")
|
||||
flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", CheckEnv("BLOCK_PROFILE_RATE", c.SetBlockProfileRate).(int), "Enable block profiling by setting the value to f.ex. 1. 0 = disabled")
|
||||
flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file")
|
||||
flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.")
|
||||
|
@ -254,8 +248,6 @@ func newConfigurationDefaults() Configuration {
|
|||
ExposeDataFolder: "",
|
||||
ErrorMessageTimeout: 60,
|
||||
ErrorMessageRetries: 10,
|
||||
Compression: "z",
|
||||
Serialization: "cbor",
|
||||
SetBlockProfileRate: 0,
|
||||
EnableSocket: true,
|
||||
EnableSignatureCheck: false,
|
||||
|
|
12
go.mod
12
go.mod
|
@ -10,14 +10,14 @@ require (
|
|||
github.com/hpcloud/tail v1.0.0
|
||||
github.com/jinzhu/copier v0.4.0
|
||||
github.com/joho/godotenv v1.5.1
|
||||
github.com/klauspost/compress v1.17.0
|
||||
github.com/klauspost/compress v1.17.11
|
||||
github.com/nats-io/nats-server/v2 v2.8.4
|
||||
github.com/nats-io/nats.go v1.25.0
|
||||
github.com/nats-io/nkeys v0.4.4
|
||||
github.com/nats-io/nats.go v1.37.0
|
||||
github.com/nats-io/nkeys v0.4.7
|
||||
github.com/pkg/profile v1.7.0
|
||||
github.com/prometheus/client_golang v1.14.0
|
||||
go.etcd.io/bbolt v1.3.7
|
||||
golang.org/x/crypto v0.7.0
|
||||
golang.org/x/crypto v0.29.0
|
||||
golang.org/x/exp v0.0.0-20230321023759-10a507213a29
|
||||
gopkg.in/yaml.v3 v3.0.1
|
||||
)
|
||||
|
@ -39,8 +39,8 @@ require (
|
|||
github.com/prometheus/common v0.42.0 // indirect
|
||||
github.com/prometheus/procfs v0.9.0 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
golang.org/x/sys v0.6.0 // indirect
|
||||
golang.org/x/text v0.8.0 // indirect
|
||||
golang.org/x/sys v0.27.0 // indirect
|
||||
golang.org/x/text v0.20.0 // indirect
|
||||
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
|
||||
google.golang.org/protobuf v1.30.0 // indirect
|
||||
gopkg.in/fsnotify.v1 v1.4.7 // indirect
|
||||
|
|
12
go.sum
12
go.sum
|
@ -45,6 +45,8 @@ github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
|
|||
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
|
||||
github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM=
|
||||
github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
|
||||
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
|
||||
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
|
||||
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
|
||||
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
|
||||
github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0=
|
||||
|
@ -65,9 +67,13 @@ github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBri
|
|||
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
|
||||
github.com/nats-io/nats.go v1.25.0 h1:t5/wCPGciR7X3Mu8QOi4jiJaXaWM8qtkLu4lzGZvYHE=
|
||||
github.com/nats-io/nats.go v1.25.0/go.mod h1:D2WALIhz7V8M0pH8Scx8JZXlg6Oqz5VG+nQkK8nJdvg=
|
||||
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
|
||||
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
|
||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
|
||||
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
|
||||
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
|
||||
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
|
||||
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
|
||||
|
@ -102,6 +108,8 @@ golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm
|
|||
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
|
||||
golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A=
|
||||
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
|
||||
golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ=
|
||||
golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg=
|
||||
golang.org/x/exp v0.0.0-20230321023759-10a507213a29 h1:ooxPy7fPvB4kwsA2h+iBNHkAbp/4JxTSwCmvdjEYmug=
|
||||
golang.org/x/exp v0.0.0-20230321023759-10a507213a29/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
|
@ -116,6 +124,8 @@ golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ=
|
||||
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
|
||||
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.6.0 h1:clScbb1cHjoCkyRbWwBEUZ5H/tIFu5TAXIqaZD0Gcjw=
|
||||
golang.org/x/term v0.6.0/go.mod h1:m6U89DPEgQRMq3DNkDClhWw02AUbt2daBVO4cn4Hv9U=
|
||||
|
@ -124,6 +134,8 @@ golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
|||
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
|
||||
golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68=
|
||||
golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
|
||||
golang.org/x/text v0.20.0 h1:gK/Kv2otX8gz+wn7Rmb3vT96ZwuoxnQlY+HlJVj7Qug=
|
||||
golang.org/x/text v0.20.0/go.mod h1:D4IsuqiFMhST5bX19pQ9ikHC2GsaKyk/oF+pn3ducp4=
|
||||
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 h1:Dpdu/EMxGMFgq0CeYMh4fazTD2vtlZRYE7wyynxJb9U=
|
||||
golang.org/x/time v0.0.0-20220609170525-579cf78fd858/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
|
||||
|
|
251
process.go
251
process.go
|
@ -1,16 +1,11 @@
|
|||
package ctrl
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"crypto/ed25519"
|
||||
"encoding/gob"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
|
@ -447,102 +442,42 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
|||
// the state of the message being processed, and then reply back to the
|
||||
// correct sending process's reply, meaning so we ACK back to the correct
|
||||
// publisher.
|
||||
func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, msg *nats.Msg, subject string) {
|
||||
func (p process) messageSubscriberHandlerNats(natsConn *nats.Conn, thisNode string, msg *nats.Msg, subject string) {
|
||||
|
||||
// Variable to hold a copy of the message data, so we don't mess with
|
||||
// the original data since the original is a pointer value.
|
||||
msgData := make([]byte, len(msg.Data))
|
||||
copy(msgData, msg.Data)
|
||||
|
||||
// fmt.Printf(" * DEBUG: header value on subscriberHandler: %v\n", msg.Header)
|
||||
|
||||
// If debugging is enabled, print the source node name of the nats messages received.
|
||||
if val, ok := msg.Header["fromNode"]; ok {
|
||||
er := fmt.Errorf("info: nats message received from %v, with subject %v ", val, subject)
|
||||
p.errorKernel.logDebug(er)
|
||||
}
|
||||
|
||||
// If compression is used, decompress it to get the gob data. If
|
||||
// compression is not used it is the gob encoded data we already
|
||||
// got in msgData so we do nothing with it.
|
||||
if val, ok := msg.Header["cmp"]; ok {
|
||||
switch val[0] {
|
||||
case "z":
|
||||
zr, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd NewReader failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
msgData, err = zr.DecodeAll(msg.Data, nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd decoding failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
zr.Close()
|
||||
return
|
||||
}
|
||||
|
||||
zr.Close()
|
||||
|
||||
case "g":
|
||||
r := bytes.NewReader(msgData)
|
||||
gr, err := gzip.NewReader(r)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gzip NewReader failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logError)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(gr)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gzip ReadAll failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
|
||||
gr.Close()
|
||||
|
||||
msgData = b
|
||||
}
|
||||
zr, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd NewReader failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
return
|
||||
}
|
||||
msgData, err = zr.DecodeAll(msg.Data, nil)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd decoding failed: %v", err)
|
||||
p.errorKernel.errSend(p, Message{}, er, logWarning)
|
||||
zr.Close()
|
||||
return
|
||||
}
|
||||
|
||||
zr.Close()
|
||||
|
||||
message := Message{}
|
||||
|
||||
// Check if serialization is specified.
|
||||
// Will default to gob serialization if nothing or non existing value is specified.
|
||||
if val, ok := msg.Header["serial"]; ok {
|
||||
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
|
||||
switch val[0] {
|
||||
case "cbor":
|
||||
err := cbor.Unmarshal(msgData, &message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
default: // Deaults to gob if no match was found.
|
||||
r := bytes.NewReader(msgData)
|
||||
gobDec := gob.NewDecoder(r)
|
||||
|
||||
err := gobDec.Decode(&message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
// Default to gob if serialization flag was not specified.
|
||||
r := bytes.NewReader(msgData)
|
||||
gobDec := gob.NewDecoder(r)
|
||||
|
||||
err := gobDec.Decode(&message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: gob decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
err = cbor.Unmarshal(msgData, &message)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: cbor decoding failed, subject: %v, header: %v, error: %v", subject, msg.Header, err)
|
||||
p.errorKernel.errSend(p, message, er, logError)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if it is an ACK or NACK message, and do the appropriate action accordingly.
|
||||
|
@ -782,7 +717,7 @@ func (p process) subscribeMessages() *nats.Subscription {
|
|||
//_, err := p.natsConn.Subscribe(subject, func(msg *nats.Msg) {
|
||||
|
||||
// Start up the subscriber handler.
|
||||
go p.messageSubscriberHandler(p.natsConn, p.configuration.NodeName, msg, subject)
|
||||
go p.messageSubscriberHandlerNats(p.natsConn, p.configuration.NodeName, msg, subject)
|
||||
})
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: Subscribe failed: %v", err)
|
||||
|
@ -797,25 +732,18 @@ func (p process) subscribeMessages() *nats.Subscription {
|
|||
// process. The function should be run as a goroutine, and will run
|
||||
// as long as the process it belongs to is running.
|
||||
func (p process) publishMessages(natsConn *nats.Conn) {
|
||||
var once sync.Once
|
||||
|
||||
var zEnc *zstd.Encoder
|
||||
// Prepare a zstd encoder if enabled. By enabling it here before
|
||||
// looping over the messages to send below, we can reuse the zstd
|
||||
// encoder for all messages.
|
||||
switch p.configuration.Compression {
|
||||
case "z": // zstd
|
||||
// enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
|
||||
enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd new encoder failed: %v", err)
|
||||
p.errorKernel.logError(er)
|
||||
os.Exit(1)
|
||||
}
|
||||
zEnc = enc
|
||||
defer zEnc.Close()
|
||||
// Prepare a zstd encoder so we can reuse the zstd encoder for all messages.
|
||||
|
||||
enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: zstd new encoder failed: %v", err)
|
||||
p.errorKernel.logError(er)
|
||||
os.Exit(1)
|
||||
}
|
||||
zEnc = enc
|
||||
defer zEnc.Close()
|
||||
|
||||
// Adding a timer that will be used for when to remove the sub process
|
||||
// publisher. The timer is reset each time a message is published with
|
||||
|
@ -859,7 +787,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
|
|||
m.ArgSignature = p.addMethodArgSignature(m)
|
||||
// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
|
||||
|
||||
go p.publishAMessage(m, zEnc, &once, natsConn)
|
||||
go p.publishAMessage(m, zEnc, natsConn)
|
||||
case <-p.ctx.Done():
|
||||
er := fmt.Errorf("info: canceling publisher: %v", p.processName)
|
||||
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
|
||||
|
@ -876,7 +804,7 @@ func (p process) addMethodArgSignature(m Message) []byte {
|
|||
return sign
|
||||
}
|
||||
|
||||
func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once, natsConn *nats.Conn) {
|
||||
func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, natsConn *nats.Conn) {
|
||||
// Create the initial header, and set values below depending on the
|
||||
// various configuration options chosen.
|
||||
natsMsgHeader := make(nats.Header)
|
||||
|
@ -885,94 +813,25 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once,
|
|||
// The serialized value of the nats message payload
|
||||
var natsMsgPayloadSerialized []byte
|
||||
|
||||
// encode the message structure into gob binary format before putting
|
||||
// it into a nats message.
|
||||
// Prepare a gob encoder with a buffer before we start the loop
|
||||
switch p.configuration.Serialization {
|
||||
case "cbor":
|
||||
b, err := cbor.Marshal(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
|
||||
p.errorKernel.logDebug(er)
|
||||
return
|
||||
}
|
||||
|
||||
natsMsgPayloadSerialized = b
|
||||
natsMsgHeader["serial"] = []string{p.configuration.Serialization}
|
||||
|
||||
default:
|
||||
var bufGob bytes.Buffer
|
||||
gobEnc := gob.NewEncoder(&bufGob)
|
||||
err := gobEnc.Encode(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: messageDeliverNats: gob encode message failed: %v", err)
|
||||
p.errorKernel.logDebug(er)
|
||||
return
|
||||
}
|
||||
|
||||
natsMsgPayloadSerialized = bufGob.Bytes()
|
||||
natsMsgHeader["serial"] = []string{"gob"}
|
||||
// encode the message structure into cbor
|
||||
b, err := cbor.Marshal(m)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: messageDeliverNats: cbor encode message failed: %v", err)
|
||||
p.errorKernel.logDebug(er)
|
||||
return
|
||||
}
|
||||
|
||||
natsMsgPayloadSerialized = b
|
||||
|
||||
// Get the process name so we can look up the process in the
|
||||
// processes map, and increment the message counter.
|
||||
pn := processNameGet(p.subject.name(), processKindPublisher)
|
||||
|
||||
// The compressed value of the nats message payload. The content
|
||||
// can either be compressed or in it's original form depening on
|
||||
// the outcome of the switch below, and if compression were chosen
|
||||
// or not.
|
||||
var natsMsgPayloadCompressed []byte
|
||||
|
||||
// Compress the data payload if selected with configuration flag.
|
||||
// The compression chosen is later set in the nats msg header when
|
||||
// calling p.messageDeliverNats below.
|
||||
switch p.configuration.Compression {
|
||||
case "z": // zstd
|
||||
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
|
||||
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
|
||||
|
||||
// p.zEncMutex.Lock()
|
||||
// zEnc.Reset(nil)
|
||||
// p.zEncMutex.Unlock()
|
||||
|
||||
case "g": // gzip
|
||||
var buf bytes.Buffer
|
||||
func() {
|
||||
gzipW := gzip.NewWriter(&buf)
|
||||
defer gzipW.Close()
|
||||
defer gzipW.Flush()
|
||||
_, err := gzipW.Write(natsMsgPayloadSerialized)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to write gzip: %v", err)
|
||||
p.errorKernel.logDebug(er)
|
||||
return
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
natsMsgPayloadCompressed = buf.Bytes()
|
||||
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
|
||||
|
||||
case "": // no compression
|
||||
natsMsgPayloadCompressed = natsMsgPayloadSerialized
|
||||
natsMsgHeader["cmp"] = []string{"none"}
|
||||
|
||||
default: // no compression
|
||||
// Allways log the error to console.
|
||||
er := fmt.Errorf("error: publishing: compression type not defined, setting default to no compression")
|
||||
p.errorKernel.logDebug(er)
|
||||
|
||||
// We only wan't to send the error message to errorCentral once.
|
||||
once.Do(func() {
|
||||
p.errorKernel.logDebug(er)
|
||||
})
|
||||
|
||||
// No compression, so we just assign the value of the serialized
|
||||
// data directly to the variable used with messageDeliverNats.
|
||||
natsMsgPayloadCompressed = natsMsgPayloadSerialized
|
||||
natsMsgHeader["cmp"] = []string{"none"}
|
||||
}
|
||||
natsMsgPayloadCompressed := zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
|
||||
|
||||
// Create the Nats message with headers and payload, and do the
|
||||
// sending of the message.
|
||||
|
@ -986,34 +845,4 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once *sync.Once,
|
|||
p.processes.active.procNames[pn] = p
|
||||
p.processes.active.mu.Unlock()
|
||||
}
|
||||
|
||||
// // Handle the error.
|
||||
// //
|
||||
// // NOTE: None of the processes above generate an error, so the the
|
||||
// // if clause will never be triggered. But keeping it here as an example
|
||||
// // for now for how to handle errors.
|
||||
// if err != nil {
|
||||
// // Create an error type which also creates a channel which the
|
||||
// // errorKernel will send back the action about what to do.
|
||||
// ep := errorEvent{
|
||||
// //errorType: logOnly,
|
||||
// process: p,
|
||||
// message: m,
|
||||
// errorActionCh: make(chan errorAction),
|
||||
// }
|
||||
// p.errorCh <- ep
|
||||
//
|
||||
// // Wait for the response action back from the error kernel, and
|
||||
// // decide what to do. Should we continue, quit, or .... ?
|
||||
// switch <-ep.errorActionCh {
|
||||
// case errActionContinue:
|
||||
// // Just log and continue
|
||||
// log.Printf("The errAction was continue...so we're continuing\n")
|
||||
// case errActionKill:
|
||||
// log.Printf("The errAction was kill...so we're killing\n")
|
||||
// // ....
|
||||
// default:
|
||||
// log.Printf("Info: publishMessages: The errAction was not defined, so we're doing nothing\n")
|
||||
// }
|
||||
// }
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue