diff --git a/incommmingBuffer.db b/incommmingBuffer.db index e24a958..d62e593 100644 Binary files a/incommmingBuffer.db and b/incommmingBuffer.db differ diff --git a/publisher.go b/publisher.go index 3e454e5..c11869c 100644 --- a/publisher.go +++ b/publisher.go @@ -144,7 +144,7 @@ func (s *server) handleMessagesInRingbuffer() { const bufferSize int = 100 rb := newringBuffer(bufferSize) inCh := make(chan subjectAndMessage) - outCh := make(chan subjectAndMessage) + outCh := make(chan samDBValue) rb.start(inCh, outCh) // Start reading new messages received on the incomming message @@ -165,7 +165,8 @@ func (s *server) handleMessagesInRingbuffer() { // send if there are a specific subject for it, and no subject // exist throw an error. go func() { - for sam := range outCh { + for samTmp := range outCh { + sam := samTmp.Data // Check if the format of the message is correct. // TODO: Send a message to the error kernel here that // it was unable to process the message with the reason diff --git a/ringbuffer.go b/ringbuffer.go index 8dff90a..bcffb13 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -14,6 +14,7 @@ import ( "fmt" "log" "strconv" + "sync" bolt "go.etcd.io/bbolt" ) @@ -28,9 +29,10 @@ type samDBValue struct { // ringBuffer holds the data of the buffer, type ringBuffer struct { - bufData chan subjectAndMessage + bufData chan samDBValue db *bolt.DB totalMessagesIndex int + mu sync.Mutex } // newringBuffer is a push/pop storage for values. @@ -40,7 +42,7 @@ func newringBuffer(size int) *ringBuffer { log.Printf("error: failed to open db: %v\n", err) } return &ringBuffer{ - bufData: make(chan subjectAndMessage, size), + bufData: make(chan samDBValue, size), db: db, } } @@ -48,7 +50,7 @@ func newringBuffer(size int) *ringBuffer { // start will process incomming messages through the inCh, // put the messages on a buffered channel // and deliver messages out when requested on the outCh. -func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMessage) { +func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) { // Starting both writing and reading in separate go routines so we // can write and read concurrently. @@ -71,9 +73,15 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri for v := range inCh { // --- Store the incomming message in the k/v store --- + // Get a unique number for the message to use when storing + // it in the databases, and also use when further processing. + r.mu.Lock() + dbID := r.totalMessagesIndex + r.mu.Unlock() + // Create a structure for JSON marshaling. samV := samDBValue{ - ID: r.totalMessagesIndex, + ID: dbID, Data: v, } @@ -83,20 +91,23 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri } // Store the incomming message in key/value store - err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(r.totalMessagesIndex), js) + err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js) if err != nil { // TODO: Handle error log.Printf("error: dbUpdate samValue failed: %v\n", err) } // Send the message to some process to consume it. - r.bufData <- v + r.bufData <- samV // Increment index, and store the new value to the database. + r.mu.Lock() r.totalMessagesIndex++ fmt.Printf("*** NEXT INDEX NUMBER INCREMENTED: %v\n", r.totalMessagesIndex) + r.mu.Unlock() + fmt.Println("---------------------------------------------------------") - r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex))) + r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(dbID))) } // When done close the buffer channel @@ -107,7 +118,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri // one by one. The messages will be delivered on the outCh, and it will wait // until a signal is received on the done channel before it continues with the // next message. -func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan subjectAndMessage) { +func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan samDBValue) { // Range over the buffer of messages to pass on to processes. for v := range r.bufData { // Create a done channel per message. A process started by the @@ -120,16 +131,19 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sub // it's own go routine. Reason is that we don't want to block other // messages being blocked while waiting for the done signal, or if an // error with an individual message occurs. - go func(v subjectAndMessage) { - v.Message.done = make(chan struct{}) + go func(v samDBValue) { + v.Data.Message.done = make(chan struct{}) outCh <- v // ----------TESTING - <-v.done + <-v.Data.done fmt.Println("-----------------------------------------------------------") fmt.Printf("### DONE WITH THE MESSAGE\n") fmt.Println("-----------------------------------------------------------") + r.deleteKeyFromBucket(samValueBucket, strconv.Itoa(v.ID)) + fmt.Printf("******* DELETED KEY %v FROM BUCKET*******", v.ID) + // TODO: Delete the messages here. The SAM handled here, do // not contain the totalMessageID, so we might need to change // the struct we pass around. @@ -165,12 +179,20 @@ func (r *ringBuffer) dumpBucket(bucket string) error { return nil }) - // c := bu.Cursor() - // - // fmt.Println("--------------------------DUMP---------------------------") - // for k, v := c.First(); k != nil; c.Next() { - // fmt.Printf("k: %s, v: %v\n", k, v) - // } + return nil + }) + + return err +} + +func (r *ringBuffer) deleteKeyFromBucket(bucket string, key string) error { + err := r.db.Update(func(tx *bolt.Tx) error { + bu := tx.Bucket([]byte(bucket)) + + err := bu.Delete([]byte(key)) + if err != nil { + log.Printf("error: delete key in bucket %v failed: %v\n", bucket, err) + } return nil })