1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

Get default message timers directly from flag

This commit is contained in:
postmannen 2021-11-18 06:50:25 +01:00
parent 2477e7939d
commit 81bd4d0589
2 changed files with 8 additions and 8 deletions

View file

@ -93,7 +93,7 @@ func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configu
// start will process incomming messages through the inCh,
// put the messages on a buffered channel
// and deliver messages out when requested on the outCh.
func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, outCh chan samDBValueAndDelivered, defaultMessageTimeout int, defaultMessageRetries int) {
func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, outCh chan samDBValueAndDelivered) {
// Starting both writing and reading in separate go routines so we
// can write and read concurrently.
@ -101,7 +101,7 @@ func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, out
r.totalMessagesIndex = r.getIndexValue()
// Fill the buffer when new data arrives into the system
go r.fillBuffer(ctx, inCh, defaultMessageTimeout, defaultMessageRetries)
go r.fillBuffer(ctx, inCh)
// Start the process to permanently store done messages.
go r.startPermanentStore(ctx)
@ -125,7 +125,7 @@ func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, out
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
// It will also store the messages in a K/V DB while being processed.
func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage, defaultMessageTimeout int, defaultMessageRetries int) {
func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage) {
// At startup get all the values that might be in the K/V store so we can
// put them into the buffer before we start to fill up with new incomming
// messages to the system.
@ -170,13 +170,13 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
continue
}
// Check if message values for timers override default values
// TODO: Replace with values from configuration here.
// Check if default message values for timers are set, and if
// not then set default message values.
if v.Message.ACKTimeout < 1 {
v.Message.ACKTimeout = defaultMessageTimeout
v.Message.ACKTimeout = r.configuration.DefaultMessageTimeout
}
if v.Message.Retries < 1 {
v.Message.Retries = defaultMessageRetries
v.Message.Retries = r.configuration.DefaultMessageRetries
}
if v.Message.MethodTimeout < 1 {
v.Message.MethodTimeout = r.configuration.DefaultMethodTimeout

View file

@ -368,7 +368,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
ringBufferInCh := make(chan subjectAndMessage)
ringBufferOutCh := make(chan samDBValueAndDelivered)
// start the ringbuffer.
s.ringBuffer.start(s.ctx, ringBufferInCh, ringBufferOutCh, s.configuration.DefaultMessageTimeout, s.configuration.DefaultMessageRetries)
s.ringBuffer.start(s.ctx, ringBufferInCh, ringBufferOutCh)
// Start reading new fresh messages received on the incomming message
// pipe/file requested, and fill them into the buffer.