mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
changed types used for ringbuffer, and fixed mutex on message index
This commit is contained in:
parent
a5b8df9ea6
commit
1f998b3b6a
3 changed files with 42 additions and 19 deletions
Binary file not shown.
|
@ -144,7 +144,7 @@ func (s *server) handleMessagesInRingbuffer() {
|
||||||
const bufferSize int = 100
|
const bufferSize int = 100
|
||||||
rb := newringBuffer(bufferSize)
|
rb := newringBuffer(bufferSize)
|
||||||
inCh := make(chan subjectAndMessage)
|
inCh := make(chan subjectAndMessage)
|
||||||
outCh := make(chan subjectAndMessage)
|
outCh := make(chan samDBValue)
|
||||||
rb.start(inCh, outCh)
|
rb.start(inCh, outCh)
|
||||||
|
|
||||||
// Start reading new messages received on the incomming message
|
// Start reading new messages received on the incomming message
|
||||||
|
@ -165,7 +165,8 @@ func (s *server) handleMessagesInRingbuffer() {
|
||||||
// send if there are a specific subject for it, and no subject
|
// send if there are a specific subject for it, and no subject
|
||||||
// exist throw an error.
|
// exist throw an error.
|
||||||
go func() {
|
go func() {
|
||||||
for sam := range outCh {
|
for samTmp := range outCh {
|
||||||
|
sam := samTmp.Data
|
||||||
// Check if the format of the message is correct.
|
// Check if the format of the message is correct.
|
||||||
// TODO: Send a message to the error kernel here that
|
// TODO: Send a message to the error kernel here that
|
||||||
// it was unable to process the message with the reason
|
// it was unable to process the message with the reason
|
||||||
|
|
|
@ -14,6 +14,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"sync"
|
||||||
|
|
||||||
bolt "go.etcd.io/bbolt"
|
bolt "go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
@ -28,9 +29,10 @@ type samDBValue struct {
|
||||||
|
|
||||||
// ringBuffer holds the data of the buffer,
|
// ringBuffer holds the data of the buffer,
|
||||||
type ringBuffer struct {
|
type ringBuffer struct {
|
||||||
bufData chan subjectAndMessage
|
bufData chan samDBValue
|
||||||
db *bolt.DB
|
db *bolt.DB
|
||||||
totalMessagesIndex int
|
totalMessagesIndex int
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// newringBuffer is a push/pop storage for values.
|
// newringBuffer is a push/pop storage for values.
|
||||||
|
@ -40,7 +42,7 @@ func newringBuffer(size int) *ringBuffer {
|
||||||
log.Printf("error: failed to open db: %v\n", err)
|
log.Printf("error: failed to open db: %v\n", err)
|
||||||
}
|
}
|
||||||
return &ringBuffer{
|
return &ringBuffer{
|
||||||
bufData: make(chan subjectAndMessage, size),
|
bufData: make(chan samDBValue, size),
|
||||||
db: db,
|
db: db,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -48,7 +50,7 @@ func newringBuffer(size int) *ringBuffer {
|
||||||
// start will process incomming messages through the inCh,
|
// start will process incomming messages through the inCh,
|
||||||
// put the messages on a buffered channel
|
// put the messages on a buffered channel
|
||||||
// and deliver messages out when requested on the outCh.
|
// and deliver messages out when requested on the outCh.
|
||||||
func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMessage) {
|
func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan samDBValue) {
|
||||||
// Starting both writing and reading in separate go routines so we
|
// Starting both writing and reading in separate go routines so we
|
||||||
// can write and read concurrently.
|
// can write and read concurrently.
|
||||||
|
|
||||||
|
@ -71,9 +73,15 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
||||||
for v := range inCh {
|
for v := range inCh {
|
||||||
// --- Store the incomming message in the k/v store ---
|
// --- Store the incomming message in the k/v store ---
|
||||||
|
|
||||||
|
// Get a unique number for the message to use when storing
|
||||||
|
// it in the databases, and also use when further processing.
|
||||||
|
r.mu.Lock()
|
||||||
|
dbID := r.totalMessagesIndex
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
// Create a structure for JSON marshaling.
|
// Create a structure for JSON marshaling.
|
||||||
samV := samDBValue{
|
samV := samDBValue{
|
||||||
ID: r.totalMessagesIndex,
|
ID: dbID,
|
||||||
Data: v,
|
Data: v,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -83,20 +91,23 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the incomming message in key/value store
|
// Store the incomming message in key/value store
|
||||||
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(r.totalMessagesIndex), js)
|
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(dbID), js)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: Handle error
|
// TODO: Handle error
|
||||||
log.Printf("error: dbUpdate samValue failed: %v\n", err)
|
log.Printf("error: dbUpdate samValue failed: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the message to some process to consume it.
|
// Send the message to some process to consume it.
|
||||||
r.bufData <- v
|
r.bufData <- samV
|
||||||
|
|
||||||
// Increment index, and store the new value to the database.
|
// Increment index, and store the new value to the database.
|
||||||
|
r.mu.Lock()
|
||||||
r.totalMessagesIndex++
|
r.totalMessagesIndex++
|
||||||
fmt.Printf("*** NEXT INDEX NUMBER INCREMENTED: %v\n", r.totalMessagesIndex)
|
fmt.Printf("*** NEXT INDEX NUMBER INCREMENTED: %v\n", r.totalMessagesIndex)
|
||||||
|
r.mu.Unlock()
|
||||||
|
|
||||||
fmt.Println("---------------------------------------------------------")
|
fmt.Println("---------------------------------------------------------")
|
||||||
r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex)))
|
r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(dbID)))
|
||||||
}
|
}
|
||||||
|
|
||||||
// When done close the buffer channel
|
// When done close the buffer channel
|
||||||
|
@ -107,7 +118,7 @@ func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket stri
|
||||||
// one by one. The messages will be delivered on the outCh, and it will wait
|
// one by one. The messages will be delivered on the outCh, and it will wait
|
||||||
// until a signal is received on the done channel before it continues with the
|
// until a signal is received on the done channel before it continues with the
|
||||||
// next message.
|
// next message.
|
||||||
func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan subjectAndMessage) {
|
func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan samDBValue) {
|
||||||
// Range over the buffer of messages to pass on to processes.
|
// Range over the buffer of messages to pass on to processes.
|
||||||
for v := range r.bufData {
|
for v := range r.bufData {
|
||||||
// Create a done channel per message. A process started by the
|
// Create a done channel per message. A process started by the
|
||||||
|
@ -120,16 +131,19 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sub
|
||||||
// it's own go routine. Reason is that we don't want to block other
|
// it's own go routine. Reason is that we don't want to block other
|
||||||
// messages being blocked while waiting for the done signal, or if an
|
// messages being blocked while waiting for the done signal, or if an
|
||||||
// error with an individual message occurs.
|
// error with an individual message occurs.
|
||||||
go func(v subjectAndMessage) {
|
go func(v samDBValue) {
|
||||||
v.Message.done = make(chan struct{})
|
v.Data.Message.done = make(chan struct{})
|
||||||
outCh <- v
|
outCh <- v
|
||||||
|
|
||||||
// ----------TESTING
|
// ----------TESTING
|
||||||
<-v.done
|
<-v.Data.done
|
||||||
fmt.Println("-----------------------------------------------------------")
|
fmt.Println("-----------------------------------------------------------")
|
||||||
fmt.Printf("### DONE WITH THE MESSAGE\n")
|
fmt.Printf("### DONE WITH THE MESSAGE\n")
|
||||||
fmt.Println("-----------------------------------------------------------")
|
fmt.Println("-----------------------------------------------------------")
|
||||||
|
|
||||||
|
r.deleteKeyFromBucket(samValueBucket, strconv.Itoa(v.ID))
|
||||||
|
fmt.Printf("******* DELETED KEY %v FROM BUCKET*******", v.ID)
|
||||||
|
|
||||||
// TODO: Delete the messages here. The SAM handled here, do
|
// TODO: Delete the messages here. The SAM handled here, do
|
||||||
// not contain the totalMessageID, so we might need to change
|
// not contain the totalMessageID, so we might need to change
|
||||||
// the struct we pass around.
|
// the struct we pass around.
|
||||||
|
@ -165,12 +179,20 @@ func (r *ringBuffer) dumpBucket(bucket string) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
||||||
// c := bu.Cursor()
|
return nil
|
||||||
//
|
})
|
||||||
// fmt.Println("--------------------------DUMP---------------------------")
|
|
||||||
// for k, v := c.First(); k != nil; c.Next() {
|
return err
|
||||||
// fmt.Printf("k: %s, v: %v\n", k, v)
|
}
|
||||||
// }
|
|
||||||
|
func (r *ringBuffer) deleteKeyFromBucket(bucket string, key string) error {
|
||||||
|
err := r.db.Update(func(tx *bolt.Tx) error {
|
||||||
|
bu := tx.Bucket([]byte(bucket))
|
||||||
|
|
||||||
|
err := bu.Delete([]byte(key))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error: delete key in bucket %v failed: %v\n", bucket, err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in a new issue