1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

addeded permanent log storage

This commit is contained in:
postmannen 2021-02-17 10:27:39 +01:00
parent 08bb43a620
commit c60699bce5
5 changed files with 52 additions and 25 deletions

2
.gitignore vendored
View file

@ -1,2 +1,4 @@
ship1/
ship2/
incommmingBuffer.db
store.log

View file

@ -34,7 +34,7 @@ func (s *server) getMessagesFromFile(directoryToCheck string, fileName string, i
// unmarshal the JSON into a struct
js, err := jsonFromFileData(b)
fmt.Printf("*** OUTPUT AFTER UNMARSHALING JSON: %#v\n", js)
// fmt.Printf("*** OUTPUT AFTER UNMARSHALING JSON: %#v\n", js)
if err != nil {
log.Printf("%v\n", err)
}
@ -60,7 +60,7 @@ func jsonFromFileData(b []byte) ([]subjectAndMessage, error) {
MsgSlice := []Message{}
err := json.Unmarshal(b, &MsgSlice)
fmt.Printf("*** OUTPUT DIRECTLY AFTER UNMARSHALING JSON: %#v\n", MsgSlice)
//fmt.Printf("*** OUTPUT DIRECTLY AFTER UNMARSHALING JSON: %#v\n", MsgSlice)
// TODO: Look into also marshaling from yaml and toml later
if err != nil {
return nil, fmt.Errorf("error: unmarshal of file failed: %#v", err)

Binary file not shown.

View file

@ -151,9 +151,6 @@ func (s *server) handleMessagesInRingbuffer() {
// pipe requested by operator, and fill them into the buffer.
go func() {
for samSlice := range s.inputFromFileCh {
fmt.Println("----------------------DEBUG1--------------------------------")
fmt.Printf("DEBUG!!!!!!!!!!!!!!\n")
fmt.Println("----------------------DEBUG1END-----------------------------")
for _, sam := range samSlice {
inCh <- sam
}
@ -186,7 +183,7 @@ func (s *server) handleMessagesInRingbuffer() {
redo:
m := sam.Message
subjName := sam.Subject.name()
fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
// DEBUG: fmt.Printf("** handleNewOperatorMessages: message: %v, ** subject: %#v\n", m, sam.Subject)
_, ok := s.processes[subjName]
if ok {
log.Printf("info: found the specific subject: %v\n", subjName)

View file

@ -13,8 +13,10 @@ import (
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"sync"
"time"
bolt "go.etcd.io/bbolt"
)
@ -33,6 +35,7 @@ type ringBuffer struct {
db *bolt.DB
totalMessagesIndex int
mu sync.Mutex
permStore chan string
}
// newringBuffer is a push/pop storage for values.
@ -42,8 +45,9 @@ func newringBuffer(size int) *ringBuffer {
log.Printf("error: failed to open db: %v\n", err)
}
return &ringBuffer{
bufData: make(chan samDBValue, size),
db: db,
bufData: make(chan samDBValue, size),
db: db,
permStore: make(chan string),
}
}
@ -62,9 +66,13 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) {
r.totalMessagesIndex = r.getIndexValue(indexValueBucket)
// Fill the buffer when new data arrives
// Fill the buffer when new data arrives into the system
go r.fillBuffer(inCh, samValueBucket, indexValueBucket)
// Start the process to permanently store done messages.
go r.startPermanentStore()
// Start the process that will handle messages present in the ringbuffer.
go r.processBufferMessages(samValueBucket, outCh)
}
@ -107,10 +115,9 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
r.mu.Lock()
r.totalMessagesIndex++
fmt.Printf("*** NEXT INDEX NUMBER INCREMENTED: %v\n", r.totalMessagesIndex)
r.mu.Unlock()
fmt.Println("---------------------------------------------------------")
r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(dbID)))
r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex)))
r.mu.Unlock()
}
// When done close the buffer channel
@ -138,29 +145,25 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
v.Data.Message.done = make(chan struct{})
outCh <- v
// ----------TESTING
// Listen on the done channel here , so a go routine handling the
// message will be able to signal back here that the message have
// been processed, and that we then can delete it out of the K/V Store.
<-v.Data.done
fmt.Println("-----------------------------------------------------------")
fmt.Printf("### DONE WITH THE MESSAGE\n")
fmt.Println("-----------------------------------------------------------")
// Since we are now done with the specific message we can delete
// it out of the database.
// it out of the K/V Store.
r.deleteKeyFromBucket(samValueBucket, strconv.Itoa(v.ID))
fmt.Printf("******* DELETED KEY %v FROM BUCKET*******", v.ID)
fmt.Printf("******* DELETED KEY %v FROM BUCKET*******\n", v.ID)
r.permStore <- fmt.Sprintf("%v : %+v\n", time.Now(), v)
// TODO: Write the Key/Value we just deleted to a file acting
// as the transaction log for the system.
// TODO: Delete the messages here. The SAM handled here, do
// not contain the totalMessageID, so we might need to change
// the struct we pass around.
// IDEA: Add a go routine for each message handled here, and include
// a done channel in the structure, so a go routine handling the
// message will be able to signal back here that the message have
// been processed, and that we then can delete it out of the K/V Store.
// Dump the whole KV store
// REMOVE: Dump the whole KV store
err := r.dumpBucket(samValueBucket)
if err != nil {
fmt.Printf("* Error: dump of db failed: %v\n", err)
@ -178,7 +181,7 @@ func (r *ringBuffer) dumpBucket(bucket string) error {
err := r.db.View(func(tx *bolt.Tx) error {
bu := tx.Bucket([]byte(bucket))
fmt.Println("--------------------------DUMP---------------------------")
fmt.Println("--------------------------K/V STORE DUMP---------------------------")
bu.ForEach(func(k, v []byte) error {
var vv samDBValue
err := json.Unmarshal(v, &vv)
@ -188,6 +191,7 @@ func (r *ringBuffer) dumpBucket(bucket string) error {
fmt.Printf("k: %s, v: %v\n", k, vv)
return nil
})
fmt.Println("-------------------------------------------------------------------")
return nil
})
@ -277,3 +281,27 @@ func (r *ringBuffer) dbUpdate(db *bolt.DB, bucket string, key string, value []by
})
return err
}
// startPermStore will start the process that will handle writing of
// handled message to a permanent file.
// To store a message in the store, send what to store on the
// ringbuffer.permStore channel.
func (r *ringBuffer) startPermanentStore() {
const storeFile string = "store.log"
f, err := os.OpenFile(storeFile, os.O_APPEND|os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
log.Printf("Failed to open file %v\n", err)
}
defer f.Close()
for {
d := <-r.permStore
_, err := f.WriteString(d)
if err != nil {
log.Printf("error:failed to write entry: %v\n", err)
}
time.Sleep(time.Second * 1)
}
}