1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00
ctrl/ringbuffer.go

570 lines
18 KiB
Go

// Info: The idea about the ring buffer is that we have a FIFO
// buffer where we store all incomming messages requested by
// operators.
// Each message in process or waiting to be processed will be
// stored in a DB. When the processing of a given message is
// done it will be removed from the state db, and an entry will
// made in the persistent message log.
package steward
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
copier "github.com/jinzhu/copier"
bolt "go.etcd.io/bbolt"
)
// samValue represents one message with a subject. This
// struct type is used when storing and retreiving from
// db.
type samDBValue struct {
ID int
SAM subjectAndMessage
}
// ringBuffer holds the data of the buffer,
type ringBuffer struct {
// In memory buffer for the messages.
bufData chan samDBValue
// The database to use.
db *bolt.DB
samValueBucket string
indexValueBucket string
// The current number of items in the database.
totalMessagesIndex int
mu sync.Mutex
// The channel to send messages that have been processed,
// and we want to store it in the permanent message log.
permStore chan string
// Name of node.
nodeName Node
// ringBufferBulkInCh from *server are also implemented here,
// so the ringbuffer can send it's error messages the same
// way as all messages are handled.
ringBufferBulkInCh chan []subjectAndMessage
metrics *metrics
configuration *Configuration
errorKernel *errorKernel
processInitial process
}
// newringBuffer returns a push/pop storage for values.
func newringBuffer(ctx context.Context, metrics *metrics, configuration *Configuration, size int, dbFileName string, nodeName Node, ringBufferBulkInCh chan []subjectAndMessage, samValueBucket string, indexValueBucket string, errorKernel *errorKernel, processInitial process) *ringBuffer {
r := ringBuffer{}
// Check if socket folder exists, if not create it
if _, err := os.Stat(configuration.DatabaseFolder); os.IsNotExist(err) {
err := os.MkdirAll(configuration.DatabaseFolder, 0770)
if err != nil {
log.Printf("error: failed to create database directory %v: %v\n", configuration.DatabaseFolder, err)
os.Exit(1)
}
}
DatabaseFilepath := filepath.Join(configuration.DatabaseFolder, dbFileName)
// ---
var db *bolt.DB
if configuration.RingBufferPersistStore {
var err error
db, err = bolt.Open(DatabaseFilepath, 0660, nil)
if err != nil {
log.Printf("error: failed to open db: %v\n", err)
os.Exit(1)
}
}
r.bufData = make(chan samDBValue, size)
r.db = db
r.samValueBucket = samValueBucket
r.indexValueBucket = indexValueBucket
r.permStore = make(chan string)
r.nodeName = nodeName
r.ringBufferBulkInCh = ringBufferBulkInCh
r.metrics = metrics
r.configuration = configuration
r.processInitial = processInitial
return &r
}
// start will process incomming messages through the inCh,
// put the messages on a buffered channel
// and deliver messages out when requested on the outCh.
func (r *ringBuffer) start(ctx context.Context, ringBufferInCh chan subjectAndMessage, ringBufferOutCh chan samDBValueAndDelivered) {
// Starting both writing and reading in separate go routines so we
// can write and read concurrently.
r.totalMessagesIndex = 0
if r.configuration.RingBufferPersistStore {
r.totalMessagesIndex = r.getIndexValue()
}
// Fill the buffer when new data arrives into the system
go r.fillBuffer(ctx, ringBufferInCh)
// Start the process to permanently store done messages.
go r.startPermanentStore(ctx)
// Start the process that will handle messages present in the ringbuffer.
go r.processBufferMessages(ctx, ringBufferOutCh)
go func() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if r.configuration.RingBufferPersistStore {
r.dbUpdateMetrics(r.samValueBucket)
}
case <-ctx.Done():
return
}
}
}()
}
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
// It will also store the messages in a K/V DB while being processed.
func (r *ringBuffer) fillBuffer(ctx context.Context, ringBufferInCh chan subjectAndMessage) {
// At startup get all the values that might be in the K/V store so we can
// put them into the buffer before we start to fill up with new incomming
// messages to the system.
// This is needed when the program have been restarted, and we need to check
// if there where previously unhandled messages that need to be handled first.
if r.configuration.RingBufferPersistStore {
func() {
s, err := r.dumpBucket(r.samValueBucket)
if err != nil {
er := fmt.Errorf("info: fillBuffer: retreival of values from k/v store failed, probaly empty database, and no previous entries in db to process: %v", err)
log.Printf("%v\n", er)
return
}
for _, v := range s {
r.bufData <- v
}
}()
}
// Check for incomming messages. These are typically comming from
// the go routine who reads the socket.
for {
select {
case sam := <-ringBufferInCh:
// Check if default message values for timers are set, and if
// not then set default message values.
if sam.Message.ACKTimeout < 1 {
sam.Subject.Event = EventNACK
}
if sam.Message.ACKTimeout >= 1 {
sam.Subject.Event = EventNACK
}
switch {
case sam.Message.Retries < 0:
sam.Message.Retries = r.configuration.DefaultMessageRetries
}
if sam.Message.MethodTimeout < 1 && sam.Message.MethodTimeout != -1 {
sam.Message.MethodTimeout = r.configuration.DefaultMethodTimeout
}
// --- Store the incomming message in the k/v store ---
// Get a unique number for the message to use when storing
// it in the databases, and also use when further processing.
r.mu.Lock()
dbID := r.totalMessagesIndex
r.mu.Unlock()
// Create a structure for JSON marshaling.
samV := samDBValue{
ID: dbID,
SAM: sam,
}
if r.configuration.RingBufferPersistStore {
js, err := json.Marshal(samV)
if err != nil {
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
r.errorKernel.errSend(r.processInitial, Message{}, er, logError)
}
// Store the incomming message in key/value store
err = r.dbUpdate(r.db, r.samValueBucket, strconv.Itoa(dbID), js)
if err != nil {
er := fmt.Errorf("error: dbUpdate samValue failed: %v", err)
r.errorKernel.errSend(r.processInitial, Message{}, er, logError)
}
}
// Put the message on the inmemory buffer.
r.bufData <- samV
// Increment index, and store the new value to the database.
r.mu.Lock()
r.totalMessagesIndex++
if r.configuration.RingBufferPersistStore {
r.dbUpdate(r.db, r.indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex)))
}
r.mu.Unlock()
case <-ctx.Done():
// When done close the buffer channel
close(r.bufData)
return
}
}
}
// processBufferMessages will pick messages from the buffer, and process them
// one by one. The messages will be delivered on the outCh, and it will wait
// until a signal is received on the done channel before it continues with the
// next message.
func (r *ringBuffer) processBufferMessages(ctx context.Context, ringBufferOutCh chan samDBValueAndDelivered) {
// Range over the buffer of messages to pass on to processes.
for {
select {
case samDBv := <-r.bufData:
r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData)))
samDBv.SAM.ID = samDBv.ID
// // Create a done channel per message. A process started by the
// // spawnProcess function will handle incomming messages sequentaly.
// // So in the spawnProcess function we put a struct{} value when a
// // message is processed on the "done" channel and an ack is received
// // for a message, and we wait here for the "done" to be received.
// We start the actual processing of an individual message here within
// it's own go routine. Reason is that we don't want to block other
// messages to be processed while waiting for the done signal, or if an
// error with an individual message occurs.
go func(v samDBValue) {
// Create a copy of the message that we can use to write to the
// perm store without causing a race since the REQ handler for the
// message might not yet be done when message is written to the
// perm store.
// We also need a copy to be able to remove the data from the message
// when writing it to the store, so we don't mess up to actual data
// that might be in use in the handler.
msgForPermStore := Message{}
copier.Copy(&msgForPermStore, v.SAM.Message)
// Remove the content of the data field.
msgForPermStore.Data = nil
v.SAM.Message.done = make(chan struct{})
delivredCh := make(chan struct{})
// Prepare the structure with the data, and a function that can
// be called when the data is received for signaling back.
sd := samDBValueAndDelivered{
samDBValue: v,
delivered: func() {
delivredCh <- struct{}{}
},
}
// Create a ticker that will kick in when a message have been in the
// system for it's maximum time. This will allow us to continue, and
// remove the message if it takes longer than it should to get delivered.
fmt.Printf("DEBUG:::%v\n", v.SAM.ACKTimeout)
if v.SAM.ACKTimeout <= 0 {
v.SAM.ACKTimeout = 1
}
if v.SAM.Retries <= 0 {
v.SAM.Retries = 1
}
ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * time.Second)
defer ticker.Stop()
ringBufferOutCh <- sd
// Just to confirm here that the message was picked up, to know if the
// the read process have stalled or not.
// For now it will not do anything,
select {
case <-delivredCh:
// OK.
case <-time.After(time.Second * 5):
// TODO: Check if more logic should be made here if messages are stuck etc.
// Testing with a timeout here to figure out if messages are stuck
// waiting for done signal.
log.Printf("Error: ringBuffer: message %v seems to be stuck, did not receive delivered signal from reading process\n", v.ID)
r.metrics.promRingbufferStalledMessagesTotal.Inc()
}
// Listen on the done channel here , so a go routine handling the
// message will be able to signal back here that the message have
// been processed, and that we then can delete it out of the K/V Store.
// The publishAMessage method should send a done back here, but in some situations
// it seems that that do not happen. Implementing a ticker that is twice the total
// amount of time a message should be allowed to be using for getting published so
// we don't get stuck go routines here.
//
if r.configuration.RingBufferPersistStore {
select {
case <-v.SAM.done:
fmt.Printf("---\n DONE with\n---\n")
case <-ticker.C:
log.Printf("----------------------------------------------\n")
log.Printf("Error: ringBuffer message id: %v, subject: %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.SAM.ID, v.SAM.Subject)
log.Printf("----------------------------------------------\n")
}
}
// log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
r.metrics.promMessagesProcessedIDLast.Set(float64(v.ID))
// Since we are now done with the specific message we can delete
// it out of the K/V Store.
if r.configuration.RingBufferPersistStore {
r.deleteKeyFromBucket(r.samValueBucket, strconv.Itoa(v.ID))
}
js, err := json.Marshal(msgForPermStore)
if err != nil {
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
r.errorKernel.errSend(r.processInitial, Message{}, er, logError)
}
r.permStore <- time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n"
}(samDBv)
case <-ctx.Done():
return
}
}
}
// dumpBucket will dump out all they keys and values in the
// specified bucket, and return a sorted []samDBValue
func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) {
samDBValues := []samDBValue{}
err := r.db.View(func(tx *bolt.Tx) error {
bu := tx.Bucket([]byte(bucket))
if bu == nil {
return fmt.Errorf("error: ringBuffer.dumpBucket: tx.bucket returned nil")
}
type kv struct {
key []byte
value []byte
}
dbkvs := []kv{}
// Get all the items from the db.
err := bu.ForEach(func(k, v []byte) error {
va := kv{
key: k,
value: v,
}
dbkvs = append(dbkvs, va)
return nil
})
if err != nil {
// Todo: what to return here ?
log.Fatalf("error: ringBuffer: ranging db failed: %v", err)
}
// Range all the values we got from the db, unmarshal each value.
// If the unmarshaling is ok we put it on the samDBValues slice,
// if it fails the value is not of correct format so we we delete
// it from the db, and loop to work on the next value.
for _, dbkv := range dbkvs {
var sdbv samDBValue
err := json.Unmarshal(dbkv.value, &sdbv)
if err != nil {
// If we're unable to unmarshal the value it value of wrong format,
// so we log it, and delete the value from the bucker.
log.Printf("error: ringBuffer.dumpBucket json.Umarshal failed: %v\n", err)
r.deleteKeyFromBucket(r.samValueBucket, string(dbkv.key))
continue
}
samDBValues = append(samDBValues, sdbv)
}
// TODO:
// BoltDB do not automatically shrink in filesize. We should delete the db, and create a new one to shrink the size.
// Sort the order of the slice items based on ID, since they where retreived from a map.
sort.SliceStable(samDBValues, func(i, j int) bool {
return samDBValues[i].ID > samDBValues[j].ID
})
for _, samDBv := range samDBValues {
log.Printf("info: ringBuffer.dumpBucket: k/v store, kvID: %v, message.ID: %v, subject: %v, len(data): %v\n", samDBv.ID, samDBv.SAM.ID, samDBv.SAM.Subject, len(samDBv.SAM.Data))
}
return nil
})
if err != nil {
return nil, err
}
return samDBValues, err
}
// // printBucketContent will print out all they keys and values in the
// // specified bucket.
// func (r *ringBuffer) printBucketContent(bucket string) error {
// err := r.db.View(func(tx *bolt.Tx) error {
// bu := tx.Bucket([]byte(bucket))
//
// bu.ForEach(func(k, v []byte) error {
// var vv samDBValue
// err := json.Unmarshal(v, &vv)
// if err != nil {
// log.Printf("error: printBucketContent json.Umarshal failed: %v\n", err)
// }
// log.Printf("k: %s, v: %v\n", k, vv)
// return nil
// })
//
// return nil
// })
//
// return err
// }
// deleteKeyFromBucket will delete the specified key from the specified
// bucket if it exists.
func (r *ringBuffer) deleteKeyFromBucket(bucket string, key string) error {
err := r.db.Update(func(tx *bolt.Tx) error {
bu := tx.Bucket([]byte(bucket))
err := bu.Delete([]byte(key))
if err != nil {
log.Printf("error: delete key in bucket %v failed: %v\n", bucket, err)
}
return nil
})
return err
}
// db update metrics.
func (r *ringBuffer) dbUpdateMetrics(bucket string) error {
err := r.db.Update(func(tx *bolt.Tx) error {
bu := tx.Bucket([]byte(bucket))
r.metrics.promDBMessagesCurrent.Set(float64(bu.Stats().KeyN))
return nil
})
return err
}
// getIndexValue will get the last index value stored in DB.
func (r *ringBuffer) getIndexValue() int {
const indexKey string = "index"
indexB, err := r.dbView(r.db, r.indexValueBucket, indexKey)
if err != nil {
log.Printf("error: getIndexValue: dbView: %v\n", err)
}
index, err := strconv.Atoi(string(indexB))
if err != nil && string(indexB) == "" {
log.Printf("info: getIndexValue: no index value found, probaly empty database, and no previous entries in db to process : %v\n", err)
}
return index
}
// dbView will look up and return a specific value if it exists for a key in a bucket in a DB.
func (r *ringBuffer) dbView(db *bolt.DB, bucket string, key string) ([]byte, error) {
var value []byte
// View is a help function to get values out of the database.
err := db.View(func(tx *bolt.Tx) error {
//Open a bucket to get key's and values from.
bu := tx.Bucket([]byte(bucket))
if bu == nil {
log.Printf("info: no db bucket exist: %v\n", bucket)
return nil
}
v := bu.Get([]byte(key))
if len(v) == 0 {
log.Printf("info: view: key not found\n")
return nil
}
value = v
return nil
})
return value, err
}
// dbUpdate will update the specified bucket with a key and value.
func (r *ringBuffer) dbUpdate(db *bolt.DB, bucket string, key string, value []byte) error {
err := db.Update(func(tx *bolt.Tx) error {
//Create a bucket
bu, err := tx.CreateBucketIfNotExists([]byte(bucket))
if err != nil {
return fmt.Errorf("error: CreateBuckerIfNotExists failed: %v", err)
}
//Put a value into the bucket.
if err := bu.Put([]byte(key), []byte(value)); err != nil {
return err
}
//If all was ok, we should return a nil for a commit to happen. Any error
// returned will do a rollback.
return nil
})
return err
}
// startPermStore will start the process that will handle writing of
// handled message to a permanent file.
// To store a message in the store, send what to store on the
// ringbuffer.permStore channel.
func (r *ringBuffer) startPermanentStore(ctx context.Context) {
storeFile := filepath.Join(r.configuration.DatabaseFolder, "store.log")
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0660)
if err != nil {
log.Printf("error: startPermanentStore: failed to open file: %v\n", err)
}
defer f.Close()
for {
select {
case d := <-r.permStore:
_, err := f.WriteString(d)
if err != nil {
log.Printf("error:failed to write entry: %v\n", err)
}
case <-ctx.Done():
return
}
}
}