1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

split the message processing into it's own functions, and handle each message concurrently within the buffer

This commit is contained in:
postmannen 2021-02-16 12:59:37 +01:00
parent f7cbbe3115
commit a5b8df9ea6
2 changed files with 59 additions and 44 deletions

Binary file not shown.

View file

@ -58,54 +58,69 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMes
r.totalMessagesIndex = r.getIndexValue(indexValueBucket)
// Fill the buffer when new data arrives
go func() {
// Check for incomming messages. These are typically comming from
// the go routine who reads inmsg.txt.
for v := range inCh {
// --- Store the incomming message in the k/v store ---
go r.fillBuffer(inCh, samValueBucket, indexValueBucket)
// Create a structure for JSON marshaling.
samV := samDBValue{
ID: r.totalMessagesIndex,
Data: v,
}
go r.processBufferMessages(samValueBucket, outCh)
}
js, err := json.Marshal(samV)
if err != nil {
log.Printf("error: gob encoding samValue: %v\n", err)
}
// fillBuffer will fill the buffer in the ringbuffer reading from the inchannel.
// It will also store the messages in a K/V DB while being processed.
func (r *ringBuffer) fillBuffer(inCh chan subjectAndMessage, samValueBucket string, indexValueBucket string) {
// Check for incomming messages. These are typically comming from
// the go routine who reads inmsg.txt.
for v := range inCh {
// --- Store the incomming message in the k/v store ---
// Store the incomming message in key/value store
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(r.totalMessagesIndex), js)
if err != nil {
// TODO: Handle error
log.Printf("error: dbUpdate samValue failed: %v\n", err)
}
// Send the message to some process to consume it.
r.bufData <- v
// Increment index, and store the new value to the database.
r.totalMessagesIndex++
fmt.Printf("*** NEXT INDEX NUMBER INCREMENTED: %v\n", r.totalMessagesIndex)
fmt.Println("---------------------------------------------------------")
r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex)))
// Create a structure for JSON marshaling.
samV := samDBValue{
ID: r.totalMessagesIndex,
Data: v,
}
// When done close the buffer channel
close(r.bufData)
}()
js, err := json.Marshal(samV)
if err != nil {
log.Printf("error: gob encoding samValue: %v\n", err)
}
// Empty the buffer when data asked for
go func() {
// Range over the buffer of messages to pass on to processes.
for v := range r.bufData {
// Create a done channel per message. A process started by the
// spawnProcess function will handle incomming messages sequentaly.
// So in the spawnProcess function we put a struct{} value when a
// message is processed on the "done" channel and an ack is received
// for a message, and we wait here for the "done" to be received.
// Store the incomming message in key/value store
err = r.dbUpdate(r.db, samValueBucket, strconv.Itoa(r.totalMessagesIndex), js)
if err != nil {
// TODO: Handle error
log.Printf("error: dbUpdate samValue failed: %v\n", err)
}
// Send the message to some process to consume it.
r.bufData <- v
// Increment index, and store the new value to the database.
r.totalMessagesIndex++
fmt.Printf("*** NEXT INDEX NUMBER INCREMENTED: %v\n", r.totalMessagesIndex)
fmt.Println("---------------------------------------------------------")
r.dbUpdate(r.db, indexValueBucket, "index", []byte(strconv.Itoa(r.totalMessagesIndex)))
}
// When done close the buffer channel
close(r.bufData)
}
// processBufferMessages will pick messages from the buffer, and process them
// one by one. The messages will be delivered on the outCh, and it will wait
// until a signal is received on the done channel before it continues with the
// next message.
func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan subjectAndMessage) {
// Range over the buffer of messages to pass on to processes.
for v := range r.bufData {
// Create a done channel per message. A process started by the
// spawnProcess function will handle incomming messages sequentaly.
// So in the spawnProcess function we put a struct{} value when a
// message is processed on the "done" channel and an ack is received
// for a message, and we wait here for the "done" to be received.
// We start the actual processing of an individual message here within
// it's own go routine. Reason is that we don't want to block other
// messages being blocked while waiting for the done signal, or if an
// error with an individual message occurs.
go func(v subjectAndMessage) {
v.Message.done = make(chan struct{})
outCh <- v
@ -128,11 +143,11 @@ func (r *ringBuffer) start(inCh chan subjectAndMessage, outCh chan subjectAndMes
if err != nil {
fmt.Printf("* Error: dump of db failed: %v\n", err)
}
}(v)
}
}
close(outCh)
}()
close(outCh)
}
func (r *ringBuffer) dumpBucket(bucket string) error {