1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

not terminating publisher services that should run at all times based on timer

This commit is contained in:
postmannen 2023-01-13 15:09:23 +01:00
parent b3a85d0283
commit 7d21af55e7
2 changed files with 15 additions and 0 deletions

View file

@ -35,6 +35,10 @@ const (
type process struct { type process struct {
// isSubProcess is used to indentify subprocesses spawned by other processes. // isSubProcess is used to indentify subprocesses spawned by other processes.
isSubProcess bool isSubProcess bool
// isLongRunningPublisher is set to true for a publisher service that should not
// be auto terminated like a normal autospawned publisher would be when the the
// inactivity timeout have expired
isLongRunningPublisher bool
// server // server
server *server server *server
// messageID // messageID
@ -848,6 +852,14 @@ func (p process) publishMessages(natsConn *nats.Conn) {
// exit this function if Cancel are received via ctx. // exit this function if Cancel are received via ctx.
select { select {
case <-ticker.C: case <-ticker.C:
if p.isLongRunningPublisher {
er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
p.errorKernel.logDebug(er, p.configuration)
continue
}
// We only want to remove subprocesses // We only want to remove subprocesses
// REMOVED 120123: Removed if so all publishers should be canceled if inactive. // REMOVED 120123: Removed if so all publishers should be canceled if inactive.
//if p.isSubProcess { //if p.isSubProcess {

View file

@ -290,6 +290,7 @@ func (s startup) pubREQHello(p process) {
sub := newSubject(REQHello, p.configuration.CentralNodeName) sub := newSubject(REQHello, p.configuration.CentralNodeName)
proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil) proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil)
proc.isLongRunningPublisher = true
// Define the procFunc to be used for the process. // Define the procFunc to be used for the process.
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
@ -342,6 +343,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) {
sub := newSubject(REQKeysRequestUpdate, p.configuration.CentralNodeName) sub := newSubject(REQKeysRequestUpdate, p.configuration.CentralNodeName)
proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil) proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil)
proc.isLongRunningPublisher = true
// Define the procFunc to be used for the process. // Define the procFunc to be used for the process.
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
@ -399,6 +401,7 @@ func (s startup) pubREQAclRequestUpdate(p process) {
sub := newSubject(REQAclRequestUpdate, p.configuration.CentralNodeName) sub := newSubject(REQAclRequestUpdate, p.configuration.CentralNodeName)
proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil) proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil)
proc.isLongRunningPublisher = true
// Define the procFunc to be used for the process. // Define the procFunc to be used for the process.
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {