diff --git a/cmd/main.go b/cmd/main.go index 6dab5cb..a708996 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -16,20 +16,20 @@ func main() { brokerAddress := flag.String("brokerAddress", "0", "the address of the message broker") profilingPort := flag.String("profilingPort", "", "The number of the profiling port") promHostAndPort := flag.String("promHostAndPort", ":2112", "host and port for prometheus listener, e.g. localhost:2112") - centralErrorLogger := flag.Bool("centralErrorLogger", false, "seet to true if this is the node that should receive the error log's from other nodes") - //isCentral := flag.Bool("isCentral", false, "used to indicate that this is the central master that will subscribe to error message subjects") + centralErrorLogger := flag.Bool("centralErrorLogger", false, "set to true if this is the node that should receive the error logs from other nodes") + defaultMessageTimeout := flag.Int("defaultMessageTimeout", 10, "default message timeout in seconds. This can be overridden on the message level") + defaultMessageRetries := flag.Int("defaultMessageRetries", 0, "default amount of retries that will be done before a message is thrown away, and out of the system") flag.Parse() + // Start profiling if profiling port is specified if *profilingPort != "" { - // TODO REMOVE: Added for profiling - go func() { http.ListenAndServe("localhost:"+*profilingPort, nil) }() } - s, err := steward.NewServer(*brokerAddress, *nodeName, *promHostAndPort, *centralErrorLogger) + s, err := steward.NewServer(*brokerAddress, *nodeName, *promHostAndPort, *centralErrorLogger, *defaultMessageTimeout, *defaultMessageRetries) if err != nil { log.Printf("error: failed to connect to broker: %v\n", err) os.Exit(1) diff --git a/example/toShip1.json b/example/toShip1.json index abce93c..26ac4f4 100644 --- a/example/toShip1.json +++ b/example/toShip1.json @@ -4,7 +4,8 @@ "toNode": "ship1", "data": ["bash","-c","netstat -an|grep -i listen"], "commandOrEvent":"CommandACK", - 
"method":"ShellCommand" - + "method":"ShellCommand", + "timeout":3, + "retries":3 } ] \ No newline at end of file diff --git a/incommmingBuffer.db b/incommmingBuffer.db index 67d1fe6..d7dca75 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/publisher.go b/publisher.go index cd240f2..91858d5 100644 --- a/publisher.go +++ b/publisher.go @@ -22,7 +22,7 @@ func (s *server) processNewMessages(dbFileName string, newSAM chan []subjectAndM inCh := make(chan subjectAndMessage) ringBufferOutCh := make(chan samDBValue) // start the ringbuffer. - rb.start(inCh, ringBufferOutCh) + rb.start(inCh, ringBufferOutCh, s.defaultMessageTimeout, s.defaultMessageRetries) // Start reading new fresh messages received on the incomming message // pipe/file requested, and fill them into the buffer. diff --git a/ringbuffer.go b/ringbuffer.go index 045a482..f27eb5b 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -55,7 +55,7 @@ func newringBuffer(size int, dbFileName string) *ringBuffer { // start will process incomming messages through the inCh, // put the messages on a buffered channel // and deliver messages out when requested on the outCh. -func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) { +func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue, defaultMessageTimeout int, defaultMessageRetries int) { // Starting both writing and reading in separate go routines so we // can write and read concurrently. @@ -68,7 +68,7 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) { r.totalMessagesIndex = r.getIndexValue(indexValueBucket) // Fill the buffer when new data arrives into the system - go r.fillBuffer(inCh, samValueBucket, indexValueBucket) + go r.fillBuffer(inCh, samValueBucket, indexValueBucket, defaultMessageTimeout, defaultMessageRetries) // Start the process to permanently store done messages. 
go r.startPermanentStore() @@ -79,7 +79,7 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) { // fillBuffer will fill the buffer in the ringbuffer reading from the inchannel. // It will also store the messages in a K/V DB while being processed. -func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket string, indexValueBucket string) { +func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket string, indexValueBucket string, defaultMessageTimeout int, defaultMessageRetries int) { // At startup get all the values that might be in the K/V store so we can // put them into the buffer before we start to fill up with new incomming // messages to the system. @@ -119,6 +119,14 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri continue } + // Check if message values for timers override default values + if v.Message.Timeout < 1 { + v.Message.Timeout = defaultMessageTimeout + } + if v.Message.Retries < 1 { + v.Message.Retries = defaultMessageRetries + } + // --- Store the incomming message in the k/v store --- // Get a unique number for the message to use when storing diff --git a/server.go b/server.go index 630f72c..650f93a 100644 --- a/server.go +++ b/server.go @@ -49,10 +49,14 @@ type server struct { subscriberServices *subscriberServices // Is this the central error logger ? centralErrorLogger bool + // default message timeout in seconds. 
This can be overridden on the message level + defaultMessageTimeout int + // default amount of retries that will be done before a message is thrown away, and out of the system + defaultMessageRetries int } // newServer will prepare and return a server type -func NewServer(brokerAddress string, nodeName string, promHostAndPort string, centralErrorLogger bool) (*server, error) { +func NewServer(brokerAddress string, nodeName string, promHostAndPort string, centralErrorLogger bool, defaultMessageTimeout int, defaultMessageRetries int) (*server, error) { conn, err := nats.Connect(brokerAddress, nil) if err != nil { log.Printf("error: nats.Connect failed: %v\n", err) @@ -71,6 +75,8 @@ func NewServer(brokerAddress string, nodeName string, promHostAndPort string, ce metrics: newMetrics(promHostAndPort), subscriberServices: newSubscriberServices(), centralErrorLogger: centralErrorLogger, + defaultMessageTimeout: defaultMessageTimeout, + defaultMessageRetries: defaultMessageRetries, } return s, nil @@ -207,6 +213,8 @@ func (s *server) spawnWorkerProcess(proc process) { } func (s *server) messageDeliverNats(proc process, message Message) { + retryAttempts := 0 + for { dataPayload, err := gobEncodeMessage(message) if err != nil { @@ -246,11 +254,25 @@ func (s *server) messageDeliverNats(proc process, message Message) { if message.CommandOrEvent == CommandACK || message.CommandOrEvent == EventACK { // Wait up until 10 seconds for a reply, // continue and resend if to reply received. - msgReply, err := subReply.NextMsg(time.Second * 10) + msgReply, err := subReply.NextMsg(time.Second * time.Duration(message.Timeout)) if err != nil { log.Printf("error: subReply.NextMsg failed for node=%v, subject=%v: %v\n", proc.node, proc.subject.name(), err) - // did not receive a reply, continuing from top again - continue + + // did not receive a reply, decide what to do.. 
+ retryAttempts++ + fmt.Printf("Retry attempts:%v, retries: %v, timeout: %v\n", retryAttempts, message.Retries, message.Timeout) + switch { + case message.Retries == 0: + // 0 indicates unlimited retries + continue + case retryAttempts >= message.Retries: + // max retries reached + log.Printf("info: max retries for message reached, breaking out: %v", retryAttempts) + return + default: + // none of the above matched, so we've not reached max retries yet + continue + } } log.Printf("info: publisher: received ACK for message: %s\n", msgReply.Data) }