1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-05 14:56:49 +00:00

defering subReply.Unsubscribe

This commit is contained in:
postmannen 2022-06-18 08:12:14 +02:00
parent 988bd04d7f
commit 6c900296c9

View file

@ -283,17 +283,6 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
// //
// Create a subscriber for the ACK reply message. // Create a subscriber for the ACK reply message.
subReply, err := natsConn.SubscribeSync(msg.Reply) subReply, err := natsConn.SubscribeSync(msg.Reply)
if err != nil {
er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer)
time.Sleep(time.Second * subscribeSyncTimer)
subReply.Unsubscribe()
retryAttempts++
return ErrACKSubscribeRetry
}
defer func() { defer func() {
err := subReply.Unsubscribe() err := subReply.Unsubscribe()
@ -302,6 +291,18 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
} }
}() }()
if err != nil {
er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer)
time.Sleep(time.Second * subscribeSyncTimer)
// subReply.Unsubscribe()
retryAttempts++
return ErrACKSubscribeRetry
}
// Publish message // Publish message
err = natsConn.PublishMsg(msg) err = natsConn.PublishMsg(msg)
if err != nil { if err != nil {
@ -797,14 +798,17 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
case "g": // gzip case "g": // gzip
var buf bytes.Buffer var buf bytes.Buffer
gzipW := gzip.NewWriter(&buf) func() {
_, err := gzipW.Write(natsMsgPayloadSerialized) gzipW := gzip.NewWriter(&buf)
if err != nil { defer gzipW.Close()
log.Printf("error: failed to write gzip: %v\n", err) defer gzipW.Flush()
gzipW.Close() _, err := gzipW.Write(natsMsgPayloadSerialized)
return if err != nil {
} log.Printf("error: failed to write gzip: %v\n", err)
gzipW.Close() return
}
}()
natsMsgPayloadCompressed = buf.Bytes() natsMsgPayloadCompressed = buf.Bytes()
natsMsgHeader["cmp"] = []string{p.configuration.Compression} natsMsgHeader["cmp"] = []string{p.configuration.Compression}