mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-20 22:52:13 +00:00
refactored calling the handler for subscribers
This commit is contained in:
parent
2b2064c3cb
commit
592425d53c
1 changed files with 84 additions and 82 deletions
108
process.go
108
process.go
|
@ -520,9 +520,63 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
||||||
p.errorKernel.errSend(p, message, er)
|
p.errorKernel.errSend(p, message, er)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//var err error
|
||||||
|
|
||||||
|
out := p.callHandler(message, mh, thisNode)
|
||||||
|
|
||||||
|
// Send a confirmation message back to the publisher to ACK that the
|
||||||
|
// message was received by the subscriber. The reply should be sent
|
||||||
|
//no matter if the handler was executed successfully or not
|
||||||
|
natsConn.Publish(msg.Reply, out)
|
||||||
|
|
||||||
|
case p.subject.Event == EventNACK:
|
||||||
|
mh, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
||||||
|
if !ok {
|
||||||
|
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
|
||||||
|
p.errorKernel.errSend(p, message, er)
|
||||||
|
}
|
||||||
|
|
||||||
|
// We do not send reply messages for EventNACL, so we can discard the output.
|
||||||
|
_ = p.callHandler(message, mh, thisNode)
|
||||||
|
|
||||||
|
default:
|
||||||
|
er := fmt.Errorf("info: did not find that specific type of event: %#v", p.subject.Event)
|
||||||
|
p.errorKernel.infoSend(p, message, er)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// callHandler will call the handler for the Request type defined in the message.
|
||||||
|
// If checking signatures and/or acl's are enabled the signatures they will be
|
||||||
|
// verified, and if OK the handler is called.
|
||||||
|
func (p process) callHandler(message Message, mh methodHandler, thisNode string) []byte {
|
||||||
out := []byte{}
|
out := []byte{}
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
switch p.verifySigOrAclFlag(message) {
|
||||||
|
case true:
|
||||||
|
log.Printf("info: subscriberHandler: doHandler=true: %v\n", true)
|
||||||
|
out, err = mh.handler(p, message, thisNode)
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||||
|
p.errorKernel.errSend(p, message, er)
|
||||||
|
log.Printf("%v\n", er)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
er := fmt.Errorf("error: subscriberHandler: doHandler=false, doing nothing")
|
||||||
|
p.errorKernel.errSend(p, message, er)
|
||||||
|
log.Printf("%v\n", er)
|
||||||
|
}
|
||||||
|
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// verifySigOrAclFlag will do signature and/or acl checking based on which of
|
||||||
|
// those features are enabled, and then call the handler.
|
||||||
|
// The handler will also be called if neither signature or acl checking is enabled
|
||||||
|
// since it is up to the subscriber to decide if it want to use the auth features
|
||||||
|
// or not.
|
||||||
|
func (p process) verifySigOrAclFlag(message Message) bool {
|
||||||
doHandler := false
|
doHandler := false
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
|
@ -565,59 +619,7 @@ func (p process) messageSubscriberHandler(natsConn *nats.Conn, thisNode string,
|
||||||
log.Printf(" * DEBUG: verify acl/sig: None of the verify flags matched, not doing handler for message\n")
|
log.Printf(" * DEBUG: verify acl/sig: None of the verify flags matched, not doing handler for message\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
return doHandler
|
||||||
case doHandler:
|
|
||||||
log.Printf("info: subscriberHandler: doHandler=true: %v\n", doHandler)
|
|
||||||
out, err = mh.handler(p, message, thisNode)
|
|
||||||
if err != nil {
|
|
||||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
|
||||||
p.errorKernel.errSend(p, message, er)
|
|
||||||
log.Printf("%v\n", er)
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
er := fmt.Errorf("error: subscriberHandler: doHandler=false, doing nothing")
|
|
||||||
p.errorKernel.errSend(p, message, er)
|
|
||||||
log.Printf("%v\n", er)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send a confirmation message back to the publisher
|
|
||||||
natsConn.Publish(msg.Reply, out)
|
|
||||||
|
|
||||||
// Check for NACK type Event.
|
|
||||||
case p.subject.Event == EventNACK:
|
|
||||||
mf, ok := p.methodsAvailable.CheckIfExists(message.Method)
|
|
||||||
if !ok {
|
|
||||||
er := fmt.Errorf("error: subscriberHandler: method type not available: %v", p.subject.Event)
|
|
||||||
p.errorKernel.errSend(p, message, er)
|
|
||||||
}
|
|
||||||
|
|
||||||
// The verify functions will return true if the signature or acl is correct,
|
|
||||||
// but will return false if the signature or acl is wrong.
|
|
||||||
// They will also return true if the configuration flag for checking the acl
|
|
||||||
// or the signature is set to false.
|
|
||||||
sigOK := p.nodeAuth.verifySignature(message)
|
|
||||||
aclOK := p.nodeAuth.verifyAcl(message)
|
|
||||||
|
|
||||||
// We should allow to call the handler if either:
|
|
||||||
// 1. signature and acl is OK for when acl verification is enabled.
|
|
||||||
// 2. just signature is OK, if just signature checking is enabled.
|
|
||||||
// 3. Since the verify functions return true if the verification is
|
|
||||||
// disabled, the handler will be called if both are disabled.
|
|
||||||
if sigOK && aclOK || sigOK {
|
|
||||||
|
|
||||||
_, err := mf.handler(p, message, thisNode)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
|
||||||
p.errorKernel.errSend(p, message, er)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
default:
|
|
||||||
er := fmt.Errorf("info: did not find that specific type of event: %#v", p.subject.Event)
|
|
||||||
p.errorKernel.infoSend(p, message, er)
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubscribeMessage will register the Nats callback function for the specified
|
// SubscribeMessage will register the Nats callback function for the specified
|
||||||
|
|
Loading…
Add table
Reference in a new issue