mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
schedule all request types seems to initially work
This commit is contained in:
parent
113481775c
commit
87b0d71a25
2 changed files with 35 additions and 2 deletions
|
@ -84,6 +84,14 @@ type Message struct {
|
|||
// done with processing a message, and the message can be removed
|
||||
// from the ringbuffer and into the time series log.
|
||||
done chan struct{}
|
||||
|
||||
// ctx for the specifix message. Used for for example canceling
|
||||
// scheduled messages.
|
||||
// NB: Commented out this field for specific message context
|
||||
// to be used within handlers, since it will override the structure
|
||||
// we have today. Keeping the code for a bit incase it makes sense
|
||||
// to implement later.
|
||||
//ctx context.Context
|
||||
}
|
||||
|
||||
// --- Subject
|
||||
|
|
29
process.go
29
process.go
|
@ -623,14 +623,39 @@ func (p process) callHandler(message Message, thisNode string) []byte {
|
|||
intervalTicker := time.NewTicker(time.Second * time.Duration(interval))
|
||||
totalTimeTicker := time.NewTicker(time.Second * time.Duration(totalTime))
|
||||
|
||||
// NB: Commented out this assignement of a specific message context
|
||||
// to be used within handlers, since it will override the structure
|
||||
// we have today. Keeping the code for a bit incase it makes sense
|
||||
// to implement later.
|
||||
//ctx, cancel := context.WithCancel(p.ctx)
|
||||
//message.ctx = ctx
|
||||
|
||||
// Run the handler once, so we don't have to wait for the first ticker.
|
||||
go func() {
|
||||
_, err := p.handler(p, message, thisNode)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: subscriberHandler: handler method failed: %v", err)
|
||||
p.errorKernel.errSend(p, message, er)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-p.ctx.Done():
|
||||
er := fmt.Errorf("info: subscriberHandler: proc ctx done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
|
||||
//cancel()
|
||||
return
|
||||
case <-totalTimeTicker.C:
|
||||
// Total time reached. End the process and return.
|
||||
p.ctxCancel()
|
||||
// Total time reached. End the process.
|
||||
//cancel()
|
||||
er := fmt.Errorf("info: subscriberHandler: schedule totalTime done: toNode=%v, fromNode=%v, method=%v, methodArgs=%v", message.ToNode, message.FromNode, message.Method, message.MethodArgs)
|
||||
p.errorKernel.logConsoleOnlyIfDebug(er, p.configuration)
|
||||
|
||||
return
|
||||
|
||||
case <-intervalTicker.C:
|
||||
go func() {
|
||||
_, err := p.handler(p, message, thisNode)
|
||||
|
|
Loading…
Reference in a new issue