mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
defered closing of subscrybe sync
This commit is contained in:
parent
c5d6ea0cb7
commit
74f38e9ba2
1 changed files with 64 additions and 37 deletions
101
process.go
101
process.go
|
@ -6,6 +6,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ed25519"
|
"crypto/ed25519"
|
||||||
"encoding/gob"
|
"encoding/gob"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
|
@ -17,6 +18,7 @@ import (
|
||||||
"github.com/klauspost/compress/zstd"
|
"github.com/klauspost/compress/zstd"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
|
// "google.golang.org/protobuf/internal/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
// processKind are either kindSubscriber or kindPublisher, and are
|
// processKind are either kindSubscriber or kindPublisher, and are
|
||||||
|
@ -230,6 +232,10 @@ func (p process) spawnWorker() {
|
||||||
log.Printf("Successfully started process: %v\n", p.processName)
|
log.Printf("Successfully started process: %v\n", p.processName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrACKSubscribeRetry = errors.New("steward: retrying to subscribe for ack message")
|
||||||
|
)
|
||||||
|
|
||||||
// messageDeliverNats will create the Nats message with headers and payload.
|
// messageDeliverNats will create the Nats message with headers and payload.
|
||||||
// It will also take care of the delivering the message that is converted to
|
// It will also take care of the delivering the message that is converted to
|
||||||
// gob or cbor format as a nats.Message. It will also take care of checking
|
// gob or cbor format as a nats.Message. It will also take care of checking
|
||||||
|
@ -263,44 +269,55 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
p.metrics.promNatsDeliveredTotal.Inc()
|
p.metrics.promNatsDeliveredTotal.Inc()
|
||||||
|
|
||||||
|
// The reaming logic is for handling ACK messages, so we return here
|
||||||
|
// since it was a NACK message, and all or now done.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// The SubscribeSync used in the subscriber, will get messages that
|
err := func() error {
|
||||||
// are sent after it started subscribing.
|
// The SubscribeSync used in the subscriber, will get messages that
|
||||||
//
|
// are sent after it started subscribing.
|
||||||
// Create a subscriber for the ACK reply message.
|
//
|
||||||
subReply, err := natsConn.SubscribeSync(msg.Reply)
|
// Create a subscriber for the ACK reply message.
|
||||||
if err != nil {
|
subReply, err := natsConn.SubscribeSync(msg.Reply)
|
||||||
er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
|
if err != nil {
|
||||||
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
er := fmt.Errorf("error: nats SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
|
||||||
log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer)
|
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
||||||
time.Sleep(time.Second * subscribeSyncTimer)
|
log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer)
|
||||||
subReply.Unsubscribe()
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Publish message
|
time.Sleep(time.Second * subscribeSyncTimer)
|
||||||
err = natsConn.PublishMsg(msg)
|
subReply.Unsubscribe()
|
||||||
if err != nil {
|
|
||||||
er := fmt.Errorf("error: nats publish failed: %v", err)
|
retryAttempts++
|
||||||
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
return ErrACKSubscribeRetry
|
||||||
log.Printf("%v, waiting %ds before retrying\n", er, publishTimer)
|
}
|
||||||
time.Sleep(time.Second * publishTimer)
|
|
||||||
subReply.Unsubscribe()
|
defer func() {
|
||||||
continue
|
err := subReply.Unsubscribe()
|
||||||
}
|
if err != nil {
|
||||||
|
log.Printf("error: nats SubscribeSync: failed when unsubscribing for ACK: %v\n", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Publish message
|
||||||
|
err = natsConn.PublishMsg(msg)
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: nats publish failed: %v", err)
|
||||||
|
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
|
||||||
|
log.Printf("%v, waiting %ds before retrying\n", er, publishTimer)
|
||||||
|
time.Sleep(time.Second * publishTimer)
|
||||||
|
|
||||||
|
return ErrACKSubscribeRetry
|
||||||
|
}
|
||||||
|
|
||||||
// If the message is an ACK type of message we must check that a
|
|
||||||
// reply, and if it is not we don't wait here at all.
|
|
||||||
if p.subject.Event == EventACK {
|
|
||||||
// Wait up until ACKTimeout specified for a reply,
|
// Wait up until ACKTimeout specified for a reply,
|
||||||
// continue and resend if no reply received,
|
// continue and resend if no reply received,
|
||||||
// or exit if max retries for the message reached.
|
// or exit if max retries for the message reached.
|
||||||
//
|
//
|
||||||
// The nats.Msg returned is discarded with '_' since
|
// The nats.Msg returned is discarded with '_' since
|
||||||
// we don't use it.
|
// we don't use it.
|
||||||
_, err := subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
|
_, err = subReply.NextMsg(time.Second * time.Duration(message.ACKTimeout))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if message.RetryWait < 0 {
|
if message.RetryWait < 0 {
|
||||||
message.RetryWait = 0
|
message.RetryWait = 0
|
||||||
|
@ -312,17 +329,20 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
||||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||||
|
|
||||||
time.Sleep(time.Second * time.Duration(message.RetryWait))
|
time.Sleep(time.Second * time.Duration(message.RetryWait))
|
||||||
|
|
||||||
|
// Continue with the rest of the code to check number of retries etc..
|
||||||
|
|
||||||
case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
|
case err == nats.ErrBadSubscription || err == nats.ErrConnectionClosed:
|
||||||
er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err)
|
er := fmt.Errorf("error: ack receive failed: conneciton closed or bad subscription, will not retry message: subject=%v: %v", p.subject.name(), err)
|
||||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||||
|
|
||||||
return
|
return er
|
||||||
|
|
||||||
default:
|
default:
|
||||||
er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update steward to handle the new error type: subject=%v: %v", p.subject.name(), err)
|
er := fmt.Errorf("error: ack receive failed: the error was not defined, check if nats client have been updated with new error values, and update steward to handle the new error type: subject=%v: %v", p.subject.name(), err)
|
||||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||||
|
|
||||||
return
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
// did not receive a reply, decide if we should try to retry sending.
|
// did not receive a reply, decide if we should try to retry sending.
|
||||||
|
@ -344,10 +364,8 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
||||||
p.errorKernel.infoSend(p, message, er)
|
p.errorKernel.infoSend(p, message, er)
|
||||||
}
|
}
|
||||||
|
|
||||||
subReply.Unsubscribe()
|
|
||||||
|
|
||||||
p.metrics.promNatsMessagesFailedACKsTotal.Inc()
|
p.metrics.promNatsMessagesFailedACKsTotal.Inc()
|
||||||
return
|
return er
|
||||||
|
|
||||||
default:
|
default:
|
||||||
// none of the above matched, so we've not reached max retries yet
|
// none of the above matched, so we've not reached max retries yet
|
||||||
|
@ -356,15 +374,24 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He
|
||||||
|
|
||||||
p.metrics.promNatsMessagesMissedACKsTotal.Inc()
|
p.metrics.promNatsMessagesMissedACKsTotal.Inc()
|
||||||
|
|
||||||
subReply.Unsubscribe()
|
return ErrACKSubscribeRetry
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// REMOVED: log.Printf("<--- publisher: received ACK from:%v, for: %v, data: %s\n", message.ToNode, message.Method, msgReply.Data)
|
|
||||||
|
return nil
|
||||||
|
}()
|
||||||
|
|
||||||
|
if err == ErrACKSubscribeRetry {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// All error printing are handled within the function that returns
|
||||||
|
// the error, so we do nothing and return.
|
||||||
|
// No more trying to deliver the message
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
subReply.Unsubscribe()
|
// Message were delivered successfully.
|
||||||
|
|
||||||
p.metrics.promNatsDeliveredTotal.Inc()
|
p.metrics.promNatsDeliveredTotal.Inc()
|
||||||
|
|
||||||
return
|
return
|
||||||
|
|
Loading…
Add table
Reference in a new issue