1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

fixed memory leak not unsubscribing reply messages

This commit is contained in:
postmannen 2021-09-16 08:13:24 +02:00
parent a20bf4021a
commit d8169841e0

View file

@ -231,6 +231,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer)
time.Sleep(time.Second * subscribeSyncTimer)
subReply.Unsubscribe()
continue
}
@ -259,10 +260,10 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
// did not receive a reply, decide what to do..
retryAttempts++
log.Printf("Retry attempts:%v, retries: %v, ACKTimeout: %v\n", retryAttempts, message.Retries, message.ACKTimeout)
log.Printf("Retry attempt:%v, retries: %v, ACKTimeout: %v, message.ID: %v\n", retryAttempts, message.Retries, message.ACKTimeout, message.ID)
switch {
case message.Retries == 0:
case message.Retries == -1:
// 0 indicates unlimited retries
continue
case retryAttempts >= message.Retries:
@ -277,6 +278,8 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
log.Printf("%v\n", er)
subReply.Unsubscribe()
p.processes.metrics.promNatsMessagesFailedACKsTotal.Inc()
return
@ -290,6 +293,8 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
log.Printf("<--- publisher: received ACK from:%v, for: %v, data: %s\n", message.ToNode, message.Method, msgReply.Data)
}
subReply.Unsubscribe()
p.processes.metrics.promNatsDeliveredTotal.Inc()
return