1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

restructured sync when publishing messages

This commit is contained in:
postmannen 2023-01-03 11:41:18 +01:00
parent d58095764b
commit 2eff3c9cd3
2 changed files with 82 additions and 38 deletions

View file

@ -835,11 +835,16 @@ func (p process) publishMessages(natsConn *nats.Conn) {
case <-ticker.C:
// We only want to remove subprocesses
if p.isSubProcess {
p.ctxCancel()
p.processes.active.mu.Lock()
p.ctxCancel()
delete(p.processes.active.procNames, p.processName)
p.processes.active.mu.Unlock()
er := fmt.Errorf("info: canceled publisher: %v", p.processName)
//sendErrorLogMessage(p.toRingbufferCh, Node(p.node), er)
log.Printf("%v\n", er)
return
}
case m := <-p.subject.messageCh:

111
server.go
View file

@ -461,53 +461,92 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
return
}
for {
// Looping here so we are able to redo the sending
// of the last message if a process for the specified subject
// is not present. The process will then be created, and
// the code will loop back here.
// Looping here so we are able to redo the sending
// of the last message if a process for the specified subject
// is not present. The process will then be created, and
// the code will loop back here.
m := sam.Message
m := sam.Message
subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)
subjName := sam.Subject.name()
pn := processNameGet(subjName, processKindPublisher)
// Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it.
sendOK := func() bool {
var ctxCanceled bool
// Check if there is a map of type map[int]process registered
// for the processName, and if it exists then return it.
s.processes.active.mu.Lock()
defer s.processes.active.mu.Unlock()
// Check if the process exist, if it do not exist return false so a
// new publisher process will be created.
proc, ok := s.processes.active.procNames[pn]
s.processes.active.mu.Unlock()
if !ok {
return false
}
// If found a map above, range it, and are there already a process
// for that subject, put the message on that processes incomming
// message channel.
if ok {
// We have found the process to route the message to, deliver it.
proc.subject.messageCh <- m
if proc.ctx.Err() != nil {
ctxCanceled = true
}
if ok && ctxCanceled {
er := fmt.Errorf(" ** routeMessagesToProcess: context is already ended for process %v, will not try to reuse existing publisher, deleting it, and creating a new publisher !!! ", proc.processName)
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
delete(proc.processes.active.procNames, proc.processName)
return false
}
break
} else {
// If a publisher process do not exist for the given subject, create it.
// log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
var proc process
switch {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisher, nil)
// If found in map above, and go routine for publishing is running,
// put the message on that processes incomming message channel.
if ok && !ctxCanceled {
select {
case proc.subject.messageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to existing process: %v", m.ID, proc.processName)
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
}
proc.spawnWorker()
er := fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
// Now when the process is spawned we continue,
// and send the message to that new process.
continue
return true
}
// The process was not found, so we return false here so a new publisher
// process will be created later.
return false
}()
if sendOK {
return
}
er := fmt.Errorf("info: processNewMessages: did not find publisher process for subject %v, starting new", subjName)
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
var proc process
switch {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisher, nil)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisher, nil)
}
proc.spawnWorker()
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
// Now when the process is spawned we continue,
// and send the message to that new process.
select {
case proc.subject.messageCh <- m:
er := fmt.Errorf(" ** routeMessagesToProcess: passed message: %v to the new process: %v", m.ID, proc.processName)
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
case <-proc.ctx.Done():
er := fmt.Errorf(" ** routeMessagesToProcess: got ctx.done for process %v", proc.processName)
s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration)
}
}(samDBVal)
}
}()