1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

Removed some errors via messages

This commit is contained in:
postmannen 2021-07-02 18:32:01 +02:00
parent f4c501d3c5
commit 7791899e3d
4 changed files with 24 additions and 9 deletions

View file

@ -413,7 +413,8 @@ func (p process) publishMessages(natsConn *nats.Conn) {
case m = <-p.subject.messageCh:
case <-p.ctx.Done():
er := fmt.Errorf("info: canceling publisher: %v", p.subject.name())
sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
log.Printf("%v\n", er)
return
}
// Get the process name so we can look up the process in the

View file

@ -216,8 +216,15 @@ func (r *ringBuffer) processBufferMessages(samValueBucket string, outCh chan sam
// Listen on the done channel here , so a go routine handling the
// message will be able to signal back here that the message have
// been processed, and that we then can delete it out of the K/V Store.
<-v.Data.done
select {
case <-v.Data.done:
log.Printf("info: processBufferMessages: done with message, deleting key from bucket, %v\n", v.ID)
// case <-time.After(time.Second * 3):
// // Testing with a timeout here to figure out if messages are stuck
// // waiting for done signal.
// fmt.Printf(" *** Ingo: message %v seems to be stuck, dropping message\n", v.ID)
}
// Since we are now done with the specific message we can delete
// it out of the K/V Store.

View file

@ -289,14 +289,14 @@ func (s *server) Start() {
// Adding a safety function here so we can make sure that all processes
// are stopped after a given time if the context cancelation below hangs.
defer func() {
//time.Sleep(time.Second * 5)
time.Sleep(time.Second * 20)
log.Printf("error: doing a non graceful shutdown of all processes..\n")
os.Exit(1)
}()
// TODO: The cancelation of all gracefully do not work, so adding a sleep here
// to be sure that the defered exit above are run before this cancelFunc.
time.Sleep(time.Second * 2)
time.Sleep(time.Second * 0)
s.ctxCancelFunc()
fmt.Printf(" *** Done: ctxCancelFunc()\n")
@ -348,6 +348,11 @@ func createErrorMsgContent(FromNode Node, theError error) subjectAndMessage {
return sam
}
type samDBValueAndDeliveredCh struct {
samDBValue samDBValue
delivered chan struct{}
}
// routeMessagesToProcess takes a database name and an input channel as
// it's input arguments.
// The database will be used as the persistent store for the work queue

View file

@ -129,8 +129,9 @@ func (s startup) pubREQHello(p process) {
select {
case <-ticker.C:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name())
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
er := fmt.Errorf("info: stopped handleFunc for: pub%v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil
}
}
@ -211,8 +212,9 @@ func (s startup) subREQHello(p process) {
select {
case m = <-proc.procFuncCh:
case <-ctx.Done():
er := fmt.Errorf("info: stopped handleFunc for: %v", proc.subject.name())
sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
er := fmt.Errorf("info: stopped handleFunc for: sub%v", proc.subject.name())
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
log.Printf("%v\n", er)
return nil
}