mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
retreival and handling of previosly unhandled messages on program restart implemented
This commit is contained in:
parent
c60699bce5
commit
6a81c0e9db
4 changed files with 60 additions and 7 deletions
Binary file not shown.
|
@ -141,7 +141,7 @@ func (s *server) printProcessesMap() {
|
||||||
// given to the system, and route them to the correct subject queue.
|
// given to the system, and route them to the correct subject queue.
|
||||||
func (s *server) handleMessagesInRingbuffer() {
|
func (s *server) handleMessagesInRingbuffer() {
|
||||||
// Prepare and start a new ring buffer
|
// Prepare and start a new ring buffer
|
||||||
const bufferSize int = 100
|
const bufferSize int = 1000
|
||||||
rb := newringBuffer(bufferSize)
|
rb := newringBuffer(bufferSize)
|
||||||
inCh := make(chan subjectAndMessage)
|
inCh := make(chan subjectAndMessage)
|
||||||
outCh := make(chan samDBValue)
|
outCh := make(chan samDBValue)
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
@ -79,6 +80,22 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) {
|
||||||
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
|
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
|
||||||
// It will also store the messages in a K/V DB while being processed.
|
// It will also store the messages in a K/V DB while being processed.
|
||||||
func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket string, indexValueBucket string) {
|
func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket string, indexValueBucket string) {
|
||||||
|
// At startup get all the values that might be in the K/V store so we can
|
||||||
|
// put them into the buffer before we start to fill up with new incomming
|
||||||
|
// messages to the system.
|
||||||
|
// This is needed when the program have been restarted, and we need to check
|
||||||
|
// if there where previously unhandled messages that need to be handled first.
|
||||||
|
func() {
|
||||||
|
s, err := r.dumpBucket(samValueBucket)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error: retreival of values from k/v store failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range s {
|
||||||
|
r.bufData <- v
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Check for incomming messages. These are typically comming from
|
// Check for incomming messages. These are typically comming from
|
||||||
// the go routine who reads inmsg.txt.
|
// the go routine who reads inmsg.txt.
|
||||||
for v := range inCh {
|
for v := range inCh {
|
||||||
|
@ -108,7 +125,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
||||||
log.Printf("error: dbUpdate samValue failed: %v\n", err)
|
log.Printf("error: dbUpdate samValue failed: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the message to some process to consume it.
|
// Put the message on the inmemory buffer.
|
||||||
r.bufData <- samV
|
r.bufData <- samV
|
||||||
|
|
||||||
// Increment index, and store the new value to the database.
|
// Increment index, and store the new value to the database.
|
||||||
|
@ -158,13 +175,13 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
|
||||||
r.deleteKeyFromBucket(samValueBucket, strconv.Itoa(v.ID))
|
r.deleteKeyFromBucket(samValueBucket, strconv.Itoa(v.ID))
|
||||||
fmt.Printf("******* DELETED KEY %v FROM BUCKET*******\n", v.ID)
|
fmt.Printf("******* DELETED KEY %v FROM BUCKET*******\n", v.ID)
|
||||||
|
|
||||||
r.permStore <- fmt.Sprintf("%v : %+v\n", time.Now(), v)
|
r.permStore <- fmt.Sprintf("%v : %+v\n", time.Now().UTC(), v)
|
||||||
|
|
||||||
// TODO: Write the Key/Value we just deleted to a file acting
|
// TODO: Write the Key/Value we just deleted to a file acting
|
||||||
// as the transaction log for the system.
|
// as the transaction log for the system.
|
||||||
|
|
||||||
// REMOVE: Dump the whole KV store
|
// REMOVE: Dump the whole KV store
|
||||||
err := r.dumpBucket(samValueBucket)
|
err := r.printBucketContent(samValueBucket)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("* Error: dump of db failed: %v\n", err)
|
fmt.Printf("* Error: dump of db failed: %v\n", err)
|
||||||
}
|
}
|
||||||
|
@ -175,9 +192,44 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
|
||||||
close(outCh)
|
close(outCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
// dumpBucket will print out all they keys and values in the
|
// dumpBucket will dump out all they keys and values in the
|
||||||
// specified bucker.
|
// specified bucket, and return a sorted []samDBValue
|
||||||
func (r *ringBuffer) dumpBucket(bucket string) error {
|
func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) {
|
||||||
|
samDBValues := []samDBValue{}
|
||||||
|
|
||||||
|
err := r.db.View(func(tx *bolt.Tx) error {
|
||||||
|
bu := tx.Bucket([]byte(bucket))
|
||||||
|
|
||||||
|
// For each element found in the DB, unmarshal, and put on slice.
|
||||||
|
bu.ForEach(func(k, v []byte) error {
|
||||||
|
var vv samDBValue
|
||||||
|
err := json.Unmarshal(v, &vv)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error: dumpBucket json.Umarshal failed: %v\n", err)
|
||||||
|
}
|
||||||
|
samDBValues = append(samDBValues, vv)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
// Sort the order of the slice items based on ID, since they where retreived from a map.
|
||||||
|
sort.SliceStable(samDBValues, func(i, j int) bool {
|
||||||
|
return samDBValues[i].ID > samDBValues[j].ID
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("--------------------------K/V DUMP TO VARIABLE SORTED---------------------------")
|
||||||
|
for _, v := range samDBValues {
|
||||||
|
fmt.Printf("%#v\n", v)
|
||||||
|
}
|
||||||
|
fmt.Println("----------------------------------------------------------------------------------")
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return samDBValues, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// printBuckerContent will print out all they keys and values in the
|
||||||
|
// specified bucket.
|
||||||
|
func (r *ringBuffer) printBucketContent(bucket string) error {
|
||||||
err := r.db.View(func(tx *bolt.Tx) error {
|
err := r.db.View(func(tx *bolt.Tx) error {
|
||||||
bu := tx.Bucket([]byte(bucket))
|
bu := tx.Bucket([]byte(bucket))
|
||||||
|
|
||||||
|
|
|
@ -19,3 +19,4 @@ some message sent from a ship for testing
|
||||||
some message sent from a ship for testing
|
some message sent from a ship for testing
|
||||||
some message sent from a ship for testing
|
some message sent from a ship for testing
|
||||||
some message sent from a ship for testing
|
some message sent from a ship for testing
|
||||||
|
some message sent from a ship for testing
|
||||||
|
|
Loading…
Add table
Reference in a new issue