mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-05 23:06:47 +00:00
kv store for index and messages pu in
This commit is contained in:
parent
993f89e2c6
commit
3cb0c9f5c9
6 changed files with 58 additions and 64 deletions
|
@ -6,8 +6,6 @@
|
||||||
|
|
||||||
package steward
|
package steward
|
||||||
|
|
||||||
import "fmt"
|
|
||||||
|
|
||||||
// CommandOrEvent describes on the message level if this is
|
// CommandOrEvent describes on the message level if this is
|
||||||
// an event or command kind of message in the Subject name.
|
// an event or command kind of message in the Subject name.
|
||||||
// This field is mainly used to be able to spawn up different
|
// This field is mainly used to be able to spawn up different
|
||||||
|
@ -54,10 +52,10 @@ type CommandOrEventAvailable struct {
|
||||||
func (co CommandOrEventAvailable) CheckIfExists(c CommandOrEvent) bool {
|
func (co CommandOrEventAvailable) CheckIfExists(c CommandOrEvent) bool {
|
||||||
_, ok := co.topics[c]
|
_, ok := co.topics[c]
|
||||||
if ok {
|
if ok {
|
||||||
fmt.Printf("******THE TOPIC EXISTS: %v******\n", c)
|
// fmt.Printf("******THE TOPIC EXISTS: %v******\n", c)
|
||||||
return true
|
return true
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", c)
|
// fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", c)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,7 +136,7 @@ func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) {
|
||||||
select {
|
select {
|
||||||
case event := <-watcher.Events:
|
case event := <-watcher.Events:
|
||||||
if event.Op&fsnotify.Write == fsnotify.Write {
|
if event.Op&fsnotify.Write == fsnotify.Write {
|
||||||
log.Println("info: inmsg.txt file updated, processing input: ", event.Name)
|
// log.Println("info: inmsg.txt file updated, processing input: ", event.Name)
|
||||||
//testing with an update chan to get updates
|
//testing with an update chan to get updates
|
||||||
fileUpdated <- true
|
fileUpdated <- true
|
||||||
}
|
}
|
||||||
|
|
Binary file not shown.
|
@ -46,10 +46,10 @@ type MethodsAvailable struct {
|
||||||
func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
|
func (ma MethodsAvailable) CheckIfExists(m Method) (methodHandler, bool) {
|
||||||
mFunc, ok := ma.topics[m]
|
mFunc, ok := ma.topics[m]
|
||||||
if ok {
|
if ok {
|
||||||
fmt.Printf("******THE TOPIC EXISTS: %v******\n", m)
|
// fmt.Printf("******THE TOPIC EXISTS: %v******\n", m)
|
||||||
return mFunc, true
|
return mFunc, true
|
||||||
} else {
|
} else {
|
||||||
fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", m)
|
// fmt.Printf("******THE TOPIC DO NOT EXIST: %v******\n", m)
|
||||||
return nil, false
|
return nil, false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -147,11 +147,8 @@ func (s *server) handleMessagesToPublish() {
|
||||||
// pipe requested by operator, and fill them into the buffer.
|
// pipe requested by operator, and fill them into the buffer.
|
||||||
go func() {
|
go func() {
|
||||||
for samSlice := range s.inputFromFileCh {
|
for samSlice := range s.inputFromFileCh {
|
||||||
fmt.Println("***** DEBUG ranging samSlice")
|
|
||||||
for _, sam := range samSlice {
|
for _, sam := range samSlice {
|
||||||
fmt.Println("***** DEBUG putting on channel")
|
|
||||||
inCh <- sam
|
inCh <- sam
|
||||||
fmt.Println("***** DEBUG done putting on channel")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
close(inCh)
|
close(inCh)
|
||||||
|
|
103
ringbuffer.go
103
ringbuffer.go
|
@ -10,8 +10,7 @@
|
||||||
package steward
|
package steward
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"encoding/json"
|
||||||
"encoding/gob"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -31,13 +30,13 @@ type samDBValue struct {
|
||||||
type ringBuffer struct {
|
type ringBuffer struct {
|
||||||
bufData chan subjectAndMessage
|
bufData chan subjectAndMessage
|
||||||
db *bolt.DB
|
db *bolt.DB
|
||||||
|
totalMessagesIndex int
|
||||||
}
|
}
|
||||||
|
|
||||||
// newringBuffer is a push/pop storage for values.
|
// newringBuffer is a push/pop storage for values.
|
||||||
func newringBuffer(size int) *ringBuffer {
|
func newringBuffer(size int) *ringBuffer {
|
||||||
db, err := bolt.Open("./incommmingBuffer.db", 0600, nil)
|
db, err := bolt.Open("./incommmingBuffer.db", 0600, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: error handling
|
|
||||||
log.Printf("error: failed to open db: %v\n", err)
|
log.Printf("error: failed to open db: %v\n", err)
|
||||||
}
|
}
|
||||||
return &ringBuffer{
|
return &ringBuffer{
|
||||||
|
@ -53,49 +52,44 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMes
|
||||||
// Starting both writing and reading in separate go routines so we
|
// Starting both writing and reading in separate go routines so we
|
||||||
// can write and read concurrently.
|
// can write and read concurrently.
|
||||||
|
|
||||||
const samValueBucket string = "samValues"
|
const samValueBucket string = "samValueBucket"
|
||||||
|
const indexValueBucket string = "indexValueBucket"
|
||||||
|
|
||||||
i := 0
|
r.totalMessagesIndex = r.getIndexValue(indexValueBucket)
|
||||||
|
|
||||||
// Fill the buffer when new data arrives
|
// Fill the buffer when new data arrives
|
||||||
go func() {
|
go func() {
|
||||||
|
// Check for incomming messages. These are typically comming from
|
||||||
|
// the go routine who reads inmsg.txt.
|
||||||
for v := range inCh {
|
for v := range inCh {
|
||||||
r.bufData <- v
|
// --- Store the incomming message in the k/v store ---
|
||||||
fmt.Printf("**BUFFER** DEBUG PUSHED ON BUFFER: value = %v\n\n", v)
|
|
||||||
|
|
||||||
iv := strconv.Itoa(i)
|
// Create a structure for JSON marshaling.
|
||||||
samV := samDBValue{
|
samV := samDBValue{
|
||||||
ID: i,
|
ID: r.totalMessagesIndex,
|
||||||
Data: v,
|
Data: v,
|
||||||
}
|
}
|
||||||
|
|
||||||
svGob, err := gobEncodeSamValue(samV)
|
js, err := json.Marshal(samV)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error: gob encoding samValue: %v\n", err)
|
log.Printf("error: gob encoding samValue: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also store the incomming message in key/value store
|
// Store the incomming message in key/value store
|
||||||
err = r.dbUpdate(r.db, samValueBucket, iv, svGob)
|
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(r.totalMessagesIndex), js)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: Handle error
|
// TODO: Handle error
|
||||||
log.Printf("error: dbUpdate samValue failed: %v\n", err)
|
log.Printf("error: dbUpdate samValue failed: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
retreivedGob, err := r.dbView(r.db, samValueBucket, iv)
|
// Send the message to some process to consume it.
|
||||||
if err != nil {
|
r.bufData <- v
|
||||||
// TODO: Handle error
|
|
||||||
log.Printf("error: dbView retreival samValue failed: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
retreived, err := gobDecodeSamValue(retreivedGob)
|
// Increment index, and store the new value to the database.
|
||||||
if err != nil {
|
r.totalMessagesIndex++
|
||||||
// TODO: Handle error
|
fmt.Printf("*** NEXT INDEX NUMBER INCREMENTED: %v\n", r.totalMessagesIndex)
|
||||||
log.Printf("error: dbView gobDecode retreival samValue failed: %v\n", err)
|
fmt.Println("---------------------------------------------------------")
|
||||||
}
|
r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex)))
|
||||||
|
|
||||||
fmt.Printf("*************** INFO: dbView, key: %v, got value: %v\n ", iv, retreived)
|
|
||||||
|
|
||||||
i++
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// When done close the buffer channel
|
// When done close the buffer channel
|
||||||
|
@ -106,22 +100,52 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMes
|
||||||
go func() {
|
go func() {
|
||||||
for v := range r.bufData {
|
for v := range r.bufData {
|
||||||
outCh <- v
|
outCh <- v
|
||||||
|
|
||||||
|
// TODO: Delete the messages here. The SAM handled here, do
|
||||||
|
// not contain the totalMessageID, so we might need to change
|
||||||
|
// the struct we pass around.
|
||||||
|
// IDEA: Add a go routine for each message handled here, and include
|
||||||
|
// a done channel in the structure, so a go routine handling the
|
||||||
|
// message will be able to signal back here that the message have
|
||||||
|
// been processed, and that we then can delete it out of the K/V Store.
|
||||||
}
|
}
|
||||||
|
|
||||||
close(outCh)
|
close(outCh)
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ringBuffer) getIndexValue(indexBucket string) int {
|
||||||
|
const indexKey string = "index"
|
||||||
|
indexB, err := r.dbView(r.db, indexBucket, indexKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error: getIndexValue: dbView: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
index, err := strconv.Atoi(string(indexB))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error: getIndexValue: strconv.Atoi : %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("**** RETURNING INDEX, WITH VALUE = %v\n", index)
|
||||||
|
|
||||||
|
return index
|
||||||
|
}
|
||||||
|
|
||||||
func (r *ringBuffer) dbView(db *bolt.DB, bucket string, key string) ([]byte, error) {
|
func (r *ringBuffer) dbView(db *bolt.DB, bucket string, key string) ([]byte, error) {
|
||||||
var value []byte
|
var value []byte
|
||||||
//View is a help function to get values out of the database.
|
//View is a help function to get values out of the database.
|
||||||
err := db.View(func(tx *bolt.Tx) error {
|
err := db.View(func(tx *bolt.Tx) error {
|
||||||
//Open a bucket to get key's and values from.
|
//Open a bucket to get key's and values from.
|
||||||
bu := tx.Bucket([]byte(bucket))
|
bu := tx.Bucket([]byte(bucket))
|
||||||
|
if bu == nil {
|
||||||
|
log.Printf("info: no such bucket exist: %v\n", bucket)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
v := bu.Get([]byte(key))
|
v := bu.Get([]byte(key))
|
||||||
if len(v) == 0 {
|
if len(v) == 0 {
|
||||||
return fmt.Errorf("info: view: key not found")
|
log.Printf("info: view: key not found\n")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
value = v
|
value = v
|
||||||
|
@ -153,28 +177,3 @@ func (r *ringBuffer) dbUpdate(db *bolt.DB, bucket string, key string, value []by
|
||||||
})
|
})
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func gobEncodeSamValue(samValue samDBValue) ([]byte, error) {
|
|
||||||
var buf bytes.Buffer
|
|
||||||
gobEnc := gob.NewEncoder(&buf)
|
|
||||||
err := gobEnc.Encode(samValue)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error: gob.Encode failed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return buf.Bytes(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func gobDecodeSamValue(b []byte) (samDBValue, error) {
|
|
||||||
sv := samDBValue{}
|
|
||||||
|
|
||||||
buf := bytes.NewBuffer(b)
|
|
||||||
gobDec := gob.NewDecoder(buf)
|
|
||||||
err := gobDec.Decode(&sv)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("error: gob decoding failed: %v\n", err)
|
|
||||||
return samDBValue{}, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return sv, nil
|
|
||||||
}
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue