1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

removed old compression code from publisher

This commit is contained in:
postmannen 2024-11-27 13:16:59 +01:00
parent 97d5da948f
commit e898e3e81e
2 changed files with 4 additions and 23 deletions

View file

@ -182,7 +182,7 @@ func (s *server) jetstreamConsume() {
}
}
er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %q", filterSubjectValues)
er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %v", filterSubjectValues)
s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo)
cons, err := stream.CreateOrUpdateConsumer(s.ctx, jetstream.ConsumerConfig{

View file

@ -6,10 +6,8 @@ import (
"errors"
"fmt"
"log"
"os"
"time"
"github.com/klauspost/compress/zstd"
"github.com/nats-io/nats.go"
"github.com/prometheus/client_golang/prometheus"
// "google.golang.org/protobuf/internal/errors"
@ -179,13 +177,13 @@ func (p process) start() {
}
// Start a publisher worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns.
// to handle publishing of the messages for the subject it owns.
if p.processKind == processKindPublisher {
p.startPublisher()
}
// Start a subscriber worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns.
// to handle executing the request method defined in the message.
if p.processKind == processKindSubscriber {
p.startSubscriber()
}
@ -199,6 +197,7 @@ func (p process) start() {
p.errorKernel.logDebug(er)
}
// startPublisher.
func (p process) startPublisher() {
// If there is a procFunc for the process, start it.
if p.procFunc != nil {
@ -716,24 +715,6 @@ func (p process) subscribeMessages() *nats.Subscription {
// as long as the process it belongs to is running.
func (p process) publishMessages(natsConn *nats.Conn) {
var zEnc *zstd.Encoder
// Prepare a zstd encoder if enabled. By enabling it here before
// looping over the messages to send below, we can reuse the zstd
// encoder for all messages.
switch p.configuration.Compression {
case "z": // zstd
// enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression))
enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
if err != nil {
er := fmt.Errorf("error: zstd new encoder failed: %v", err)
p.errorKernel.logError(er)
os.Exit(1)
}
zEnc = enc
defer zEnc.Close()
}
// Adding a timer that will be used for when to remove the sub process
// publisher. The timer is reset each time a message is published with
// the process, so the sub process publisher will not be removed until