1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-20 22:52:13 +00:00

fixed calling executeHandler() for startup folder messages

This commit is contained in:
postmannen 2022-06-20 13:34:20 +02:00
parent 2d76e06274
commit c8e5dd6904
2 changed files with 84 additions and 72 deletions

View file

@ -38,6 +38,7 @@ func (s *server) readStartupFolder() {
} }
for _, filePath := range filePaths { for _, filePath := range filePaths {
fmt.Printf("DEBUGDEBUGDEBUGDEBUGDEBUGDEBUGDEBUG: %v\n", filePath)
// Read the content of each file. // Read the content of each file.
readBytes, err := func(filePath string) ([]byte, error) { readBytes, err := func(filePath string) ([]byte, error) {
@ -107,12 +108,16 @@ func (s *server) readStartupFolder() {
continue continue
} }
_, err = mh.handler(p, sams[i].Message, s.nodeName) p.handler = mh.handler
if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) //_, err = mh.handler(p, sams[i].Message, s.nodeName)
p.errorKernel.errSend(p, sams[i].Message, er) //if err != nil {
continue // er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
} // p.errorKernel.errSend(p, sams[i].Message, er)
// continue
//}
executeHandler(p, sams[i].Message, s.nodeName)
} }
} }

View file

@ -578,6 +578,29 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
// verified, and if OK the handler is called. // verified, and if OK the handler is called.
func (p process) callHandler(message Message, thisNode string) []byte { func (p process) callHandler(message Message, thisNode string) []byte {
//out := []byte{} //out := []byte{}
// Call the handler if ACL/signature checking returns true.
// If the handler is to be called in a scheduled manner, we we take care of that too.
go func() {
switch p.verifySigOrAclFlag(message) {
case true:
executeHandler(p, message, thisNode)
case false:
// ACL/Signature checking failed.
er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing")
p.errorKernel.errSend(p, message, er)
log.Printf("%v\n", er)
}
}()
return []byte{}
}
// executeHandler will call the handler for the Request type defined in the message.
func executeHandler(p process, message Message, thisNode string) {
var err error var err error
// Check if it is a message to run scheduled. // Check if it is a message to run scheduled.
@ -596,41 +619,61 @@ func (p process) callHandler(message Message, thisNode string) []byte {
runAsScheduled = true runAsScheduled = true
} }
// Call the handler if ACL/signature checking returns true. // Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler.
// If the handler is to be called in a scheduled manner, we we take care of that too. er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true)
go func() { p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
switch p.verifySigOrAclFlag(message) {
case true: switch {
// Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler. case !runAsScheduled:
er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
switch { go func() {
case !runAsScheduled: _, err = p.handler(p, message, thisNode)
if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
p.errorKernel.errSend(p, message, er)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
}
}()
go func() { case runAsScheduled:
_, err = p.handler(p, message, thisNode) // Create two tickers to use for the scheduling.
if err != nil { intervalTicker := time.NewTicker(time.Second * time.Duration(interval))
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) totalTimeTicker := time.NewTicker(time.Second * time.Duration(totalTime))
p.errorKernel.errSend(p, message, er)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
}
}()
case runAsScheduled: // NB: Commented out this assignement of a specific message context
// Create two tickers to use for the scheduling. // to be used within handlers, since it will override the structure
intervalTicker := time.NewTicker(time.Second * time.Duration(interval)) // we have today. Keeping the code for a bit incase it makes sense
totalTimeTicker := time.NewTicker(time.Second * time.Duration(totalTime)) // to implement later.
//ctx, cancel := context.WithCancel(p.ctx)
//message.ctx = ctx
// NB: Commented out this assignement of a specific message context // Run the handler once, so we don't have to wait for the first ticker.
// to be used within handlers, since it will override the structure go func() {
// we have today. Keeping the code for a bit incase it makes sense _, err := p.handler(p, message, thisNode)
// to implement later. if err != nil {
//ctx, cancel := context.WithCancel(p.ctx) er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
//message.ctx = ctx p.errorKernel.errSend(p, message, er)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
}
}()
// Run the handler once, so we don't have to wait for the first ticker. for {
select {
case <-p.ctx.Done():
er := fmt.Errorf("info: subscriberHandler: proc ctx done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
//cancel()
return
case <-totalTimeTicker.C:
// Total time reached. End the process.
//cancel()
er := fmt.Errorf("info: subscriberHandler: schedule totalTime done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
return
case <-intervalTicker.C:
go func() { go func() {
_, err := p.handler(p, message, thisNode) _, err := p.handler(p, message, thisNode)
if err != nil { if err != nil {
@ -639,45 +682,9 @@ func (p process) callHandler(message Message, thisNode string) []byte {
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
} }
}() }()
for {
select {
case <-p.ctx.Done():
er := fmt.Errorf("info: subscriberHandler: proc ctx done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
//cancel()
return
case <-totalTimeTicker.C:
// Total time reached. End the process.
//cancel()
er := fmt.Errorf("info: subscriberHandler: schedule totalTime done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
return
case <-intervalTicker.C:
go func() {
_, err := p.handler(p, message, thisNode)
if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
p.errorKernel.errSend(p, message, er)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
}
}()
}
}
} }
case false:
// ACL/Signature checking failed.
er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing")
p.errorKernel.errSend(p, message, er)
log.Printf("%v\n", er)
} }
}() }
return []byte{}
} }
// verifySigOrAclFlag will do signature and/or acl checking based on which of // verifySigOrAclFlag will do signature and/or acl checking based on which of