diff --git a/process.go b/process.go index bbfd4d3..6e6eef0 100644 --- a/process.go +++ b/process.go @@ -35,6 +35,10 @@ const ( type process struct { // isSubProcess is used to indentify subprocesses spawned by other processes. isSubProcess bool + // isLongRunningPublisher is set to true for a publisher service that should not + // be auto terminated like a normal autospawned publisher would be when the the + // inactivity timeout have expired + isLongRunningPublisher bool // server server *server // messageID @@ -848,6 +852,14 @@ func (p process) publishMessages(natsConn *nats.Conn) { // exit this function if Cancel are received via ctx. select { case <-ticker.C: + if p.isLongRunningPublisher { + er := fmt.Errorf("info: isLongRunningPublisher, will not cancel publisher: %v", p.processName) + //sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er) + p.errorKernel.logDebug(er, p.configuration) + + continue + } + // We only want to remove subprocesses // REMOVED 120123: Removed if so all publishers should be canceled if inactive. //if p.isSubProcess { diff --git a/processes.go b/processes.go index f43d69b..62d45fa 100644 --- a/processes.go +++ b/processes.go @@ -290,6 +290,7 @@ func (s startup) pubREQHello(p process) { sub := newSubject(REQHello, p.configuration.CentralNodeName) proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil) + proc.isLongRunningPublisher = true // Define the procFunc to be used for the process. proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { @@ -342,6 +343,7 @@ func (s startup) pubREQKeysRequestUpdate(p process) { sub := newSubject(REQKeysRequestUpdate, p.configuration.CentralNodeName) proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil) + proc.isLongRunningPublisher = true // Define the procFunc to be used for the process. proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { @@ -399,6 +401,7 @@ func (s startup) pubREQAclRequestUpdate(p process) { sub := newSubject(REQAclRequestUpdate, p.configuration.CentralNodeName) proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil) + proc.isLongRunningPublisher = true // Define the procFunc to be used for the process. proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {