diff --git a/incommmingBuffer.db b/incommmingBuffer.db index 2a70a16..0c1eac2 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/publisher.go b/publisher.go index 28465de..7c451a1 100644 --- a/publisher.go +++ b/publisher.go @@ -141,7 +141,7 @@ func (s *server) printProcessesMap() { // given to the system, and route them to the correct subject queue. func (s *server) handleMessagesInRingbuffer() { // Prepare and start a new ring buffer - const bufferSize int = 100 + const bufferSize int = 1000 rb := newringBuffer(bufferSize) inCh := make(chan subjectAndMessage) outCh := make(chan samDBValue) diff --git a/ringbuffer.go b/ringbuffer.go index b6acc3b..ac5a430 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -14,6 +14,7 @@ import ( "fmt" "log" "os" + "sort" "strconv" "sync" "time" @@ -79,6 +80,22 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) { // fillBuffer will fill the buffer in the ringbuffer reading from the inchannel. // It will also store the messages in a K/V DB while being processed. func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket string, indexValueBucket string) { + // At startup get all the values that might be in the K/V store so we can + // put them into the buffer before we start to fill up with new incomming + // messages to the system. + // This is needed when the program have been restarted, and we need to check + // if there where previously unhandled messages that need to be handled first. + func() { + s, err := r.dumpBucket(samValueBucket) + if err != nil { + log.Printf("error: retreival of values from k/v store failed: %v\n", err) + } + + for _, v := range s { + r.bufData <- v + } + }() + // Check for incomming messages. These are typically comming from // the go routine who reads inmsg.txt. for v := range inCh { @@ -108,7 +125,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri log.Printf("error: dbUpdate samValue failed: %v\n", err) } - // Send the message to some process to consume it. + // Put the message on the inmemory buffer. r.bufData <- samV // Increment index, and store the new value to the database. @@ -158,13 +175,13 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam r.deleteKeyFromBucket(samValueBucket, strconv.Itoa(v.ID)) fmt.Printf("******* DELETED KEY %v FROM BUCKET*******\n", v.ID) - r.permStore <- fmt.Sprintf("%v : %+v\n", time.Now(), v) + r.permStore <- fmt.Sprintf("%v : %+v\n", time.Now().UTC(), v) // TODO: Write the Key/Value we just deleted to a file acting // as the transaction log for the system. // REMOVE: Dump the whole KV store - err := r.dumpBucket(samValueBucket) + err := r.printBucketContent(samValueBucket) if err != nil { fmt.Printf("* Error: dump of db failed: %v\n", err) } @@ -175,9 +192,44 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam close(outCh) } -// dumpBucket will print out all they keys and values in the -// specified bucker. -func (r *ringBuffer) dumpBucket(bucket string) error { +// dumpBucket will dump out all they keys and values in the +// specified bucket, and return a sorted []samDBValue +func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) { + samDBValues := []samDBValue{} + + err := r.db.View(func(tx *bolt.Tx) error { + bu := tx.Bucket([]byte(bucket)) + + // For each element found in the DB, unmarshal, and put on slice. + bu.ForEach(func(k, v []byte) error { + var vv samDBValue + err := json.Unmarshal(v, &vv) + if err != nil { + log.Printf("error: dumpBucket json.Umarshal failed: %v\n", err) + } + samDBValues = append(samDBValues, vv) + return nil + }) + + // Sort the order of the slice items based on ID, since they where retreived from a map. + sort.SliceStable(samDBValues, func(i, j int) bool { + return samDBValues[i].ID > samDBValues[j].ID + }) + + fmt.Println("--------------------------K/V DUMP TO VARIABLE SORTED---------------------------") + for _, v := range samDBValues { + fmt.Printf("%#v\n", v) + } + fmt.Println("----------------------------------------------------------------------------------") + return nil + }) + + return samDBValues, err +} + +// printBuckerContent will print out all they keys and values in the +// specified bucket. +func (r *ringBuffer) printBucketContent(bucket string) error { err := r.db.View(func(tx *bolt.Tx) error { bu := tx.Bucket([]byte(bucket)) diff --git a/textlogging.log b/textlogging.log index 6365e5e..38dfdd3 100644 --- a/textlogging.log +++ b/textlogging.log @@ -19,3 +19,4 @@ some message sent from a ship for testing some message sent from a ship for testing some message sent from a ship for testing some message sent from a ship for testing +some message sent from a ship for testing