diff --git a/configuration_flags.go b/configuration_flags.go index 161aee0..4902d8b 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -90,6 +90,8 @@ type Configuration struct { // EnableDebug will also enable printing all the messages received in the errorKernel // to STDERR. EnableDebug bool + // KeepPublishersAliveFor number of seconds + KeepPublishersAliveFor int // Make the current node send hello messages to central at given interval in seconds StartPubREQHello int @@ -171,6 +173,7 @@ type ConfigurationFromFile struct { EnableAclCheck *bool IsCentralAuth *bool EnableDebug *bool + KeepPublishersAliveFor *int StartPubREQHello *int EnableKeyUpdates *bool @@ -236,6 +239,7 @@ func newConfigurationDefaults() Configuration { EnableAclCheck: false, IsCentralAuth: false, EnableDebug: false, + KeepPublishersAliveFor: 10, StartPubREQHello: 30, EnableKeyUpdates: true, @@ -440,6 +444,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.EnableDebug = *cf.EnableDebug } + if cf.KeepPublishersAliveFor == nil { + conf.KeepPublishersAliveFor = cd.KeepPublishersAliveFor + } else { + conf.KeepPublishersAliveFor = *cf.KeepPublishersAliveFor + } // --- Start pub/sub @@ -602,6 +611,7 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", fc.EnableAclCheck, "true/false *TESTING* enable Acl checking.") flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", fc.IsCentralAuth, "true/false, *TESTING* is this the central auth server") flag.BoolVar(&c.EnableDebug, "enableDebug", fc.EnableDebug, "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR") + flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", fc.KeepPublishersAliveFor, "The amount of time we allow a publisher to stay alive without receiving any messages to publish") // Start of Request publishers/subscribers diff --git a/message_and_subject.go b/message_and_subject.go index ddc6803..cf70084 100644 --- a/message_and_subject.go +++ b/message_and_subject.go @@ -51,6 +51,8 @@ type Message struct { ACKTimeout int `json:"ACKTimeout" yaml:"ACKTimeout"` // RetryWait specified the time in seconds to wait between retries. RetryWait int `json:"retryWait" yaml:"retryWait"` + // IsSubPublishedMsg enables timeout of publishing process, and is used together with process.isSubProcess to be able to terminate the sub processes publishers. + IsSubPublishedMsg bool `json:"isSubPublishedMsg" yaml:"isSubPublishedMsg"` // Resend retries Retries int `json:"retries" yaml:"retries"` // The ACK timeout of the new message created via a request event. diff --git a/process.go b/process.go index 4862ac6..d59cada 100644 --- a/process.go +++ b/process.go @@ -34,6 +34,8 @@ const ( // process holds all the logic to handle a message type and it's // method, subscription/publishin messages for a subject, and more. type process struct { + // isSubProcess is used to indentify subprocesses spawned by other processes. + isSubProcess bool // server server *server // messageID @@ -687,12 +689,30 @@ func (p process) publishMessages(natsConn *nats.Conn) { // fails in the loop we should throw an error and use `continue` // to jump back here to the beginning of the loop and continue // with the next message. + + // Adding a timer that will be used for when to remove the sub process + // publisher. The timer is reset each time a message is published with + // the process, so the sub process publisher will not be removed until + // it have not received any messages for the given amount of time. + ticker := time.NewTicker(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor)) + for { // Wait and read the next message on the message channel, or // exit this function if Cancel are received via ctx. select { + case <-ticker.C: + // We only want to remove subprocesses + if p.isSubProcess { + p.ctxCancel() + + p.processes.active.mu.Lock() + delete(p.processes.active.procNames, p.processName) + p.processes.active.mu.Unlock() + } + case m := <-p.subject.messageCh: + ticker.Reset(time.Second * time.Duration(p.configuration.KeepPublishersAliveFor)) // Sign the methodArgs, and add the signature to the message. m.ArgSignature = p.addMethodArgSignature(m) // fmt.Printf(" * DEBUG: add signature, fromNode: %v, method: %v, len of signature: %v\n", m.FromNode, m.Method, len(m.ArgSignature)) @@ -821,6 +841,16 @@ func (p process) publishAMessage(m Message, zEnc *zstd.Encoder, once sync.Once, return } + // NB: vvvvvvvvvvvvvvvvvvvvv-THIS DOES NOT WORK FOR CANCELING THE PUBLISHER-vvvvvvvvv + //if m.IsSubPublishedMsg { + // p.ctxCancel() + // go func() { + // p.processes.active.mu.Lock() + // delete(p.processes.active.procNames, p.processName) + // p.processes.active.mu.Unlock() + // }() + //} + // Increment the counter for the next message to be sent. p.messageID++ diff --git a/requests_copy.go b/requests_copy.go index 1e150ad..102466c 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -183,7 +183,8 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ sub := newSubjectNoVerifyHandler(m, node) // Create a new sub process that will do the actual file copying. - copySrcSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil) + + copySrcSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. @@ -233,6 +234,14 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ return ackMsg, nil } +// newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true. +func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process { + p := newProcess(ctx, server, subject, processKind, procFunc) + p.isSubProcess = true + + return p +} + // ---- type methodREQCopyDst struct { @@ -275,7 +284,7 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ sub := newSubjectNoVerifyHandler(cia.DstMethod, node) // Create a new sub process that will do the actual file copying. - copyDstSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil) + copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. @@ -423,11 +432,12 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel // We want to send a message back to src that we are ready to start. // fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ - ToNode: cia.DstNode, - FromNode: cia.SrcNode, - Method: cia.DstMethod, - ReplyMethod: REQNone, - Data: csaSerialized, + ToNode: cia.DstNode, + FromNode: cia.SrcNode, + Method: cia.DstMethod, + ReplyMethod: REQNone, + Data: csaSerialized, + IsSubPublishedMsg: true, } // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) @@ -479,11 +489,12 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel // We want to send a message back to src that we are ready to start. // fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ - ToNode: cia.DstNode, - FromNode: cia.SrcNode, - Method: cia.DstMethod, - ReplyMethod: REQNone, - Data: csaSerialized, + ToNode: cia.DstNode, + FromNode: cia.SrcNode, + Method: cia.DstMethod, + ReplyMethod: REQNone, + Data: csaSerialized, + IsSubPublishedMsg: true, } // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) @@ -531,11 +542,12 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc { fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ - ToNode: cia.SrcNode, - FromNode: cia.DstNode, - Method: cia.SrcMethod, - ReplyMethod: REQNone, - Data: csaSerialized, + ToNode: cia.SrcNode, + FromNode: cia.DstNode, + Method: cia.SrcMethod, + ReplyMethod: REQNone, + Data: csaSerialized, + IsSubPublishedMsg: true, } fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) @@ -607,11 +619,12 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc // fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ - ToNode: cia.SrcNode, - FromNode: cia.DstNode, - Method: cia.SrcMethod, - ReplyMethod: REQNone, - Data: csaSer, + ToNode: cia.SrcNode, + FromNode: cia.DstNode, + Method: cia.SrcMethod, + ReplyMethod: REQNone, + Data: csaSer, + IsSubPublishedMsg: true, } // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) @@ -633,11 +646,12 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc } msg := Message{ - ToNode: cia.SrcNode, - FromNode: cia.DstNode, - Method: cia.SrcMethod, - ReplyMethod: REQNone, - Data: csaSer, + ToNode: cia.SrcNode, + FromNode: cia.DstNode, + Method: cia.SrcMethod, + ReplyMethod: REQNone, + Data: csaSer, + IsSubPublishedMsg: true, } sam, err := newSubjectAndMessage(msg) @@ -766,11 +780,12 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc // We want to send a message back to src that we are ready to start. // fmt.Printf("\n\n\n ************** DEBUG: copyDstHandler sub process sending copyReady to:%v\n ", message.FromNode) msg := Message{ - ToNode: cia.SrcNode, - FromNode: cia.DstNode, - Method: cia.SrcMethod, - ReplyMethod: REQNone, - Data: csaSerialized, + ToNode: cia.SrcNode, + FromNode: cia.DstNode, + Method: cia.SrcMethod, + ReplyMethod: REQNone, + Data: csaSerialized, + IsSubPublishedMsg: true, } // fmt.Printf("\n ***** DEBUG: copyDstSubProcFunc: cia.SrcMethod: %v\n\n ", cia.SrcMethod) diff --git a/server.go b/server.go index 747544b..e4cfe38 100644 --- a/server.go +++ b/server.go @@ -483,7 +483,13 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) - proc := newProcess(s.ctx, s, sub, processKindPublisher, nil) + var proc process + switch { + case m.IsSubPublishedMsg: + proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil) + default: + proc = newProcess(s.ctx, s, sub, processKindPublisher, nil) + } proc.spawnWorker() // log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)