1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-07 12:59:15 +00:00

terminating subproc publishers if inactive for given amount of time

This commit is contained in:
postmannen 2022-06-18 00:03:25 +02:00
parent 1dadc0df64
commit b1bcefb949
5 changed files with 96 additions and 33 deletions

View file

@ -90,6 +90,8 @@ type Configuration struct {
// EnableDebug will also enable printing all the messages received in the errorKernel // EnableDebug will also enable printing all the messages received in the errorKernel
// to STDERR. // to STDERR.
EnableDebug bool EnableDebug bool
// KeepPublishersAliveFor number of seconds
KeepPublishersAliveFor int
// Make the current node send hello messages to central at given interval in seconds // Make the current node send hello messages to central at given interval in seconds
StartPubREQHello int StartPubREQHello int
@ -171,6 +173,7 @@ type ConfigurationFromFile struct {
EnableAclCheck *bool EnableAclCheck *bool
IsCentralAuth *bool IsCentralAuth *bool
EnableDebug *bool EnableDebug *bool
KeepPublishersAliveFor *int
StartPubREQHello *int StartPubREQHello *int
EnableKeyUpdates *bool EnableKeyUpdates *bool
@ -236,6 +239,7 @@ func newConfigurationDefaults() Configuration {
EnableAclCheck: false, EnableAclCheck: false,
IsCentralAuth: false, IsCentralAuth: false,
EnableDebug: false, EnableDebug: false,
KeepPublishersAliveFor: 10,
StartPubREQHello: 30, StartPubREQHello: 30,
EnableKeyUpdates: true, EnableKeyUpdates: true,
@ -440,6 +444,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
} else { } else {
conf.EnableDebug = *cf.EnableDebug conf.EnableDebug = *cf.EnableDebug
} }
if cf.KeepPublishersAliveFor == nil {
conf.KeepPublishersAliveFor = cd.KeepPublishersAliveFor
} else {
conf.KeepPublishersAliveFor = *cf.KeepPublishersAliveFor
}
// --- Start pub/sub // --- Start pub/sub
@ -602,6 +611,7 @@ func (c *Configuration) CheckFlags() error {
flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", fc.EnableAclCheck, "true/false *TESTING* enable Acl checking.") flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", fc.EnableAclCheck, "true/false *TESTING* enable Acl checking.")
flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", fc.IsCentralAuth, "true/false, *TESTING* is this the central auth server") flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", fc.IsCentralAuth, "true/false, *TESTING* is this the central auth server")
flag.BoolVar(&c.EnableDebug, "enableDebug", fc.EnableDebug, "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR") flag.BoolVar(&c.EnableDebug, "enableDebug", fc.EnableDebug, "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR")
flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", fc.KeepPublishersAliveFor, "The amount of time we allow a publisher to stay alive without receiving any messages to publish")
// Start of Request publishers/subscribers // Start of Request publishers/subscribers

View file

@ -51,6 +51,8 @@ type Message struct {
ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"` ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"`
// RetryWait specified the time in seconds to wait between retries. // RetryWait specified the time in seconds to wait between retries.
RetryWait int `json:"retryWait" yaml:"retryWait"` RetryWait int `json:"retryWait" yaml:"retryWait"`
// IsSubPublishedMsg enables timeout of publishing process, and is used together with process.isSubProcess to be able to terminate the sub processes publishers.
IsSubPublishedMsg bool `json:"isSubPublishedMsg" yaml:"isSubPublishedMsg"`
// Resend retries // Resend retries
Retries int `json:"retries" yaml:"retries"` Retries int `json:"retries" yaml:"retries"`
// The ACK timeout of the new message created via a request event. // The ACK timeout of the new message created via a request event.

View file

@ -34,6 +34,8 @@ const (
// process holds all the logic to handle a message type and it's // process holds all the logic to handle a message type and it's
// method, subscription/publishin messages for a subject, and more. // method, subscription/publishin messages for a subject, and more.
type process struct { type process struct {
// isSubProcess is used to indentify subprocesses spawned by other processes.
isSubProcess bool
// server // server
server *server server *server
// messageID // messageID
@ -687,12 +689,30 @@ func (p process) publishMessages(natsConn *nats.Conn) {
// fails in the loop we should throw an error and use `continue` // fails in the loop we should throw an error and use `continue`
// to jump back here to the beginning of the loop and continue // to jump back here to the beginning of the loop and continue
// with the next message. // with the next message.
// Adding a timer that will be used for when to remove the sub process
// publisher. The timer is reset each time a message is published with
// the process, so the sub process publisher will not be removed until
// it have not received any messages for the given amount of time.
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
for { for {
// Wait and read the next message on the message channel, or // Wait and read the next message on the message channel, or
// exit this function if Cancel are received via ctx. // exit this function if Cancel are received via ctx.
select { select {
case <-ticker.C:
// We only want to remove subprocesses
if p.isSubProcess {
p.ctxCancel()
p.processes.active.mu.Lock()
delete(p.processes.active.procNames, p.processName)
p.processes.active.mu.Unlock()
}
case m := <-p.subject.messageCh: case m := <-p.subject.messageCh:
ticker.Reset(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor))
// Sign the methodArgs, and add the signature to the message. // Sign the methodArgs, and add the signature to the message.
m.ArgSignature = p.addMethodArgSignature(m) m.ArgSignature = p.addMethodArgSignature(m)
// fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature)) // fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature))
@ -821,6 +841,16 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once,
return return
} }
// NB: vvvvvvvvvvvvvvvvvvvvv-THIS DOES NOT WORK FOR CANCELING THE PUBLISHER-vvvvvvvvv
//if m.IsSubPublishedMsg {
// p.ctxCancel()
// go func() {
// p.processes.active.mu.Lock()
// delete(p.processes.active.procNames, p.processName)
// p.processes.active.mu.Unlock()
// }()
//}
// Increment the counter for the next message to be sent. // Increment the counter for the next message to be sent.
p.messageID++ p.messageID++

View file

@ -183,7 +183,8 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
sub := newSubjectNoVerifyHandler(m, node) sub := newSubjectNoVerifyHandler(m, node)
// Create a new sub process that will do the actual file copying. // Create a new sub process that will do the actual file copying.
copySrcSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil)
copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil)
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
@ -233,6 +234,14 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
return ackMsg, nil return ackMsg, nil
} }
// newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true.
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process {
p := newProcess(ctx, server, subject, processKind, procFunc)
p.isSubProcess = true
return p
}
// ---- // ----
type methodREQCopyDst struct { type methodREQCopyDst struct {
@ -275,7 +284,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([
sub := newSubjectNoVerifyHandler(cia.DstMethod, node) sub := newSubjectNoVerifyHandler(cia.DstMethod, node)
// Create a new sub process that will do the actual file copying. // Create a new sub process that will do the actual file copying.
copyDstSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil) copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil)
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
@ -428,6 +437,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
Method: cia.DstMethod, Method: cia.DstMethod,
ReplyMethod: REQNone, ReplyMethod: REQNone,
Data: csaSerialized, Data: csaSerialized,
IsSubPublishedMsg: true,
} }
// fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
@ -484,6 +494,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
Method: cia.DstMethod, Method: cia.DstMethod,
ReplyMethod: REQNone, ReplyMethod: REQNone,
Data: csaSerialized, Data: csaSerialized,
IsSubPublishedMsg: true,
} }
// fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
@ -536,6 +547,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
Method: cia.SrcMethod, Method: cia.SrcMethod,
ReplyMethod: REQNone, ReplyMethod: REQNone,
Data: csaSerialized, Data: csaSerialized,
IsSubPublishedMsg: true,
} }
fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
@ -612,6 +624,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
Method: cia.SrcMethod, Method: cia.SrcMethod,
ReplyMethod: REQNone, ReplyMethod: REQNone,
Data: csaSer, Data: csaSer,
IsSubPublishedMsg: true,
} }
// fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)
@ -638,6 +651,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
Method: cia.SrcMethod, Method: cia.SrcMethod,
ReplyMethod: REQNone, ReplyMethod: REQNone,
Data: csaSer, Data: csaSer,
IsSubPublishedMsg: true,
} }
sam, err := newSubjectAndMessage(msg) sam, err := newSubjectAndMessage(msg)
@ -771,6 +785,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
Method: cia.SrcMethod, Method: cia.SrcMethod,
ReplyMethod: REQNone, ReplyMethod: REQNone,
Data: csaSerialized, Data: csaSerialized,
IsSubPublishedMsg: true,
} }
// fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod)

View file

@ -483,7 +483,13 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
// log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) // log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
proc := newProcess(s.ctx, s, sub, processKindPublisher, nil) var proc process
switch {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisher, nil)
}
proc.spawnWorker() proc.spawnWorker()
// log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID) // log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)