1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

added streamInfo to process type, and also added it as input to new process.

This commit is contained in:
postmannen 2024-11-21 10:09:49 +01:00
parent 6b1c259373
commit 815f00141d
6 changed files with 33 additions and 18 deletions

View file

@ -149,3 +149,16 @@ type subjectName string
func (s Subject) name() subjectName {
return subjectName(fmt.Sprintf("%s.%s", s.ToNode, s.Method))
}
type streamInfo struct {
name string
subjects []string
}
func newStreamInfo(name string, subjects []string) streamInfo {
s := streamInfo{
name: name,
subjects: subjects,
}
return s
}

View file

@ -44,8 +44,10 @@ type process struct {
messageID int
// the subject used for the specific process. One process
// can contain only one sender on a message bus, hence
// also one subject
// also one subject.
subject Subject
// The jetstram stream.
streamInfo streamInfo
// Put a node here to be able know the node a process is at.
node Node
// The processID for the current process
@ -125,7 +127,7 @@ type process struct {
// prepareNewProcess will set the the provided values and the default
// values for a process.
func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
func newProcess(ctx context.Context, server *server, subject Subject, stream streamInfo, processKind processKind) process {
// create the initial configuration for a sessions communicating with 1 host process.
server.processes.mu.Lock()
server.processes.lastProcessID++
@ -163,24 +165,24 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
js: js,
}
// We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle
// one message queue.
// We use the name of the subject to identify a unique process.
if proc.processKind == processKindPublisherNats {
switch proc.processKind {
case processKindPublisherNats:
proc.processName = processNameGet(proc.subject.name(), processKindPublisherNats)
}
if proc.processKind == processKindSubscriberNats {
case processKindSubscriberNats:
proc.processName = processNameGet(proc.subject.name(), processKindSubscriberNats)
case processKindConsumerJetstream:
proc.processName = processNameGet(subjectName(proc.streamInfo.name), processKindConsumerJetstream)
case processKindPublisherJetstream:
proc.processName = processNameGet(subjectName(proc.streamInfo.name), processKindPublisherJetstream)
}
return proc
}
// The purpose of this function is to check if we should start a
// publisher or subscriber process, where a process is a go routine
// that will handle either sending or receiving messages on one
// subject.
// Start a publisher or subscriber process, where a process is a go routine
// that will handle either sending or receiving messages on one subject.
//
// It will give the process the next available ID, and also add the
// process to the processes map in the server structure.

View file

@ -382,7 +382,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
}
fmt.Printf("DEBUG:::startup subscriber, subject: %v\n", sub)
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriberNats)
proc := newProcess(p.ctx, p.processes.server, sub, streamInfo{}, processKindSubscriberNats)
proc.procFunc = pf
go proc.Start()
@ -394,7 +394,7 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr
er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
p.errorKernel.logDebug(er)
sub := newSubject(m, string(p.node))
proc := newProcess(p.ctx, p.processes.server, sub, processKindPublisherNats)
proc := newProcess(p.ctx, p.processes.server, sub, streamInfo{}, processKindPublisherNats)
proc.procFunc = pf
proc.isLongRunningPublisher = true

View file

@ -281,7 +281,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
// newSubProcess is a wrapper around newProcess which sets the isSubProcess value to true.
func newSubProcess(ctx context.Context, server *server, subject Subject, processKind processKind) process {
p := newProcess(ctx, server, subject, processKind)
p := newProcess(ctx, server, subject, streamInfo{}, processKind)
p.isSubProcess = true
return p

View file

@ -68,7 +68,7 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e
// Create the process and start it.
sub := newSubject(method, proc.configuration.NodeName)
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriberNats)
procNew := newProcess(proc.ctx, proc.server, sub, streamInfo{}, processKindSubscriberNats)
go procNew.Start()
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)

View file

@ -337,7 +337,7 @@ func (s *server) Start() {
//
// The context of the initial process are set in processes.Start.
sub := newSubject(Initial, s.nodeName)
s.processInitial = newProcess(context.TODO(), s, sub, "")
s.processInitial = newProcess(context.TODO(), s, sub, streamInfo{}, "")
// Start all wanted subscriber processes.
s.processes.Start(s.processInitial)
@ -574,7 +574,7 @@ func (s *server) routeMessagesToProcess() {
case m.IsSubPublishedMsg:
proc = newSubProcess(s.ctx, s, sub, processKindPublisherNats)
default:
proc = newProcess(s.ctx, s, sub, processKindPublisherNats)
proc = newProcess(s.ctx, s, sub, streamInfo{}, processKindPublisherNats)
}
proc.Start()