From 1523ae84c3e283320092c6a68a48ba4200542717 Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 5 Jul 2021 07:43:33 +0200 Subject: [PATCH] replaced struct{} with callback func for testing --- ringbuffer.go | 9 +++++++-- server.go | 9 ++++++--- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/ringbuffer.go b/ringbuffer.go index a5f1d2b..656ab3f 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -211,10 +211,15 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam // error with an individual message occurs. go func(v samDBValue) { v.Data.Message.done = make(chan struct{}) + delivredCh := make(chan struct{}) + // Prepare the structure with the data, and a function that can + // be called when the data is received for signaling back. sd := samDBValueAndDelivered{ samDBValue: v, - delivered: make(chan struct{}), + delivered: func() { + delivredCh <- struct{}{} + }, } outCh <- sd @@ -222,7 +227,7 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam // the read process have stalled or not. // For now it will not do anything, select { - case <-sd.delivered: + case <-delivredCh: // OK. case <-time.After(time.Second * 5): // Testing with a timeout here to figure out if messages are stuck diff --git a/server.go b/server.go index 43e74de..ec24211 100644 --- a/server.go +++ b/server.go @@ -288,7 +288,7 @@ func (s *server) Start() { // Adding a safety function here so we can make sure that all processes // are stopped after a given time if the context cancelation below hangs. - defer func() { + func() { time.Sleep(time.Second * 0) log.Printf("error: doing a non graceful shutdown of all processes..\n") os.Exit(1) @@ -354,9 +354,12 @@ func createErrorMsgContent(FromNode Node, theError error) subjectAndMessage { return sam } +// Contains the sam value as it is used in the state DB, and also a +// delivered function to be called when this message is picked up, so +// we can control if messages gets stale at some point. type samDBValueAndDelivered struct { samDBValue samDBValue - delivered chan struct{} + delivered func() } // routeMessagesToProcess takes a database name and an input channel as @@ -401,7 +404,7 @@ func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subject go func() { for samTmp := range ringBufferOutCh { - samTmp.delivered <- struct{}{} + samTmp.delivered() sam := samTmp.samDBValue.Data // Check if the format of the message is correct.