1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

stopping started tickers

This commit is contained in:
postmannen 2022-12-26 10:52:43 +01:00
parent 98137b1df1
commit 709bd219f7
3 changed files with 7 additions and 0 deletions

View file

@ -681,6 +681,8 @@ func executeHandler(p process, message Message, thisNode string) {
// Create two tickers to use for the scheduling.
intervalTicker := time.NewTicker(time.Second * time.Duration(interval))
totalTimeTicker := time.NewTicker(time.Second * time.Duration(totalTime))
defer intervalTicker.Stop()
defer totalTimeTicker.Stop()
// NB: Commented out this assignement of a specific message context
// to be used within handlers, since it will override the structure
@ -827,6 +829,7 @@ func (p process) publishMessages(natsConn *nats.Conn) {
// the process, so the sub process publisher will not be removed until
// it have not received any messages for the given amount of time.
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
defer ticker.Stop()
for {

View file

@ -286,6 +286,7 @@ func (s startup) pubREQHello(p process) {
// Define the procFunc to be used for the process.
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubREQHello))
defer ticker.Stop()
for {
// d := fmt.Sprintf("Hello from %v\n", p.node)
@ -336,6 +337,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) {
// Define the procFunc to be used for the process.
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.REQKeysRequestUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored keys,
@ -392,6 +394,7 @@ func (s startup) pubREQAclRequestUpdate(p process) {
// Define the procFunc to be used for the process.
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.REQAclRequestUpdateInterval))
defer ticker.Stop()
for {
// Send a message with the hash of the currently stored acl's,

View file

@ -123,6 +123,7 @@ func (r *ringBuffer) start(ctx context.Context, inCh chan subjectAndMessage, out
go func() {
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()
for {
select {