diff --git a/configuration_flags.go b/configuration_flags.go index 2573d2e..b45a61f 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -486,7 +486,7 @@ func (c *Configuration) CheckFlags() error { flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", fc.ExposeDataFolder, "If set the data folder will be exposed on the given host:port. Default value is not exposed at all") flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", fc.ErrorMessageTimeout, "The number of seconds to wait for an error message to time out") flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", fc.ErrorMessageRetries, "The number of if times to retry an error message before we drop it") - flag.StringVar(&c.Compression, "compression", fc.Compression, "compression method to use. defaults to no compression, z = zstd. Undefined value will default to no compression") + flag.StringVar(&c.Compression, "compression", fc.Compression, "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression") flag.StringVar(&c.Serialization, "serialization", fc.Serialization, "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob") flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", fc.SetBlockProfileRate, "Enable block profiling by setting the value to f.ex. 1. 0 = disabled") diff --git a/process.go b/process.go index 597c6d3..4a85bd8 100644 --- a/process.go +++ b/process.go @@ -2,9 +2,11 @@ package steward import ( "bytes" + "compress/gzip" "context" "encoding/gob" "fmt" + "io" "log" "os" "sync" @@ -360,6 +362,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val)) switch val[0] { case "z": + fmt.Println(" ******** READING ZSTD ************") zr, err := zstd.NewReader(nil) if err != nil { log.Printf("error: zstd NewReader failed: %v\n", err) @@ -370,9 +373,30 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, er := fmt.Errorf("error: zstd decoding failed: %v", err) log.Printf("%v\n", er) sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er) + zr.Close() return } + zr.Close() + + case "g": + fmt.Println(" ******** READING GZIP ************") + r := bytes.NewReader(msgData) + gr, err := gzip.NewReader(r) + if err != nil { + log.Printf("error: gzip NewReader failed: %v\n", err) + return + } + + b, err := io.ReadAll(gr) + if err != nil { + log.Printf("error: gzip ReadAll failed: %v\n", err) + return + } + + gr.Close() + + msgData = b } } @@ -634,6 +658,19 @@ func (p process) publishMessages(natsConn *nats.Conn) { natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil) natsMsgHeader["cmp"] = []string{p.configuration.Compression} + case "g": // gzip + var buf bytes.Buffer + gzipW := gzip.NewWriter(&buf) + _, err := gzipW.Write(natsMsgPayloadSerialized) + if err != nil { + log.Printf("error: failed to write gzip: %v\n", err) + gzipW.Close() + continue + } + gzipW.Close() + + natsMsgPayloadCompressed = buf.Bytes() + natsMsgHeader["cmp"] = []string{p.configuration.Compression} case "": // no compression natsMsgPayloadCompressed = natsMsgPayloadSerialized