diff --git a/process.go b/process.go index 36beef6..0209ee0 100644 --- a/process.go +++ b/process.go @@ -681,6 +681,8 @@ func executeHandler(p process, message Message, thisNode string) { // Create two tickers to use for the scheduling. intervalTicker := time.NewTicker(time.Second * time.Duration(interval)) totalTimeTicker := time.NewTicker(time.Second * time.Duration(totalTime)) + defer intervalTicker.Stop() + defer totalTimeTicker.Stop() // NB: Commented out this assignement of a specific message context // to be used within handlers, since it will override the structure @@ -827,6 +829,7 @@ func (p process) publishMessages(natsConn *nats.Conn) { // the process, so the sub process publisher will not be removed until // it have not received any messages for the given amount of time. ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor)) + defer ticker.Stop() for { diff --git a/processes.go b/processes.go index 1076fec..3329bc3 100644 --- a/processes.go +++ b/processes.go @@ -286,6 +286,7 @@ func (s startup) pubREQHello(p process) { // Define the procFunc to be used for the process. proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubREQHello)) + defer ticker.Stop() for { // d := fmt.Sprintf("Hello from %v\n", p.node) @@ -336,6 +337,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) { // Define the procFunc to be used for the process. proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { ticker := time.NewTicker(time.Second * time.Duration(p.configuration.REQKeysRequestUpdateInterval)) + defer ticker.Stop() for { // Send a message with the hash of the currently stored keys, @@ -392,6 +394,7 @@ func (s startup) pubREQAclRequestUpdate(p process) { // Define the procFunc to be used for the process. proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { ticker := time.NewTicker(time.Second * time.Duration(p.configuration.REQAclRequestUpdateInterval)) + defer ticker.Stop() for { // Send a message with the hash of the currently stored acl's, diff --git a/ringbuffer.go b/ringbuffer.go index 34ae175..f054b90 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -123,6 +123,7 @@ func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, out go func() { ticker := time.NewTicker(time.Second * 5) + defer ticker.Stop() for { select {