From 113481775c1834f1a8304f6c62f5b2e700901810 Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 20 Jun 2022 11:17:23 +0200 Subject: [PATCH] initial scheduler implementation --- message_and_subject.go | 2 + process.go | 89 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 75 insertions(+), 16 deletions(-) diff --git a/message_and_subject.go b/message_and_subject.go index cf70084..9de775a 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -76,6 +76,8 @@ type Message struct { // generated and we also need a copy of the details of the the // initial request message. PreviousMessage *Message + // Schedule + Schedule []int `json:"schedule" yaml:"schedule"` // done is used to signal when a message is fully processed. // This is used for signaling back to the ringbuffer that we are diff --git a/process.go b/process.go index 3558e45..9ef7f0f 100644 --- a/process.go +++ b/process.go @@ -542,12 +542,12 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, //var err error - out := p.callHandler(message, thisNode) + _ = p.callHandler(message, thisNode) // Send a confirmation message back to the publisher to ACK that the // message was received by the subscriber. The reply should be sent //no matter if the handler was executed successfully or not - natsConn.Publish(msg.Reply, out) + natsConn.Publish(msg.Reply, []byte{}) case p.subject.Event == EventNACK: // When spawning sub processes we can directly assign handlers to the process upon @@ -577,25 +577,82 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string, // If checking signatures and/or acl's are enabled the signatures they will be // verified, and if OK the handler is called. func (p process) callHandler(message Message, thisNode string) []byte { - out := []byte{} + //out := []byte{} var err error - switch p.verifySigOrAclFlag(message) { - case true: - log.Printf("info: subscriberHandler: doHandler=true: %v\n", true) - out, err = p.handler(p, message, thisNode) - if err != nil { - er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) + // Check if it is a message to run scheduled. + var interval int + var totalTime int + var runAsScheduled bool + switch { + case len(message.Schedule) < 2: + // Not at scheduled message, + case len(message.Schedule) == 2: + interval = message.Schedule[0] + totalTime = message.Schedule[1] + fallthrough + + case interval > 0 && totalTime > 0: + runAsScheduled = true + } + + // Call the handler if ACL/signature checking returns true. + // If the handler is to be called in a scheduled manner, we we take care of that too. + go func() { + switch p.verifySigOrAclFlag(message) { + + case true: + // Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler. + er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + + switch { + case !runAsScheduled: + + go func() { + _, err = p.handler(p, message, thisNode) + if err != nil { + er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) + p.errorKernel.errSend(p, message, er) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + } + }() + + case runAsScheduled: + // Create two tickers to use for the scheduling. + intervalTicker := time.NewTicker(time.Second * time.Duration(interval)) + totalTimeTicker := time.NewTicker(time.Second * time.Duration(totalTime)) + + for { + select { + case <-p.ctx.Done(): + return + case <-totalTimeTicker.C: + // Total time reached. End the process and return. + p.ctxCancel() + return + case <-intervalTicker.C: + go func() { + _, err := p.handler(p, message, thisNode) + if err != nil { + er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) + p.errorKernel.errSend(p, message, er) + p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) + } + }() + } + } + } + + case false: + // ACL/Signature checking failed. + er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing") p.errorKernel.errSend(p, message, er) log.Printf("%v\n", er) } - default: - er := fmt.Errorf("error: subscriberHandler: doHandler=false, doing nothing") - p.errorKernel.errSend(p, message, er) - log.Printf("%v\n", er) - } + }() - return out + return []byte{} } // verifySigOrAclFlag will do signature and/or acl checking based on which of @@ -610,7 +667,7 @@ func (p process) verifySigOrAclFlag(message Message) bool { // If no checking enabled we should just allow the message. case !p.nodeAuth.configuration.EnableSignatureCheck && !p.nodeAuth.configuration.EnableAclCheck: - log.Printf(" * DEBUG: verify acl/sig: no acl or signature checking at all is enabled, ALLOW the message, method=%v\n", message.Method) + //log.Printf(" * DEBUG: verify acl/sig: no acl or signature checking at all is enabled, ALLOW the message, method=%v\n", message.Method) doHandler = true // If only sig check enabled, and sig OK, we should allow the message.