mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
zstdConcurrency=1, added publish messages ctx
This commit is contained in:
parent
596b7e818c
commit
a0d5be3580
1 changed files with 11 additions and 5 deletions
16
process.go
16
process.go
|
@ -588,7 +588,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
|
||||||
switch p.configuration.Compression {
|
switch p.configuration.Compression {
|
||||||
case "z": // zstd
|
case "z": // zstd
|
||||||
// enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
|
// enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
|
||||||
enc, err := zstd.NewWriter(nil)
|
enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: zstd new encoder failed: %v\n", err)
|
log.Printf("error: zstd new encoder failed: %v\n", err)
|
||||||
os.Exit(1)
|
os.Exit(1)
|
||||||
|
@ -686,7 +686,9 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
|
||||||
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
|
natsMsgPayloadCompressed = zEnc.EncodeAll(natsMsgPayloadSerialized, nil)
|
||||||
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
|
natsMsgHeader["cmp"] = []string{p.configuration.Compression}
|
||||||
|
|
||||||
zEnc.Reset(nil)
|
// p.zEncMutex.Lock()
|
||||||
|
// zEnc.Reset(nil)
|
||||||
|
// p.zEncMutex.Unlock()
|
||||||
|
|
||||||
case "g": // gzip
|
case "g": // gzip
|
||||||
var buf bytes.Buffer
|
var buf bytes.Buffer
|
||||||
|
@ -726,9 +728,13 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
|
||||||
// sending of the message.
|
// sending of the message.
|
||||||
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
|
p.messageDeliverNats(natsMsgPayloadCompressed, natsMsgHeader, natsConn, m)
|
||||||
|
|
||||||
// Signaling back to the ringbuffer that we are done with the
|
select {
|
||||||
// current message, and it can remove it from the ringbuffer.
|
case m.done <- struct{}{}:
|
||||||
m.done <- struct{}{}
|
// Signaling back to the ringbuffer that we are done with the
|
||||||
|
// current message, and it can remove it from the ringbuffer.
|
||||||
|
case <-p.ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// Increment the counter for the next message to be sent.
|
// Increment the counter for the next message to be sent.
|
||||||
p.messageID++
|
p.messageID++
|
||||||
|
|
Loading…
Reference in a new issue