1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

implemented message timeout and retry options

This commit is contained in:
postmannen 2021-02-25 13:08:10 +01:00
parent 9daa8ebcd6
commit f49b4b3cf6
6 changed files with 46 additions and 15 deletions

View file

@ -16,20 +16,20 @@ func main() {
brokerAddress := flag.String("brokerAddress", "0", "the address of the message broker")
profilingPort := flag.String("profilingPort", "", "The number of the profiling port")
promHostAndPort := flag.String("promHostAndPort", ":2112", "host and port for prometheus listener, e.g. localhost:2112")
centralErrorLogger := flag.Bool("centralErrorLogger", false, "seet to true if this is the node that should receive the error log's from other nodes")
//isCentral := flag.Bool("isCentral", false, "used to indicate that this is the central master that will subscribe to error message subjects")
centralErrorLogger := flag.Bool("centralErrorLogger", false, "set to true if this is the node that should receive the error log's from other nodes")
defaultMessageTimeout := flag.Int("defaultMessageTimeout", 10, "default message timeout in seconds. This can be overridden on the message level")
defaultMessageRetries := flag.Int("defaultMessageRetries", 0, "default amount of retries that will be done before a message is thrown away, and out of the system")
flag.Parse()
// Start profiling if profiling port is specified
if *profilingPort != "" {
// TODO REMOVE: Added for profiling
go func() {
http.ListenAndServe("localhost:"+*profilingPort, nil)
}()
}
s, err := steward.NewServer(*brokerAddress, *nodeName, *promHostAndPort, *centralErrorLogger)
s, err := steward.NewServer(*brokerAddress, *nodeName, *promHostAndPort, *centralErrorLogger, *defaultMessageTimeout, *defaultMessageRetries)
if err != nil {
log.Printf("error: failed to connect to broker: %v\n", err)
os.Exit(1)

View file

@ -4,7 +4,8 @@
"toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"],
"commandOrEvent":"CommandACK",
"method":"ShellCommand"
"method":"ShellCommand",
"timeout":3,
"retries":3
}
]

Binary file not shown.

View file

@ -22,7 +22,7 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM
inCh := make(chan subjectAndMessage)
ringBufferOutCh := make(chan samDBValue)
// start the ringbuffer.
rb.start(inCh, ringBufferOutCh)
rb.start(inCh, ringBufferOutCh, s.defaultMessageTimeout, s.defaultMessageRetries)
// Start reading new fresh messages received on the incomming message
// pipe/file requested, and fill them into the buffer.

View file

@ -55,7 +55,7 @@ func newringBuffer(size int, dbFileName string) *ringBuffer {
// start will process incomming messages through the inCh,
// put the messages on a buffered channel
// and deliver messages out when requested on the outCh.
func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) {
func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue, defaultMessageTimeout int, defaultMessageRetries int) {
// Starting both writing and reading in separate go routines so we
// can write and read concurrently.
@ -68,7 +68,7 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) {
r.totalMessagesIndex = r.getIndexValue(indexValueBucket)
// Fill the buffer when new data arrives into the system
go r.fillBuffer(inCh, samValueBucket, indexValueBucket)
go r.fillBuffer(inCh, samValueBucket, indexValueBucket, defaultMessageTimeout, defaultMessageRetries)
// Start the process to permanently store done messages.
go r.startPermanentStore()
@ -79,7 +79,7 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) {
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
// It will also store the messages in a K/V DB while being processed.
func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket string, indexValueBucket string) {
func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket string, indexValueBucket string, defaultMessageTimeout int, defaultMessageRetries int) {
// At startup get all the values that might be in the K/V store so we can
// put them into the buffer before we start to fill up with new incomming
// messages to the system.
@ -119,6 +119,14 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
continue
}
// Check if message values for timers override default values
if v.Message.Timeout < 1 {
v.Message.Timeout = defaultMessageTimeout
}
if v.Message.Retries < 1 {
v.Message.Retries = defaultMessageRetries
}
// --- Store the incomming message in the k/v store ---
// Get a unique number for the message to use when storing

View file

@ -49,10 +49,14 @@ type server struct {
subscriberServices *subscriberServices
// Is this the central error logger ?
centralErrorLogger bool
// default message timeout in seconds. This can be overridden on the message level
defaultMessageTimeout int
// default amount of retries that will be done before a message is thrown away, and out of the system
defaultMessageRetries int
}
// newServer will prepare and return a server type
func NewServer(brokerAddress string, nodeName string, promHostAndPort string, centralErrorLogger bool) (*server, error) {
func NewServer(brokerAddress string, nodeName string, promHostAndPort string, centralErrorLogger bool, defaultMessageTimeout int, defaultMessageRetries int) (*server, error) {
conn, err := nats.Connect(brokerAddress, nil)
if err != nil {
log.Printf("error: nats.Connect failed: %v\n", err)
@ -71,6 +75,8 @@ func NewServer(brokerAddress string, nodeName string, promHostAndPort string, ce
metrics: newMetrics(promHostAndPort),
subscriberServices: newSubscriberServices(),
centralErrorLogger: centralErrorLogger,
defaultMessageTimeout: defaultMessageTimeout,
defaultMessageRetries: defaultMessageRetries,
}
return s, nil
@ -207,6 +213,8 @@ func (s *server) spawnWorkerProcess(proc process) {
}
func (s *server) messageDeliverNats(proc process, message Message) {
retryAttempts := 0
for {
dataPayload, err := gobEncodeMessage(message)
if err != nil {
@ -246,11 +254,25 @@ func (s *server) messageDeliverNats(proc process, message Message) {
if message.CommandOrEvent == CommandACK || message.CommandOrEvent == EventACK {
// Wait up until 10 seconds for a reply,
// continue and resend if to reply received.
msgReply, err := subReply.NextMsg(time.Second * 10)
msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout))
if err != nil {
log.Printf("error: subReply.NextMsg failed for node=%v, subject=%v: %v\n", proc.node, proc.subject.name(), err)
// did not receive a reply, continuing from top again
continue
// did not receive a reply, decide what to do..
retryAttempts++
fmt.Printf("Retry attempts:%v, retries: %v, timeout: %v\n", retryAttempts, message.Retries, message.Timeout)
switch {
case message.Retries == 0:
// 0 indicates unlimited retries
continue
case retryAttempts >= message.Retries:
// max retries reached
log.Printf("info: max retries for message reached, breaking out: %v", retryAttempts)
return
default:
// none of the above matched, so we've not reached max retries yet
continue
}
}
log.Printf("info: publisher: received ACK for message: %s\n", msgReply.Data)
}