1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-03-15 10:57:42 +00:00

initial scheduler implementation

This commit is contained in:
postmannen 2022-06-20 11:17:23 +02:00
parent 6c900296c9
commit 113481775c
2 changed files with 75 additions and 16 deletions

View file

@ -76,6 +76,8 @@ type Message struct {
// generated and we also need a copy of the details of the the // generated and we also need a copy of the details of the the
// initial request message. // initial request message.
PreviousMessage *Message PreviousMessage *Message
// Schedule
Schedule []int `json:"schedule" yaml:"schedule"`
// done is used to signal when a message is fully processed. // done is used to signal when a message is fully processed.
// This is used for signaling back to the ringbuffer that we are // This is used for signaling back to the ringbuffer that we are

View file

@ -542,12 +542,12 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
//var err error //var err error
out := p.callHandler(message, thisNode) _ = p.callHandler(message, thisNode)
// Send a confirmation message back to the publisher to ACK that the // Send a confirmation message back to the publisher to ACK that the
// message was received by the subscriber. The reply should be sent // message was received by the subscriber. The reply should be sent
//no matter if the handler was executed successfully or not //no matter if the handler was executed successfully or not
natsConn.Publish(msg.Reply, out) natsConn.Publish(msg.Reply, []byte{})
case p.subject.Event == EventNACK: case p.subject.Event == EventNACK:
// When spawning sub processes we can directly assign handlers to the process upon // When spawning sub processes we can directly assign handlers to the process upon
@ -577,25 +577,82 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
// If checking signatures and/or acl's are enabled the signatures they will be // If checking signatures and/or acl's are enabled the signatures they will be
// verified, and if OK the handler is called. // verified, and if OK the handler is called.
func (p process) callHandler(message Message, thisNode string) []byte { func (p process) callHandler(message Message, thisNode string) []byte {
out := []byte{} //out := []byte{}
var err error var err error
switch p.verifySigOrAclFlag(message) { // Check if it is a message to run scheduled.
case true: var interval int
log.Printf("info: subscriberHandler: doHandler=true: %v\n", true) var totalTime int
out, err = p.handler(p, message, thisNode) var runAsScheduled bool
if err != nil { switch {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err) case len(message.Schedule) < 2:
// Not at scheduled message,
case len(message.Schedule) == 2:
interval = message.Schedule[0]
totalTime = message.Schedule[1]
fallthrough
case interval > 0 && totalTime > 0:
runAsScheduled = true
}
// Call the handler if ACL/signature checking returns true.
// If the handler is to be called in a scheduled manner, we we take care of that too.
go func() {
switch p.verifySigOrAclFlag(message) {
case true:
// Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler.
er := fmt.Errorf("info: subscriberHandler: Either ACL were verified OK, or ACL/Signature check was not enabled, so we call the handler: %v", true)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
switch {
case !runAsScheduled:
go func() {
_, err = p.handler(p, message, thisNode)
if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
p.errorKernel.errSend(p, message, er)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
}
}()
case runAsScheduled:
// Create two tickers to use for the scheduling.
intervalTicker := time.NewTicker(time.Second * time.Duration(interval))
totalTimeTicker := time.NewTicker(time.Second * time.Duration(totalTime))
for {
select {
case <-p.ctx.Done():
return
case <-totalTimeTicker.C:
// Total time reached. End the process and return.
p.ctxCancel()
return
case <-intervalTicker.C:
go func() {
_, err := p.handler(p, message, thisNode)
if err != nil {
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
p.errorKernel.errSend(p, message, er)
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
}
}()
}
}
}
case false:
// ACL/Signature checking failed.
er := fmt.Errorf("error: subscriberHandler: ACL were verified not-OK, doing nothing")
p.errorKernel.errSend(p, message, er) p.errorKernel.errSend(p, message, er)
log.Printf("%v\n", er) log.Printf("%v\n", er)
} }
default: }()
er := fmt.Errorf("error: subscriberHandler: doHandler=false, doing nothing")
p.errorKernel.errSend(p, message, er)
log.Printf("%v\n", er)
}
return out return []byte{}
} }
// verifySigOrAclFlag will do signature and/or acl checking based on which of // verifySigOrAclFlag will do signature and/or acl checking based on which of
@ -610,7 +667,7 @@ func (p process) verifySigOrAclFlag(message Message) bool {
// If no checking enabled we should just allow the message. // If no checking enabled we should just allow the message.
case !p.nodeAuth.configuration.EnableSignatureCheck && !p.nodeAuth.configuration.EnableAclCheck: case !p.nodeAuth.configuration.EnableSignatureCheck && !p.nodeAuth.configuration.EnableAclCheck:
log.Printf(" * DEBUG: verify acl/sig: no acl or signature checking at all is enabled, ALLOW the message, method=%v\n", message.Method) //log.Printf(" * DEBUG: verify acl/sig: no acl or signature checking at all is enabled, ALLOW the message, method=%v\n", message.Method)
doHandler = true doHandler = true
// If only sig check enabled, and sig OK, we should allow the message. // If only sig check enabled, and sig OK, we should allow the message.