1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

renamed in/out channels in ringbuffer

This commit is contained in:
postmannen 2023-04-26 08:20:55 +02:00
parent 11b6cb6a0e
commit 602138ffa6

View file

@ -102,7 +102,7 @@ func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configu
// start will process incomming messages through the inCh,
// put the messages on a buffered channel
// and deliver messages out when requested on the outCh.
func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, outCh chan samDBValueAndDelivered) {
func (r *ringBuffer) start(ctx context.Context, ringBufferInCh chan subjectAndMessage, ringBufferOutCh chan samDBValueAndDelivered) {
// Starting both writing and reading in separate go routines so we
// can write and read concurrently.
@ -113,13 +113,13 @@ func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, out
}
// Fill the buffer when new data arrives into the system
go r.fillBuffer(ctx, inCh)
go r.fillBuffer(ctx, ringBufferInCh)
// Start the process to permanently store done messages.
go r.startPermanentStore(ctx)
// Start the process that will handle messages present in the ringbuffer.
go r.processBufferMessages(ctx, outCh)
go r.processBufferMessages(ctx, ringBufferOutCh)
go func() {
ticker := time.NewTicker(time.Second * 5)
@ -140,7 +140,7 @@ func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, out
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
// It will also store the messages in a K/V DB while being processed.
func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage) {
func (r *ringBuffer) fillBuffer(ctx context.Context, ringBufferInCh chan subjectAndMessage) {
// At startup get all the values that might be in the K/V store so we can
// put them into the buffer before we start to fill up with new incomming
// messages to the system.
@ -166,7 +166,7 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
// the go routine who reads the socket.
for {
select {
case sam := <-inCh:
case sam := <-ringBufferInCh:
// Check if default message values for timers are set, and if
// not then set default message values.
@ -237,7 +237,7 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
// one by one. The messages will be delivered on the outCh, and it will wait
// until a signal is received on the done channel before it continues with the
// next message.
func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDBValueAndDelivered) {
func (r *ringBuffer) processBufferMessages(ctx context.Context, ringBufferOutCh chan samDBValueAndDelivered) {
// Range over the buffer of messages to pass on to processes.
for {
select {
@ -281,7 +281,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
},
}
outCh <- sd
ringBufferOutCh <- sd
// Just to confirm here that the message was picked up, to know if the
// the read process have stalled or not.
// For now it will not do anything,
@ -331,7 +331,6 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
}(samDBv)
case <-ctx.Done():
//close(outCh)
return
}
}