
added feature to turn off message persisting to disk

postmannen 2022-06-21 16:44:39 +02:00
parent 93a61d8ef3
commit 9c6ede458a
2 changed files with 73 additions and 43 deletions
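
The new behaviour is controlled by the ringBufferPersistStore flag added in this commit (default true). A minimal usage sketch, assuming the binary is invoked as ctrl (the binary name is an assumption; the flag name and default come from the flag definition in the diff below):

# keep messages only in the in-memory ring buffer, never persist them to disk
ctrl -ringBufferPersistStore=false

The same option can also be set through the config file via the new RingBufferPersistStore field on ConfigurationFromFile; a nil pointer there means the option was absent and the compiled-in default is kept.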


@@ -17,6 +17,9 @@ import (
// an if check should be added to the checkConfigValues function
// to set default values when reading from config file.
type Configuration struct {
// RingBufferPersistStore enables or disables persisting of
// messages being processed to the local db.
RingBufferPersistStore bool
// RingBufferSize
RingBufferSize int
// The configuration folder on disk
@@ -139,6 +142,7 @@ type Configuration struct {
// if a value were given or not when parsing.
type ConfigurationFromFile struct {
ConfigFolder *string
RingBufferPersistStore *bool
RingBufferSize *int
SocketFolder *string
TCPListener *string
@@ -205,6 +209,7 @@ func NewConfiguration() *Configuration {
func newConfigurationDefaults() Configuration {
c := Configuration{
ConfigFolder: "./etc/",
RingBufferPersistStore: true,
RingBufferSize: 1000,
SocketFolder: "./tmp",
TCPListener: "",
@@ -274,6 +279,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else {
conf.RingBufferSize = *cf.RingBufferSize
}
if cf.RingBufferPersistStore == nil {
conf.RingBufferPersistStore = cd.RingBufferPersistStore
} else {
conf.RingBufferPersistStore = *cf.RingBufferPersistStore
}
if cf.ConfigFolder == nil {
conf.ConfigFolder = cd.ConfigFolder
} else {
@@ -577,6 +587,7 @@ func (c *Configuration) CheckFlags() error {
*c = fc
//flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "Defaults to ./usr/local/steward/etc/. *NB* This flag is not used, if your config file are located somwhere else than default set the location in an env variable named CONFIGFOLDER")
flag.BoolVar(&c.RingBufferPersistStore, "ringBufferPersistStore", fc.RingBufferPersistStore, "true/false for enabling the persisting of ringbuffer to disk")
flag.IntVar(&c.RingBufferSize, "ringBufferSize", fc.RingBufferSize, "size of the ringbuffer")
flag.StringVar(&c.SocketFolder, "socketFolder", fc.SocketFolder, "folder who contains the socket file. Defaults to ./tmp/. If other folder is used this flag must be specified at startup.")
flag.StringVar(&c.TCPListener, "tcpListener", fc.TCPListener, "start up a TCP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost")
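
The checkConfigValues change above follows the pattern used for the existing options: ConfigurationFromFile holds pointer fields so that an option absent from the config file (a nil pointer) can be told apart from an explicit false. A small, self-contained sketch of that pattern, using illustrative type and field names rather than the real ones:

package main

import "fmt"

// fileConf mirrors the idea behind ConfigurationFromFile: a nil pointer
// means the option was not present in the config file.
type fileConf struct {
	PersistStore *bool
}

// resolve applies the same nil check as checkConfigValues: keep the default
// when the option was not set, otherwise use the value from the file.
func resolve(fc fileConf, def bool) bool {
	if fc.PersistStore == nil {
		return def
	}
	return *fc.PersistStore
}

func main() {
	off := false
	fmt.Println(resolve(fileConf{}, true))                   // true: default kept
	fmt.Println(resolve(fileConf{PersistStore: &off}, true)) // false: explicit value wins
}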


@@ -61,6 +61,9 @@ type ringBuffer struct {
// newringBuffer returns a push/pop storage for values.
func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, ringBufferBulkInCh chan []subjectAndMessage, samValueBucket string, indexValueBucket string, errorKernel *errorKernel, processInitial process) *ringBuffer {
fmt.Printf(" * DEBUG: configuration: %+v\n", configuration)
r := ringBuffer{}
// Check if the database folder exists, if not create it
if _, err := os.Stat(configuration.DatabaseFolder); os.IsNotExist(err) {
err := os.MkdirAll(configuration.DatabaseFolder, 0700)
@@ -73,25 +76,29 @@ func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configu
DatabaseFilepath := filepath.Join(configuration.DatabaseFolder, dbFileName)
// ---
db, err := bolt.Open(DatabaseFilepath, 0600, nil)
if err != nil {
log.Printf("error: failed to open db: %v\n", err)
os.Exit(1)
var db *bolt.DB
fmt.Printf(" * DEBUG: configuration: %+v\n", configuration)
if configuration.RingBufferPersistStore {
var err error
db, err = bolt.Open(DatabaseFilepath, 0600, nil)
if err != nil {
log.Printf("error: failed to open db: %v\n", err)
os.Exit(1)
}
}
return &ringBuffer{
bufData: make(chan samDBValue, size),
db: db,
samValueBucket: samValueBucket,
indexValueBucket: indexValueBucket,
permStore: make(chan string),
nodeName: nodeName,
ringBufferBulkInCh: ringBufferBulkInCh,
metrics: metrics,
configuration: configuration,
processInitial: processInitial,
}
r.bufData = make(chan samDBValue, size)
r.db = db
r.samValueBucket = samValueBucket
r.indexValueBucket = indexValueBucket
r.permStore = make(chan string)
r.nodeName = nodeName
r.ringBufferBulkInCh = ringBufferBulkInCh
r.metrics = metrics
r.configuration = configuration
r.processInitial = processInitial
return &r
}
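
With persistence switched off, bolt.Open is never called in the constructor above, so the db field stays nil and every later database access in this file is guarded by an if r.configuration.RingBufferPersistStore check. A compact, self-contained sketch of that constructor decision, assuming the go.etcd.io/bbolt module aliased as bolt (which matches the bolt.Open and *bolt.DB identifiers in the diff); the function name and path are illustrative:

package main

import (
	"fmt"
	"log"
	"os"

	bolt "go.etcd.io/bbolt"
)

// openIfEnabled mirrors the change to newringBuffer: only open the on-disk
// store when persistence is enabled, otherwise return a nil *bolt.DB and
// leave it to the callers to guard every database access on the same flag.
func openIfEnabled(enabled bool, path string) *bolt.DB {
	if !enabled {
		return nil
	}
	db, err := bolt.Open(path, 0600, nil)
	if err != nil {
		log.Printf("error: failed to open db: %v\n", err)
		os.Exit(1)
	}
	return db
}

func main() {
	db := openIfEnabled(false, "./var/store.db")
	fmt.Println(db == nil) // true: nothing is ever written to disk
}
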
// start will process incoming messages through the inCh,
@@ -102,7 +109,10 @@ func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, out
// Starting both writing and reading in separate goroutines so we
// can write and read concurrently.
r.totalMessagesIndex = r.getIndexValue()
r.totalMessagesIndex = 0
if r.configuration.RingBufferPersistStore {
r.totalMessagesIndex = r.getIndexValue()
}
// Fill the buffer when new data arrives into the system
go r.fillBuffer(ctx, inCh)
@@ -119,7 +129,9 @@ func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, out
for {
select {
case <-ticker.C:
r.dbUpdateMetrics(r.samValueBucket)
if r.configuration.RingBufferPersistStore {
r.dbUpdateMetrics(r.samValueBucket)
}
case <-ctx.Done():
return
}
@@ -136,18 +148,20 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
// This is needed when the program has been restarted, and we need to check
// if there were previously unhandled messages that need to be handled first.
func() {
s, err := r.dumpBucket(r.samValueBucket)
if err != nil {
er := fmt.Errorf("info: fillBuffer: retreival of values from k/v store failed, probaly empty database, and no previous entries in db to process: %v", err)
log.Printf("%v\n", er)
return
}
if r.configuration.RingBufferPersistStore {
func() {
s, err := r.dumpBucket(r.samValueBucket)
if err != nil {
er := fmt.Errorf("info: fillBuffer: retreival of values from k/v store failed, probaly empty database, and no previous entries in db to process: %v", err)
log.Printf("%v\n", er)
return
}
for _, v := range s {
r.bufData <- v
}
}()
for _, v := range s {
r.bufData <- v
}
}()
}
// Prepare the map structure to know what values are allowed
// for the events
@@ -199,18 +213,19 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
Data: v,
}
js, err := json.Marshal(samV)
if err != nil {
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
r.errorKernel.errSend(r.processInitial, Message{}, er)
}
// Store the incomming message in key/value store
err = r.dbUpdate(r.db, r.samValueBucket, strconv.Itoa(dbID), js)
if err != nil {
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
r.errorKernel.errSend(r.processInitial, Message{}, er)
if r.configuration.RingBufferPersistStore {
js, err := json.Marshal(samV)
if err != nil {
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
r.errorKernel.errSend(r.processInitial, Message{}, er)
}
// Store the incoming message in the key/value store
err = r.dbUpdate(r.db, r.samValueBucket, strconv.Itoa(dbID), js)
if err != nil {
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
r.errorKernel.errSend(r.processInitial, Message{}, er)
}
}
// Put the message on the in-memory buffer.
@@ -219,7 +234,9 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
// Increment index, and store the new value to the database.
r.mu.Lock()
r.totalMessagesIndex++
r.dbUpdate(r.db, r.indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex)))
if r.configuration.RingBufferPersistStore {
r.dbUpdate(r.db, r.indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex)))
}
r.mu.Unlock()
case <-ctx.Done():
// When done close the buffer channel
@@ -302,7 +319,9 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
// Since we are now done with the specific message we can delete
// it from the K/V store.
r.deleteKeyFromBucket(r.samValueBucket, strconv.Itoa(v.ID))
if r.configuration.RingBufferPersistStore {
r.deleteKeyFromBucket(r.samValueBucket, strconv.Itoa(v.ID))
}
js, err := json.Marshal(msgForPermStore)
if err != nil {