mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
added gzip compression
This commit is contained in:
parent
f63ee4eb7c
commit
f6e667dd82
2 changed files with 38 additions and 1 deletions
|
@ -486,7 +486,7 @@ func (c *Configuration) CheckFlags() error {
|
|||
flag.StringVar(&c.ExposeDataFolder, "exposeDataFolder", fc.ExposeDataFolder, "If set the data folder will be exposed on the given host:port. Default value is not exposed at all")
|
||||
flag.IntVar(&c.ErrorMessageTimeout, "errorMessageTimeout", fc.ErrorMessageTimeout, "The number of seconds to wait for an error message to time out")
|
||||
flag.IntVar(&c.ErrorMessageRetries, "errorMessageRetries", fc.ErrorMessageRetries, "The number of if times to retry an error message before we drop it")
|
||||
flag.StringVar(&c.Compression, "compression", fc.Compression, "compression method to use. defaults to no compression, z = zstd. Undefined value will default to no compression")
|
||||
flag.StringVar(&c.Compression, "compression", fc.Compression, "compression method to use. defaults to no compression, z = zstd, g = gzip. Undefined value will default to no compression")
|
||||
flag.StringVar(&c.Serialization, "serialization", fc.Serialization, "Serialization method to use. defaults to gob, other values are = cbor. Undefined value will default to gob")
|
||||
flag.IntVar(&c.SetBlockProfileRate, "setBlockProfileRate", fc.SetBlockProfileRate, "Enable block profiling by setting the value to f.ex. 1. 0 = disabled")
|
||||
|
||||
|
|
37
process.go
37
process.go
|
@ -2,9 +2,11 @@ package steward
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/gob"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
|
@ -360,6 +362,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
// fmt.Printf(" * DEBUG: ok = %v, map = %v, len of val = %v\n", ok, msg.Header, len(val))
|
||||
switch val[0] {
|
||||
case "z":
|
||||
fmt.Println(" ******** READING ZSTD ************")
|
||||
zr, err := zstd.NewReader(nil)
|
||||
if err != nil {
|
||||
log.Printf("error: zstd NewReader failed: %v\n", err)
|
||||
|
@ -370,9 +373,30 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
|||
er := fmt.Errorf("error: zstd decoding failed: %v", err)
|
||||
log.Printf("%v\n", er)
|
||||
sendErrorLogMessage(p.configuration, p.processes.metrics, p.toRingbufferCh, Node(thisNode), er)
|
||||
zr.Close()
|
||||
return
|
||||
}
|
||||
|
||||
zr.Close()
|
||||
|
||||
case "g":
|
||||
fmt.Println(" ******** READING GZIP ************")
|
||||
r := bytes.NewReader(msgData)
|
||||
gr, err := gzip.NewReader(r)
|
||||
if err != nil {
|
||||
log.Printf("error: gzip NewReader failed: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(gr)
|
||||
if err != nil {
|
||||
log.Printf("error: gzip ReadAll failed: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
gr.Close()
|
||||
|
||||
msgData = b
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -634,6 +658,19 @@ func (p process) publishMessages(natsConn *nats.Conn) {
|
|||
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
|
||||
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
|
||||
|
||||
case "g": // gzip
|
||||
var buf bytes.Buffer
|
||||
gzipW := gzip.NewWriter(&buf)
|
||||
_, err := gzipW.Write(natsMsgPayloadSerialized)
|
||||
if err != nil {
|
||||
log.Printf("error: failed to write gzip: %v\n", err)
|
||||
gzipW.Close()
|
||||
continue
|
||||
}
|
||||
gzipW.Close()
|
||||
|
||||
natsMsgPayloadCompressed = buf.Bytes()
|
||||
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
|
||||
case "": // no compression
|
||||
natsMsgPayloadCompressed = natsMsgPayloadSerialized
|
||||
|
||||
|
|
Loading…
Reference in a new issue