From c8e5dd690453719ee70b7e6984bcea8ccefeb24f Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 20 Jun 2022 13:34:20 +0200 Subject: [PATCH] fixed calling executeHandler() for startup folder messages --- message_readers.go | 17 ++++-- process.go | 139 ++++++++++++++++++++++++--------------------- 2 files changed, 84 insertions(+), 72 deletions(-) diff --git a/message_readers.go b/message_readers.go index 931e946..aa34b59 100644 --- a/message_readers.go +++ b/message_readers.go @@ -38,6 +38,7 @@ func (s *server) readStartupFolder() { } for _, filePath := range filePaths { + fmt.Printf("DEBUGDEBUGDEBUGDEBUGDEBUGDEBUGDEBUG: %v\n", filePath) // Read the content of each file. readBytes, err := func(filePath string) ([]byte, error) { @@ -107,12 +108,16 @@ func (s *server) readStartupFolder() { continue } - _, err = mh.handler(p, sams[i].Message, s.nodeName) - if err != nil { - er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - p.errorKernel.errSend(p, sams[i].Message, er) - continue - } + p.handler = mh.handler + + //_, err = mh.handler(p, sams[i].Message, s.nodeName) + //if err != nil { + // er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) + // p.errorKernel.errSend(p, sams[i].Message, er) + // continue + //} + + executeHandler(p, sams[i].Message, s.nodeName) } } diff --git a/process.go b/process.go index 6982a42..ba9c9ef 100644 --- a/process.go +++ b/process.go @@ -578,6 +578,29 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // verified, and if OK the handler is called. func (p process) callHandler(message Message, thisNode string) []byte { //out := []byte{} + + // Call the handler if ACL/signature checking returns true. + // If the handler is to be called in a scheduled manner, we we take care of that too. + go func() { + switch p.verifySigOrAclFlag(message) { + + case true: + + executeHandler(p, message, thisNode) + + case false: + // ACL/Signature checking failed. + er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing") + p.errorKernel.errSend(p, message, er) + log.Printf("%v\n", er) + } + }() + + return []byte{} +} + +// executeHandler will call the handler for the Request type defined in the message. +func executeHandler(p process, message Message, thisNode string) { var err error // Check if it is a message to run scheduled. @@ -596,41 +619,61 @@ func (p process) callHandler(message Message, thisNode string) []byte { runAsScheduled = true } - // Call the handler if ACL/signature checking returns true. - // If the handler is to be called in a scheduled manner, we we take care of that too. - go func() { - switch p.verifySigOrAclFlag(message) { + // Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler. + er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - case true: - // Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler. - er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + switch { + case !runAsScheduled: - switch { - case !runAsScheduled: + go func() { + _, err = p.handler(p, message, thisNode) + if err != nil { + er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) + p.errorKernel.errSend(p, message, er) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + } + }() - go func() { - _, err = p.handler(p, message, thisNode) - if err != nil { - er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - p.errorKernel.errSend(p, message, er) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - } - }() + case runAsScheduled: + // Create two tickers to use for the scheduling. + intervalTicker := time.NewTicker(time.Second * time.Duration(interval)) + totalTimeTicker := time.NewTicker(time.Second * time.Duration(totalTime)) - case runAsScheduled: - // Create two tickers to use for the scheduling. - intervalTicker := time.NewTicker(time.Second * time.Duration(interval)) - totalTimeTicker := time.NewTicker(time.Second * time.Duration(totalTime)) + // NB: Commented out this assignement of a specific message context + // to be used within handlers, since it will override the structure + // we have today. Keeping the code for a bit incase it makes sense + // to implement later. + //ctx, cancel := context.WithCancel(p.ctx) + //message.ctx = ctx - // NB: Commented out this assignement of a specific message context - // to be used within handlers, since it will override the structure - // we have today. Keeping the code for a bit incase it makes sense - // to implement later. - //ctx, cancel := context.WithCancel(p.ctx) - //message.ctx = ctx + // Run the handler once, so we don't have to wait for the first ticker. + go func() { + _, err := p.handler(p, message, thisNode) + if err != nil { + er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) + p.errorKernel.errSend(p, message, er) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + } + }() - // Run the handler once, so we don't have to wait for the first ticker. + for { + select { + case <-p.ctx.Done(): + er := fmt.Errorf("info: subscriberHandler: proc ctx done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + + //cancel() + return + case <-totalTimeTicker.C: + // Total time reached. End the process. + //cancel() + er := fmt.Errorf("info: subscriberHandler: schedule totalTime done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + + return + + case <-intervalTicker.C: go func() { _, err := p.handler(p, message, thisNode) if err != nil { @@ -639,45 +682,9 @@ func (p process) callHandler(message Message, thisNode string) []byte { p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) } }() - - for { - select { - case <-p.ctx.Done(): - er := fmt.Errorf("info: subscriberHandler: proc ctx done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - - //cancel() - return - case <-totalTimeTicker.C: - // Total time reached. End the process. - //cancel() - er := fmt.Errorf("info: subscriberHandler: schedule totalTime done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - - return - - case <-intervalTicker.C: - go func() { - _, err := p.handler(p, message, thisNode) - if err != nil { - er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) - p.errorKernel.errSend(p, message, er) - p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - } - }() - } - } } - - case false: - // ACL/Signature checking failed. - er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing") - p.errorKernel.errSend(p, message, er) - log.Printf("%v\n", er) } - }() - - return []byte{} + } } // verifySigOrAclFlag will do signature and/or acl checking based on which of