2021-02-12 10:21:51 +00:00
// Info: The idea about the ring buffer is that we have a FIFO
// buffer where we store all incoming messages requested by
2021-08-16 11:01:12 +00:00
// operators.
// Each message in process or waiting to be processed will be
// stored in a DB. When the processing of a given message is
// done it will be removed from the state db, and an entry will
// be made in the persistent message log.
2021-02-12 10:21:51 +00:00
package steward
2021-02-15 10:28:27 +00:00
import (
2021-09-15 06:39:34 +00:00
"context"
2021-02-16 03:57:54 +00:00
"encoding/json"
2021-02-15 10:28:27 +00:00
"fmt"
"log"
2021-02-17 09:27:39 +00:00
"os"
2021-05-12 07:50:03 +00:00
"path/filepath"
2021-02-17 11:02:34 +00:00
"sort"
2021-02-15 10:28:27 +00:00
"strconv"
2021-02-16 13:29:32 +00:00
"sync"
2021-02-17 09:27:39 +00:00
"time"
2021-02-15 10:28:27 +00:00
2022-05-16 05:15:38 +00:00
copier "github.com/jinzhu/copier"
2021-02-15 10:28:27 +00:00
bolt "go.etcd.io/bbolt"
)
// samValue represents one message with a subject. This
// struct type is used when storing and retreiving from
// db.
type samDBValue struct {
2022-12-31 06:51:34 +00:00
ID int
SAM subjectAndMessage
2021-02-15 10:28:27 +00:00
}
2021-02-12 10:21:51 +00:00
// ringBuffer holds the data of the buffer,
type ringBuffer struct {
2021-08-16 11:01:12 +00:00
// In memory buffer for the messages.
bufData chan samDBValue
// The database to use.
2021-09-14 14:23:01 +00:00
db * bolt . DB
samValueBucket string
indexValueBucket string
2021-08-16 11:01:12 +00:00
// The current number of items in the database.
2021-02-16 03:57:54 +00:00
totalMessagesIndex int
2021-02-16 13:29:32 +00:00
mu sync . Mutex
2021-08-16 11:01:12 +00:00
// The channel to send messages that have been processed,
// and we want to store it in the permanent message log.
permStore chan string
// Name of node.
nodeName Node
2022-01-20 07:17:37 +00:00
// ringBufferBulkInCh from *server are also implemented here,
2021-08-25 06:31:48 +00:00
// so the ringbuffer can send it's error messages the same
// way as all messages are handled.
2022-01-20 07:17:37 +00:00
ringBufferBulkInCh chan [ ] subjectAndMessage
metrics * metrics
configuration * Configuration
errorKernel * errorKernel
processInitial process
2021-02-12 10:21:51 +00:00
}
2021-08-16 11:01:12 +00:00
// newringBuffer returns a push/pop storage for values.
2022-01-20 07:17:37 +00:00
func newringBuffer ( ctx context . Context , metrics * metrics , configuration * Configuration , size int , dbFileName string , nodeName Node , ringBufferBulkInCh chan [ ] subjectAndMessage , samValueBucket string , indexValueBucket string , errorKernel * errorKernel , processInitial process ) * ringBuffer {
2021-09-15 06:39:34 +00:00
2022-06-21 14:44:39 +00:00
r := ringBuffer { }
2021-05-12 07:50:03 +00:00
// Check if socket folder exists, if not create it
2021-09-07 07:43:54 +00:00
if _ , err := os . Stat ( configuration . DatabaseFolder ) ; os . IsNotExist ( err ) {
2023-01-10 05:50:28 +00:00
err := os . MkdirAll ( configuration . DatabaseFolder , 0770 )
2021-05-12 07:50:03 +00:00
if err != nil {
2021-09-07 07:43:54 +00:00
log . Printf ( "error: failed to create database directory %v: %v\n" , configuration . DatabaseFolder , err )
2021-05-12 07:50:03 +00:00
os . Exit ( 1 )
}
}
2021-09-07 07:43:54 +00:00
DatabaseFilepath := filepath . Join ( configuration . DatabaseFolder , dbFileName )
2021-05-12 07:50:03 +00:00
// ---
2022-06-21 14:44:39 +00:00
var db * bolt . DB
if configuration . RingBufferPersistStore {
var err error
2023-01-10 05:50:28 +00:00
db , err = bolt . Open ( DatabaseFilepath , 0660 , nil )
2022-06-21 14:44:39 +00:00
if err != nil {
log . Printf ( "error: failed to open db: %v\n" , err )
os . Exit ( 1 )
}
2021-02-15 10:28:27 +00:00
}
2021-08-18 13:41:53 +00:00
2022-06-21 14:44:39 +00:00
r . bufData = make ( chan samDBValue , size )
r . db = db
r . samValueBucket = samValueBucket
r . indexValueBucket = indexValueBucket
r . permStore = make ( chan string )
r . nodeName = nodeName
r . ringBufferBulkInCh = ringBufferBulkInCh
r . metrics = metrics
r . configuration = configuration
r . processInitial = processInitial
return & r
2021-02-12 10:21:51 +00:00
}
// start will process incomming messages through the inCh,
2021-02-15 10:28:27 +00:00
// put the messages on a buffered channel
2021-02-12 10:21:51 +00:00
// and deliver messages out when requested on the outCh.
2023-04-26 06:20:55 +00:00
func ( r * ringBuffer ) start ( ctx context . Context , ringBufferInCh chan subjectAndMessage , ringBufferOutCh chan samDBValueAndDelivered ) {
2021-03-12 08:38:19 +00:00
2021-02-12 10:21:51 +00:00
// Starting both writing and reading in separate go routines so we
// can write and read concurrently.
2022-06-21 14:44:39 +00:00
r . totalMessagesIndex = 0
if r . configuration . RingBufferPersistStore {
r . totalMessagesIndex = r . getIndexValue ( )
}
2021-02-15 10:28:27 +00:00
2021-02-17 09:27:39 +00:00
// Fill the buffer when new data arrives into the system
2023-04-26 06:20:55 +00:00
go r . fillBuffer ( ctx , ringBufferInCh )
2021-02-15 10:28:27 +00:00
2021-02-17 09:27:39 +00:00
// Start the process to permanently store done messages.
2021-09-15 09:39:19 +00:00
go r . startPermanentStore ( ctx )
2021-02-17 09:27:39 +00:00
// Start the process that will handle messages present in the ringbuffer.
2023-04-26 06:20:55 +00:00
go r . processBufferMessages ( ctx , ringBufferOutCh )
2021-09-15 05:26:36 +00:00
go func ( ) {
ticker := time . NewTicker ( time . Second * 5 )
2022-12-26 09:52:43 +00:00
defer ticker . Stop ( )
2021-09-15 05:26:36 +00:00
for {
select {
case <- ticker . C :
2022-06-21 14:44:39 +00:00
if r . configuration . RingBufferPersistStore {
r . dbUpdateMetrics ( r . samValueBucket )
}
2021-09-15 08:46:30 +00:00
case <- ctx . Done ( ) :
2021-09-15 06:39:34 +00:00
return
2021-09-15 05:26:36 +00:00
}
}
} ( )
2021-02-16 11:59:37 +00:00
}
2021-02-15 10:28:27 +00:00
2021-02-16 11:59:37 +00:00
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
// It will also store the messages in a K/V DB while being processed.
2023-04-26 06:20:55 +00:00
func ( r * ringBuffer ) fillBuffer ( ctx context . Context , ringBufferInCh chan subjectAndMessage ) {
2021-02-17 11:02:34 +00:00
// At startup get all the values that might be in the K/V store so we can
// put them into the buffer before we start to fill up with new incomming
// messages to the system.
// This is needed when the program have been restarted, and we need to check
// if there where previously unhandled messages that need to be handled first.
2021-05-21 06:21:17 +00:00
2022-06-21 14:44:39 +00:00
if r . configuration . RingBufferPersistStore {
func ( ) {
s , err := r . dumpBucket ( r . samValueBucket )
if err != nil {
er := fmt . Errorf ( "info: fillBuffer: retreival of values from k/v store failed, probaly empty database, and no previous entries in db to process: %v" , err )
log . Printf ( "%v\n" , er )
return
}
2021-02-17 11:02:34 +00:00
2022-06-21 14:44:39 +00:00
for _ , v := range s {
r . bufData <- v
}
} ( )
}
2021-02-17 11:02:34 +00:00
2021-02-16 11:59:37 +00:00
// Check for incomming messages. These are typically comming from
2021-08-16 11:01:12 +00:00
// the go routine who reads the socket.
2021-09-15 09:39:19 +00:00
for {
select {
2023-04-26 06:20:55 +00:00
case sam := <- ringBufferInCh :
2021-02-18 07:25:13 +00:00
2021-11-18 05:50:25 +00:00
// Check if default message values for timers are set, and if
// not then set default message values.
2023-01-04 21:07:53 +00:00
if sam . Message . ACKTimeout < 1 {
2023-01-05 00:55:52 +00:00
sam . Subject . Event = EventNACK
2021-09-15 09:39:19 +00:00
}
2023-01-05 00:55:52 +00:00
if sam . Message . ACKTimeout >= 1 {
sam . Subject . Event = EventNACK
}
2023-04-26 05:25:14 +00:00
switch {
case sam . Message . Retries < 0 :
2023-01-04 21:07:53 +00:00
sam . Message . Retries = r . configuration . DefaultMessageRetries
2021-09-15 09:39:19 +00:00
}
2023-01-04 21:07:53 +00:00
if sam . Message . MethodTimeout < 1 && sam . Message . MethodTimeout != - 1 {
sam . Message . MethodTimeout = r . configuration . DefaultMethodTimeout
2021-11-18 05:36:16 +00:00
}
2021-02-18 07:25:13 +00:00
2021-09-15 09:39:19 +00:00
// --- Store the incomming message in the k/v store ---
2021-02-25 12:08:10 +00:00
2021-09-15 09:39:19 +00:00
// Get a unique number for the message to use when storing
// it in the databases, and also use when further processing.
r . mu . Lock ( )
dbID := r . totalMessagesIndex
r . mu . Unlock ( )
2021-02-16 11:59:37 +00:00
2021-09-15 09:39:19 +00:00
// Create a structure for JSON marshaling.
samV := samDBValue {
2022-12-31 06:51:34 +00:00
ID : dbID ,
2023-01-04 21:07:53 +00:00
SAM : sam ,
2021-09-15 09:39:19 +00:00
}
2021-02-16 13:29:32 +00:00
2022-06-21 14:44:39 +00:00
if r . configuration . RingBufferPersistStore {
js , err := json . Marshal ( samV )
if err != nil {
er := fmt . Errorf ( "error:fillBuffer: json marshaling: %v" , err )
2023-01-11 07:38:15 +00:00
r . errorKernel . errSend ( r . processInitial , Message { } , er , logError )
2022-06-21 14:44:39 +00:00
}
2021-02-15 10:28:27 +00:00
2022-06-21 14:44:39 +00:00
// Store the incomming message in key/value store
err = r . dbUpdate ( r . db , r . samValueBucket , strconv . Itoa ( dbID ) , js )
if err != nil {
er := fmt . Errorf ( "error: dbUpdate samValue failed: %v" , err )
2023-01-11 07:38:15 +00:00
r . errorKernel . errSend ( r . processInitial , Message { } , er , logError )
2022-06-21 14:44:39 +00:00
}
2021-09-15 09:39:19 +00:00
}
2021-03-12 08:38:19 +00:00
2021-09-15 09:39:19 +00:00
// Put the message on the inmemory buffer.
r . bufData <- samV
// Increment index, and store the new value to the database.
r . mu . Lock ( )
r . totalMessagesIndex ++
2022-06-21 14:44:39 +00:00
if r . configuration . RingBufferPersistStore {
r . dbUpdate ( r . db , r . indexValueBucket , "index" , [ ] byte ( strconv . Itoa ( r . totalMessagesIndex ) ) )
}
2021-09-15 09:39:19 +00:00
r . mu . Unlock ( )
case <- ctx . Done ( ) :
// When done close the buffer channel
close ( r . bufData )
return
2021-02-12 10:21:51 +00:00
}
2021-02-15 10:28:27 +00:00
2021-02-16 11:59:37 +00:00
}
}
2021-02-16 11:29:15 +00:00
2021-02-16 11:59:37 +00:00
// processBufferMessages will pick messages from the buffer, and process them
// one by one. The messages will be delivered on the outCh, and it will wait
// until a signal is received on the done channel before it continues with the
// next message.
2023-04-26 06:20:55 +00:00
func ( r * ringBuffer ) processBufferMessages ( ctx context . Context , ringBufferOutCh chan samDBValueAndDelivered ) {
2021-02-16 11:59:37 +00:00
// Range over the buffer of messages to pass on to processes.
2021-09-15 09:39:19 +00:00
for {
select {
2022-12-31 06:51:34 +00:00
case samDBv := <- r . bufData :
2021-09-15 09:39:19 +00:00
r . metrics . promInMemoryBufferMessagesCurrent . Set ( float64 ( len ( r . bufData ) ) )
2022-12-31 06:51:34 +00:00
samDBv . SAM . ID = samDBv . ID
2021-09-15 09:39:19 +00:00
2023-01-05 00:55:52 +00:00
// // Create a done channel per message. A process started by the
// // spawnProcess function will handle incomming messages sequentaly.
// // So in the spawnProcess function we put a struct{} value when a
// // message is processed on the "done" channel and an ack is received
// // for a message, and we wait here for the "done" to be received.
2021-09-15 09:39:19 +00:00
// We start the actual processing of an individual message here within
// it's own go routine. Reason is that we don't want to block other
// messages to be processed while waiting for the done signal, or if an
// error with an individual message occurs.
go func ( v samDBValue ) {
2022-05-16 05:15:38 +00:00
// Create a copy of the message that we can use to write to the
// perm store without causing a race since the REQ handler for the
// message might not yet be done when message is written to the
// perm store.
// We also need a copy to be able to remove the data from the message
// when writing it to the store, so we don't mess up to actual data
// that might be in use in the handler.
msgForPermStore := Message { }
2022-12-31 06:51:34 +00:00
copier . Copy ( & msgForPermStore , v . SAM . Message )
2022-05-16 05:15:38 +00:00
// Remove the content of the data field.
msgForPermStore . Data = nil
2022-12-31 06:51:34 +00:00
v . SAM . Message . done = make ( chan struct { } )
2021-09-15 09:39:19 +00:00
delivredCh := make ( chan struct { } )
// Prepare the structure with the data, and a function that can
// be called when the data is received for signaling back.
sd := samDBValueAndDelivered {
samDBValue : v ,
delivered : func ( ) {
delivredCh <- struct { } { }
} ,
}
2023-05-30 03:46:02 +00:00
// Create a ticker that will kick in when a message have been in the
// system for it's maximum time. This will allow us to continue, and
// remove the message if it takes longer than it should to get delivered.
2023-06-01 04:15:39 +00:00
fmt . Printf ( "DEBUG:::%v\n" , v . SAM . ACKTimeout )
if v . SAM . ACKTimeout <= 0 {
v . SAM . ACKTimeout = 1
}
if v . SAM . Retries <= 0 {
v . SAM . Retries = 1
}
2023-05-30 03:46:02 +00:00
ticker := time . NewTicker ( time . Duration ( v . SAM . ACKTimeout ) * time . Duration ( v . SAM . Retries ) * time . Second )
defer ticker . Stop ( )
2023-04-26 06:20:55 +00:00
ringBufferOutCh <- sd
2021-09-15 09:39:19 +00:00
// Just to confirm here that the message was picked up, to know if the
// the read process have stalled or not.
// For now it will not do anything,
select {
case <- delivredCh :
// OK.
case <- time . After ( time . Second * 5 ) :
2022-06-22 12:50:26 +00:00
// TODO: Check if more logic should be made here if messages are stuck etc.
2021-09-15 09:39:19 +00:00
// Testing with a timeout here to figure out if messages are stuck
// waiting for done signal.
2022-12-30 15:52:50 +00:00
log . Printf ( "Error: ringBuffer: message %v seems to be stuck, did not receive delivered signal from reading process\n" , v . ID )
2021-09-15 09:39:19 +00:00
r . metrics . promRingbufferStalledMessagesTotal . Inc ( )
}
// Listen on the done channel here , so a go routine handling the
// message will be able to signal back here that the message have
// been processed, and that we then can delete it out of the K/V Store.
2023-04-26 05:25:14 +00:00
// The publishAMessage method should send a done back here, but in some situations
2022-12-30 15:52:50 +00:00
// it seems that that do not happen. Implementing a ticker that is twice the total
// amount of time a message should be allowed to be using for getting published so
// we don't get stuck go routines here.
//
2023-05-30 03:46:02 +00:00
if r . configuration . RingBufferPersistStore {
select {
case <- v . SAM . done :
fmt . Printf ( "---\n DONE with\n---\n" )
case <- ticker . C :
log . Printf ( "----------------------------------------------\n" )
log . Printf ( "Error: ringBuffer message id: %v, subject: %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n" , v . SAM . ID , v . SAM . Subject )
log . Printf ( "----------------------------------------------\n" )
}
}
2021-12-31 05:59:09 +00:00
// log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
2021-11-09 13:01:42 +00:00
r . metrics . promMessagesProcessedIDLast . Set ( float64 ( v . ID ) )
2021-09-15 09:39:19 +00:00
// Since we are now done with the specific message we can delete
// it out of the K/V Store.
2022-06-21 14:44:39 +00:00
if r . configuration . RingBufferPersistStore {
r . deleteKeyFromBucket ( r . samValueBucket , strconv . Itoa ( v . ID ) )
}
2021-09-15 09:39:19 +00:00
2022-05-16 05:15:38 +00:00
js , err := json . Marshal ( msgForPermStore )
if err != nil {
er := fmt . Errorf ( "error:fillBuffer: json marshaling: %v" , err )
2023-01-11 07:38:15 +00:00
r . errorKernel . errSend ( r . processInitial , Message { } , er , logError )
2022-05-16 05:15:38 +00:00
}
r . permStore <- time . Now ( ) . Format ( "Mon Jan _2 15:04:05 2006" ) + ", " + string ( js ) + "\n"
2021-09-15 09:39:19 +00:00
2022-12-31 06:51:34 +00:00
} ( samDBv )
2021-09-15 09:39:19 +00:00
case <- ctx . Done ( ) :
return
}
2021-02-16 11:59:37 +00:00
}
2021-02-12 10:21:51 +00:00
}
2021-02-15 10:28:27 +00:00
2021-02-17 11:02:34 +00:00
// dumpBucket will dump out all they keys and values in the
// specified bucket, and return a sorted []samDBValue
func ( r * ringBuffer ) dumpBucket ( bucket string ) ( [ ] samDBValue , error ) {
samDBValues := [ ] samDBValue { }
err := r . db . View ( func ( tx * bolt . Tx ) error {
bu := tx . Bucket ( [ ] byte ( bucket ) )
2021-02-17 17:59:49 +00:00
if bu == nil {
2022-06-27 10:22:59 +00:00
return fmt . Errorf ( "error: ringBuffer.dumpBucket: tx.bucket returned nil" )
2021-02-17 17:59:49 +00:00
}
2021-02-17 11:02:34 +00:00
2022-06-27 10:22:59 +00:00
type kv struct {
key [ ] byte
value [ ] byte
}
dbkvs := [ ] kv { }
// Get all the items from the db.
err := bu . ForEach ( func ( k , v [ ] byte ) error {
va := kv {
key : k ,
value : v ,
2021-02-17 11:02:34 +00:00
}
2022-06-27 10:22:59 +00:00
dbkvs = append ( dbkvs , va )
2021-02-17 11:02:34 +00:00
return nil
} )
2022-06-27 10:22:59 +00:00
if err != nil {
// Todo: what to return here ?
log . Fatalf ( "error: ringBuffer: ranging db failed: %v" , err )
}
// Range all the values we got from the db, unmarshal each value.
// If the unmarshaling is ok we put it on the samDBValues slice,
// if it fails the value is not of correct format so we we delete
// it from the db, and loop to work on the next value.
for _ , dbkv := range dbkvs {
var sdbv samDBValue
err := json . Unmarshal ( dbkv . value , & sdbv )
if err != nil {
// If we're unable to unmarshal the value it value of wrong format,
// so we log it, and delete the value from the bucker.
log . Printf ( "error: ringBuffer.dumpBucket json.Umarshal failed: %v\n" , err )
r . deleteKeyFromBucket ( r . samValueBucket , string ( dbkv . key ) )
continue
}
samDBValues = append ( samDBValues , sdbv )
}
// TODO:
2022-11-30 07:35:13 +00:00
// BoltDB do not automatically shrink in filesize. We should delete the db, and create a new one to shrink the size.
2022-06-27 10:22:59 +00:00
2021-02-17 11:02:34 +00:00
// Sort the order of the slice items based on ID, since they where retreived from a map.
sort . SliceStable ( samDBValues , func ( i , j int ) bool {
return samDBValues [ i ] . ID > samDBValues [ j ] . ID
} )
2022-12-31 06:51:34 +00:00
for _ , samDBv := range samDBValues {
log . Printf ( "info: ringBuffer.dumpBucket: k/v store, kvID: %v, message.ID: %v, subject: %v, len(data): %v\n" , samDBv . ID , samDBv . SAM . ID , samDBv . SAM . Subject , len ( samDBv . SAM . Data ) )
2021-02-17 11:02:34 +00:00
}
2021-04-16 11:43:58 +00:00
2021-02-17 11:02:34 +00:00
return nil
} )
2021-02-17 17:59:49 +00:00
if err != nil {
return nil , err
}
2021-02-17 11:02:34 +00:00
return samDBValues , err
}
2021-11-09 13:01:42 +00:00
// // printBucketContent will print out all they keys and values in the
// // specified bucket.
// func (r *ringBuffer) printBucketContent(bucket string) error {
// err := r.db.View(func(tx *bolt.Tx) error {
// bu := tx.Bucket([]byte(bucket))
//
// bu.ForEach(func(k, v []byte) error {
// var vv samDBValue
// err := json.Unmarshal(v, &vv)
// if err != nil {
// log.Printf("error: printBucketContent json.Umarshal failed: %v\n", err)
// }
// log.Printf("k: %s, v: %v\n", k, vv)
// return nil
// })
//
// return nil
// })
//
// return err
// }
2021-02-16 13:29:32 +00:00
2021-02-16 13:58:59 +00:00
// deleteKeyFromBucket will delete the specified key from the specified
// bucket if it exists.
2021-02-16 13:29:32 +00:00
func ( r * ringBuffer ) deleteKeyFromBucket ( bucket string , key string ) error {
err := r . db . Update ( func ( tx * bolt . Tx ) error {
bu := tx . Bucket ( [ ] byte ( bucket ) )
err := bu . Delete ( [ ] byte ( key ) )
if err != nil {
log . Printf ( "error: delete key in bucket %v failed: %v\n" , bucket , err )
}
2021-02-16 05:43:09 +00:00
return nil
} )
return err
}
2021-09-15 05:26:36 +00:00
// dbUpdateMetrics sets the promDBMessagesCurrent metric to the number of
// keys in the specified bucket. When the bucket does not exist, which is the
// case until the first value has been stored in it, the metric is set to 0.
// The returned error is the one from the database transaction.
func (r *ringBuffer) dbUpdateMetrics(bucket string) error {
	// Only reading, so a read-only transaction is enough.
	return r.db.View(func(tx *bolt.Tx) error {
		bu := tx.Bucket([]byte(bucket))
		if bu == nil {
			r.metrics.promDBMessagesCurrent.Set(0)
			return nil
		}

		r.metrics.promDBMessagesCurrent.Set(float64(bu.Stats().KeyN))
		return nil
	})
}
2021-02-16 13:58:59 +00:00
// getIndexValue will get the last index value stored in DB.
2021-09-14 14:23:01 +00:00
func ( r * ringBuffer ) getIndexValue ( ) int {
2021-02-16 03:57:54 +00:00
const indexKey string = "index"
2021-09-14 14:23:01 +00:00
indexB , err := r . dbView ( r . db , r . indexValueBucket , indexKey )
2021-02-16 03:57:54 +00:00
if err != nil {
log . Printf ( "error: getIndexValue: dbView: %v\n" , err )
}
index , err := strconv . Atoi ( string ( indexB ) )
2021-08-09 07:34:05 +00:00
if err != nil && string ( indexB ) == "" {
2021-08-23 09:53:47 +00:00
log . Printf ( "info: getIndexValue: no index value found, probaly empty database, and no previous entries in db to process : %v\n" , err )
2021-02-16 03:57:54 +00:00
}
return index
}
2021-08-16 11:01:12 +00:00
// dbView will look up and return a specific value if it exists for a key in a bucket in a DB.
2021-02-15 10:28:27 +00:00
func ( r * ringBuffer ) dbView ( db * bolt . DB , bucket string , key string ) ( [ ] byte , error ) {
var value [ ] byte
2021-08-16 11:01:12 +00:00
// View is a help function to get values out of the database.
2021-02-15 10:28:27 +00:00
err := db . View ( func ( tx * bolt . Tx ) error {
//Open a bucket to get key's and values from.
bu := tx . Bucket ( [ ] byte ( bucket ) )
2021-02-16 03:57:54 +00:00
if bu == nil {
2021-08-23 09:53:47 +00:00
log . Printf ( "info: no db bucket exist: %v\n" , bucket )
2021-02-16 03:57:54 +00:00
return nil
}
2021-02-15 10:28:27 +00:00
v := bu . Get ( [ ] byte ( key ) )
if len ( v ) == 0 {
2021-02-16 03:57:54 +00:00
log . Printf ( "info: view: key not found\n" )
return nil
2021-02-15 10:28:27 +00:00
}
value = v
return nil
} )
return value , err
}
2022-11-30 07:35:13 +00:00
// dbUpdate will update the specified bucket with a key and value.
2021-02-15 10:28:27 +00:00
func ( r * ringBuffer ) dbUpdate ( db * bolt . DB , bucket string , key string , value [ ] byte ) error {
err := db . Update ( func ( tx * bolt . Tx ) error {
//Create a bucket
bu , err := tx . CreateBucketIfNotExists ( [ ] byte ( bucket ) )
if err != nil {
return fmt . Errorf ( "error: CreateBuckerIfNotExists failed: %v" , err )
}
//Put a value into the bucket.
if err := bu . Put ( [ ] byte ( key ) , [ ] byte ( value ) ) ; err != nil {
return err
}
//If all was ok, we should return a nil for a commit to happen. Any error
// returned will do a rollback.
return nil
} )
return err
}
2021-02-17 09:27:39 +00:00
// startPermStore will start the process that will handle writing of
// handled message to a permanent file.
// To store a message in the store, send what to store on the
// ringbuffer.permStore channel.
2021-09-15 09:39:19 +00:00
func ( r * ringBuffer ) startPermanentStore ( ctx context . Context ) {
2023-01-06 11:19:36 +00:00
storeFile := filepath . Join ( r . configuration . DatabaseFolder , "store.log" )
2023-01-10 05:50:28 +00:00
f , err := os . OpenFile ( storeFile , os . O_APPEND | os . O_RDWR | os . O_CREATE , 0660 )
2021-02-17 09:27:39 +00:00
if err != nil {
2021-03-02 12:46:02 +00:00
log . Printf ( "error: startPermanentStore: failed to open file: %v\n" , err )
2021-02-17 09:27:39 +00:00
}
defer f . Close ( )
for {
2021-09-15 09:39:19 +00:00
select {
case d := <- r . permStore :
_ , err := f . WriteString ( d )
if err != nil {
log . Printf ( "error:failed to write entry: %v\n" , err )
}
case <- ctx . Done ( ) :
return
2021-02-17 09:27:39 +00:00
}
}
}