diff --git a/message_readers.go b/message_readers.go index 241ac4a..8d045e6 100644 --- a/message_readers.go +++ b/message_readers.go @@ -182,7 +182,7 @@ func (s *server) jetstreamConsume() { } } - er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %q", filterSubjectValues) + er := fmt.Errorf("jetstreamConsume: will consume the following subjects: %v", filterSubjectValues) s.errorKernel.errSend(s.processInitial, Message{}, er, logInfo) cons, err := stream.CreateOrUpdateConsumer(s.ctx, jetstream.ConsumerConfig{ diff --git a/process.go b/process.go index 681ddf0..9617582 100644 --- a/process.go +++ b/process.go @@ -6,10 +6,8 @@ import ( "errors" "fmt" "log" - "os" "time" - "github.com/klauspost/compress/zstd" "github.com/nats-io/nats.go" "github.com/prometheus/client_golang/prometheus" // "google.golang.org/protobuf/internal/errors" @@ -179,13 +177,13 @@ func (p process) start() { } // Start a publisher worker, which will start a go routine (process) - // That will take care of all the messages for the subject it owns. + // to handle publishing of the messages for the subject it owns. if p.processKind == processKindPublisher { p.startPublisher() } // Start a subscriber worker, which will start a go routine (process) - // That will take care of all the messages for the subject it owns. + // to handle executing the request method defined in the message. if p.processKind == processKindSubscriber { p.startSubscriber() } @@ -199,6 +197,7 @@ func (p process) start() { p.errorKernel.logDebug(er) } +// startPublisher. func (p process) startPublisher() { // If there is a procFunc for the process, start it. if p.procFunc != nil { @@ -716,24 +715,6 @@ func (p process) subscribeMessages() *nats.Subscription { // as long as the process it belongs to is running. func (p process) publishMessages(natsConn *nats.Conn) { - var zEnc *zstd.Encoder - // Prepare a zstd encoder if enabled. By enabling it here before - // looping over the messages to send below, we can reuse the zstd - // encoder for all messages. - switch p.configuration.Compression { - case "z": // zstd - // enc, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedBestCompression)) - enc, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1)) - if err != nil { - er := fmt.Errorf("error: zstd new encoder failed: %v", err) - p.errorKernel.logError(er) - os.Exit(1) - } - zEnc = enc - defer zEnc.Close() - - } - // Adding a timer that will be used for when to remove the sub process // publisher. The timer is reset each time a message is published with // the process, so the sub process publisher will not be removed until