diff --git a/process.go b/process.go index 0bddd8a..be8421d 100644 --- a/process.go +++ b/process.go @@ -413,7 +413,8 @@ func (p process) publishMessages(natsConn *nats.Conn) { case m = <-p.subject.messageCh: case <-p.ctx.Done(): er := fmt.Errorf("info: canceling publisher: %v", p.subject.name()) - sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) + //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) + log.Printf("%v\n", er) return } // Get the process name so we can look up the process in the diff --git a/ringbuffer.go b/ringbuffer.go index 86459e5..e6f80b6 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -216,8 +216,15 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam // Listen on the done channel here , so a go routine handling the // message will be able to signal back here that the message have // been processed, and that we then can delete it out of the K/V Store. - <-v.Data.done - log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) + + select { + case <-v.Data.done: + log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID) + // case <-time.After(time.Second * 3): + // // Testing with a timeout here to figure out if messages are stuck + // // waiting for done signal. + // fmt.Printf(" *** Ingo: message %v seems to be stuck, dropping message\n", v.ID) + } // Since we are now done with the specific message we can delete // it out of the K/V Store. diff --git a/server.go b/server.go index ba66147..c8f9c44 100644 --- a/server.go +++ b/server.go @@ -289,14 +289,14 @@ func (s *server) Start() { // Adding a safety function here so we can make sure that all processes // are stopped after a given time if the context cancelation below hangs. defer func() { - //time.Sleep(time.Second * 5) + time.Sleep(time.Second * 20) log.Printf("error: doing a non graceful shutdown of all processes..\n") os.Exit(1) }() // TODO: The cancelation of all gracefully do not work, so adding a sleep here // to be sure that the defered exit above are run before this cancelFunc. - time.Sleep(time.Second * 2) + time.Sleep(time.Second * 0) s.ctxCancelFunc() fmt.Printf(" *** Done: ctxCancelFunc()\n") @@ -348,6 +348,11 @@ func createErrorMsgContent(FromNode Node, theError error) subjectAndMessage { return sam } +type samDBValueAndDeliveredCh struct { + samDBValue samDBValue + delivered chan struct{} +} + // routeMessagesToProcess takes a database name and an input channel as // it's input arguments. // The database will be used as the persistent store for the work queue diff --git a/startup_processes.go b/startup_processes.go index 6ff3e8c..d9ec6b2 100644 --- a/startup_processes.go +++ b/startup_processes.go @@ -129,8 +129,9 @@ func (s startup) pubREQHello(p process) { select { case <-ticker.C: case <-ctx.Done(): - er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name()) - sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + er := fmt.Errorf("info: stopped handleFunc for: pub%v", proc.subject.name()) + // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) return nil } } @@ -211,8 +212,9 @@ func (s startup) subREQHello(p process) { select { case m = <-proc.procFuncCh: case <-ctx.Done(): - er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name()) - sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + er := fmt.Errorf("info: stopped handleFunc for: sub%v", proc.subject.name()) + // sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) return nil }