From 6c900296c92a16a23638bb9bc62d48108cd25df8 Mon Sep 17 00:00:00 2001 From: postmannen Date: Sat, 18 Jun 2022 08:12:14 +0200 Subject: [PATCH] defering subReply.Unsubscribe --- process.go | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/process.go b/process.go index b8ecb26..3558e45 100644 --- a/process.go +++ b/process.go @@ -283,17 +283,6 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He // // Create a subscriber for the ACK reply message. subReply, err := natsConn.SubscribeSync(msg.Reply) - if err != nil { - er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) - // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) - log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer) - - time.Sleep(time.Second * subscribeSyncTimer) - subReply.Unsubscribe() - - retryAttempts++ - return ErrACKSubscribeRetry - } defer func() { err := subReply.Unsubscribe() @@ -302,6 +291,18 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He } }() + if err != nil { + er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) + // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) + log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer) + + time.Sleep(time.Second * subscribeSyncTimer) + // subReply.Unsubscribe() + + retryAttempts++ + return ErrACKSubscribeRetry + } + // Publish message err = natsConn.PublishMsg(msg) if err != nil { @@ -797,14 +798,17 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, case "g": // gzip var buf bytes.Buffer - gzipW := gzip.NewWriter(&buf) - _, err := gzipW.Write(natsMsgPayloadSerialized) - if err != nil { - log.Printf("error: failed to write gzip: %v\n", err) - gzipW.Close() - return - } - gzipW.Close() + func() { + gzipW := gzip.NewWriter(&buf) + defer gzipW.Close() + defer gzipW.Flush() + _, err := gzipW.Write(natsMsgPayloadSerialized) + if err != nil { + log.Printf("error: failed to write gzip: %v\n", err) + return + } + + }() natsMsgPayloadCompressed = buf.Bytes() natsMsgHeader["cmp"] = []string{p.configuration.Compression}