mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
Updated comments
This commit is contained in:
parent
1f998b3b6a
commit
08bb43a620
1 changed files with 14 additions and 0 deletions
|
@ -54,6 +54,9 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) {
|
|||
// Starting both writing and reading in separate go routines so we
|
||||
// can write and read concurrently.
|
||||
|
||||
// TODO: At startup, check if there are unprocessed messages in
|
||||
// the K/V store, and process them.
|
||||
|
||||
const samValueBucket string = "samValueBucket"
|
||||
const indexValueBucket string = "indexValueBucket"
|
||||
|
||||
|
@ -141,9 +144,14 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
|
|||
fmt.Printf("### DONE WITH THE MESSAGE\n")
|
||||
fmt.Println("-----------------------------------------------------------")
|
||||
|
||||
// Since we are now done with the specific message we can delete
|
||||
// it out of the database.
|
||||
r.deleteKeyFromBucket(samValueBucket, strconv.Itoa(v.ID))
|
||||
fmt.Printf("******* DELETED KEY %v FROM BUCKET*******", v.ID)
|
||||
|
||||
// TODO: Write the Key/Value we just deleted to a file acting
|
||||
// as the transaction log for the system.
|
||||
|
||||
// TODO: Delete the messages here. The SAM handled here, do
|
||||
// not contain the totalMessageID, so we might need to change
|
||||
// the struct we pass around.
|
||||
|
@ -164,6 +172,8 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
|
|||
close(outCh)
|
||||
}
|
||||
|
||||
// dumpBucket will print out all they keys and values in the
|
||||
// specified bucker.
|
||||
func (r *ringBuffer) dumpBucket(bucket string) error {
|
||||
err := r.db.View(func(tx *bolt.Tx) error {
|
||||
bu := tx.Bucket([]byte(bucket))
|
||||
|
@ -185,6 +195,8 @@ func (r *ringBuffer) dumpBucket(bucket string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// deleteKeyFromBucket will delete the specified key from the specified
|
||||
// bucket if it exists.
|
||||
func (r *ringBuffer) deleteKeyFromBucket(bucket string, key string) error {
|
||||
err := r.db.Update(func(tx *bolt.Tx) error {
|
||||
bu := tx.Bucket([]byte(bucket))
|
||||
|
@ -200,6 +212,7 @@ func (r *ringBuffer) deleteKeyFromBucket(bucket string, key string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// getIndexValue will get the last index value stored in DB.
|
||||
func (r *ringBuffer) getIndexValue(indexBucket string) int {
|
||||
const indexKey string = "index"
|
||||
indexB, err := r.dbView(r.db, indexBucket, indexKey)
|
||||
|
@ -217,6 +230,7 @@ func (r *ringBuffer) getIndexValue(indexBucket string) int {
|
|||
return index
|
||||
}
|
||||
|
||||
// dbView will look up a specific value for a key in a bucket in a DB.
|
||||
func (r *ringBuffer) dbView(db *bolt.DB, bucket string, key string) ([]byte, error) {
|
||||
var value []byte
|
||||
//View is a help function to get values out of the database.
|
||||
|
|
Loading…
Reference in a new issue