1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-04-09 10:24:17 +00:00

working on context for ringbuffer

This commit is contained in:
postmannen 2021-09-15 08:39:34 +02:00
parent b83749cbc0
commit 64207adc62
2 changed files with 28 additions and 2 deletions

View file

@ -9,6 +9,7 @@
package steward
import (
"context"
"encoding/json"
"fmt"
"log"
@ -32,6 +33,12 @@ type samDBValue struct {
// ringBuffer holds the data of the buffer,
type ringBuffer struct {
// Context for ring buffer.
ctx context.Context
// Cancel function for ring buffer.
cancel context.CancelFunc
// Waitgroup for ringbuffer.
wg sync.WaitGroup
// In memory buffer for the messages.
bufData chan samDBValue
// The database to use.
@ -55,7 +62,9 @@ type ringBuffer struct {
}
// newringBuffer returns a push/pop storage for values.
func newringBuffer(metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage, samValueBucket string, indexValueBucket string) *ringBuffer {
func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, newMessagesCh chan []subjectAndMessage, samValueBucket string, indexValueBucket string) *ringBuffer {
ctxRingbuffer, cancel := context.WithCancel(ctx)
// Check if socket folder exists, if not create it
if _, err := os.Stat(configuration.DatabaseFolder); os.IsNotExist(err) {
err := os.MkdirAll(configuration.DatabaseFolder, 0700)
@ -76,6 +85,8 @@ func newringBuffer(metrics *metrics, configuration *Configuration, size int, dbF
}
return &ringBuffer{
ctx: ctxRingbuffer,
cancel: cancel,
bufData: make(chan samDBValue, size),
db: db,
samValueBucket: samValueBucket,
@ -107,6 +118,7 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValueAnd
// Start the process that will handle messages present in the ringbuffer.
go r.processBufferMessages(outCh)
r.wg.Add(1)
go func() {
ticker := time.NewTicker(time.Second * 5)
@ -114,11 +126,19 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValueAnd
select {
case <-ticker.C:
r.dbUpdateMetrics(r.samValueBucket)
case <-r.ctx.Done():
r.wg.Done()
return
}
}
}()
}
func (r *ringBuffer) stop() {
r.cancel()
r.wg.Wait()
}
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
// It will also store the messages in a K/V DB while being processed.
func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, defaultMessageTimeout int, defaultMessageRetries int) {

View file

@ -48,6 +48,8 @@ type server struct {
// errorKernel is doing all the error handling like what to do if
// an error occurs.
errorKernel *errorKernel
// Ring buffer
ringBuffer *ringBuffer
// metric exporter
metrics *metrics
// Version of package
@ -252,6 +254,8 @@ func (s *server) Start() {
}
// Start the processing of new messages from an input channel.
// NB: We might need to create a sub context for the ringbuffer here
// so we can cancel this context last, and not use the server.
s.routeMessagesToProcess("./incomingBuffer.db")
}
@ -273,6 +277,8 @@ func (s *server) Stop() {
s.cancel()
log.Printf("info: stopped the main context\n")
// Stop the ringbuffer.
// Delete the socket file when the program exits.
socketFilepath := filepath.Join(s.configuration.SocketFolder, "steward.sock")
@ -343,7 +349,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
const samValueBucket string = "samValueBucket"
const indexValueBucket string = "indexValueBucket"
rb := newringBuffer(s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.newMessagesCh, samValueBucket, indexValueBucket)
rb := newringBuffer(s.ctx, s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.newMessagesCh, samValueBucket, indexValueBucket)
ringBufferInCh := make(chan subjectAndMessage)
ringBufferOutCh := make(chan samDBValueAndDelivered)