From 6f525162ea077151a093d630417035565b90311a Mon Sep 17 00:00:00 2001 From: postmannen Date: Sat, 31 Dec 2022 07:51:34 +0100 Subject: [PATCH] renamed data to sam in ringbuffer --- ringbuffer.go | 28 ++++++++++++++-------------- server.go | 2 +- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/ringbuffer.go b/ringbuffer.go index fb8401a..a2e0251 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -28,8 +28,8 @@ import ( // struct type is used when storing and retreiving from // db. type samDBValue struct { - ID int - Data subjectAndMessage + ID int + SAM subjectAndMessage } // ringBuffer holds the data of the buffer, @@ -208,8 +208,8 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage // Create a structure for JSON marshaling. samV := samDBValue{ - ID: dbID, - Data: v, + ID: dbID, + SAM: v, } if r.configuration.RingBufferPersistStore { @@ -254,9 +254,9 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB // Range over the buffer of messages to pass on to processes. for { select { - case v := <-r.bufData: + case samDBv := <-r.bufData: r.metrics.promInMemoryBufferMessagesCurrent.Set(float64(len(r.bufData))) - v.Data.ID = v.ID + samDBv.SAM.ID = samDBv.ID // Create a done channel per message. A process started by the // spawnProcess function will handle incomming messages sequentaly. @@ -278,11 +278,11 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB // that might be in use in the handler. msgForPermStore := Message{} - copier.Copy(&msgForPermStore, v.Data.Message) + copier.Copy(&msgForPermStore, v.SAM.Message) // Remove the content of the data field. msgForPermStore.Data = nil - v.Data.Message.done = make(chan struct{}) + v.SAM.Message.done = make(chan struct{}) delivredCh := make(chan struct{}) // Prepare the structure with the data, and a function that can @@ -294,7 +294,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB }, } - ticker := time.NewTicker(time.Duration(v.Data.ACKTimeout) * time.Duration(v.Data.Retries) * 2 * time.Second) + ticker := time.NewTicker(time.Duration(v.SAM.ACKTimeout) * time.Duration(v.SAM.Retries) * 2 * time.Second) defer ticker.Stop() outCh <- sd @@ -323,10 +323,10 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB // // TODO: Figure out why what the reason for not receceving the done signals might be. select { - case <-v.Data.done: + case <-v.SAM.done: case <-ticker.C: log.Printf("----------------------------------------------\n") - log.Printf("Error: ringBuffer message %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.Data.Subject) + log.Printf("Error: ringBuffer message %v seems to be stuck, did not receive done signal from publishAMessage process, exited on ticker\n", v.SAM.Subject) log.Printf("----------------------------------------------\n") } // log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) @@ -345,7 +345,7 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB } r.permStore <- time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n" - }(v) + }(samDBv) case <-ctx.Done(): //close(outCh) return @@ -414,8 +414,8 @@ func (r *ringBuffer) dumpBucket(bucket string) ([]samDBValue, error) { return samDBValues[i].ID > samDBValues[j].ID }) - for _, v := range samDBValues { - log.Printf("info: ringBuffer.dumpBucket: k/v store, kvID: %v, message.ID: %v, subject: %v, len(data): %v\n", v.ID, v.Data.ID, v.Data.Subject, len(v.Data.Data)) + for _, samDBv := range samDBValues { + log.Printf("info: ringBuffer.dumpBucket: k/v store, kvID: %v, message.ID: %v, subject: %v, len(data): %v\n", samDBv.ID, samDBv.SAM.ID, samDBv.SAM.Subject, len(samDBv.SAM.Data)) } return nil diff --git a/server.go b/server.go index 39781be..54d9522 100644 --- a/server.go +++ b/server.go @@ -447,7 +447,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // Signal back to the ringbuffer that message have been picked up. samDBVal.delivered() - sam := samDBVal.samDBValue.Data + sam := samDBVal.samDBValue.SAM // Check if the format of the message is correct. if _, ok := methodsAvailable.CheckIfExists(sam.Message.Method); !ok { er := fmt.Errorf("error: routeMessagesToProcess: the method do not exist, message dropped: %v", sam.Message.Method)