mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
reusing zstd encoder
This commit is contained in:
parent
5698623724
commit
f63ee4eb7c
1 changed files with 18 additions and 7 deletions
25
process.go
25
process.go
|
@ -6,6 +6,7 @@ import (
|
|||
"encoding/gob"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -544,6 +545,22 @@ func (p process) subscribeMessages() *nats.Subscription {
|
|||
func (p process) publishMessages(natsConn *nats.Conn) {
|
||||
var once sync.Once
|
||||
|
||||
var zEnc *zstd.Encoder
|
||||
// Prepare a zstd encoder if enabled. By enabling it here before
|
||||
// looping over the messages to send below, we can reuse the zstd
|
||||
// encoder for all messages.
|
||||
switch p.configuration.Compression {
|
||||
case "z": // zstd
|
||||
enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
|
||||
if err != nil {
|
||||
log.Printf("error: zstd new encoder failed: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
zEnc = enc
|
||||
defer zEnc.Close()
|
||||
|
||||
}
|
||||
|
||||
// Loop and handle 1 message at a time. If some part of the code
|
||||
// fails in the loop we should throw an error and use `continue`
|
||||
// to jump back here to the beginning of the loop and continue
|
||||
|
@ -614,13 +631,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
|
|||
// calling p.messageDeliverNats below.
|
||||
switch p.configuration.Compression {
|
||||
case "z": // zstd
|
||||
enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
|
||||
if err != nil {
|
||||
log.Printf("error: zstd write failed: %v\n", err)
|
||||
}
|
||||
natsMsgPayloadCompressed = enc.EncodeAll(natsMsgPayloadSerialized, nil)
|
||||
enc.Close()
|
||||
|
||||
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
|
||||
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
|
||||
|
||||
case "": // no compression
|
||||
|
|
Loading…
Reference in a new issue