mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-15 10:57:42 +00:00
renamed process spawnworker() to start()
This commit is contained in:
parent
84a731f18a
commit
97d5da948f
5 changed files with 8 additions and 9 deletions
|
@ -153,8 +153,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
||||||
|
|
||||||
// We use the full name of the subject to identify a unique
|
// We use the full name of the subject to identify a unique
|
||||||
// process. We can do that since a process can only handle
|
// process. We can do that since a process can only handle
|
||||||
// one message queue.
|
// one request type.
|
||||||
|
|
||||||
if proc.processKind == processKindPublisher {
|
if proc.processKind == processKindPublisher {
|
||||||
proc.processName = processNameGet(proc.subject.name(), processKindPublisher)
|
proc.processName = processNameGet(proc.subject.name(), processKindPublisher)
|
||||||
}
|
}
|
||||||
|
@ -172,7 +171,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
||||||
//
|
//
|
||||||
// It will give the process the next available ID, and also add the
|
// It will give the process the next available ID, and also add the
|
||||||
// process to the processes map in the server structure.
|
// process to the processes map in the server structure.
|
||||||
func (p process) spawnWorker() {
|
func (p process) start() {
|
||||||
|
|
||||||
// Add prometheus metrics for the process.
|
// Add prometheus metrics for the process.
|
||||||
if !p.isSubProcess {
|
if !p.isSubProcess {
|
||||||
|
|
|
@ -385,7 +385,7 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
|
||||||
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber)
|
proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber)
|
||||||
proc.procFunc = pf
|
proc.procFunc = pf
|
||||||
|
|
||||||
go proc.spawnWorker()
|
go proc.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// publisher will start a publisher process. It takes the initial process, request method,
|
// publisher will start a publisher process. It takes the initial process, request method,
|
||||||
|
@ -398,7 +398,7 @@ func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, pr
|
||||||
proc.procFunc = pf
|
proc.procFunc = pf
|
||||||
proc.isLongRunningPublisher = true
|
proc.isLongRunningPublisher = true
|
||||||
|
|
||||||
go proc.spawnWorker()
|
go proc.start()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------------------------
|
// ---------------------------------------------------------------
|
||||||
|
|
|
@ -235,7 +235,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
|
||||||
copySrcSubProc.handler = copySrcSubHandler()
|
copySrcSubProc.handler = copySrcSubHandler()
|
||||||
|
|
||||||
// The process will be killed when the context expires.
|
// The process will be killed when the context expires.
|
||||||
go copySrcSubProc.spawnWorker()
|
go copySrcSubProc.start()
|
||||||
|
|
||||||
// Send a message over the the node where the destination file will be written,
|
// Send a message over the the node where the destination file will be written,
|
||||||
// to also start up a sub process on the destination node.
|
// to also start up a sub process on the destination node.
|
||||||
|
@ -362,7 +362,7 @@ func methodCopyDst(proc process, message Message, node string) ([]byte, error) {
|
||||||
copyDstSubProc.handler = copyDstSubHandler()
|
copyDstSubProc.handler = copyDstSubHandler()
|
||||||
|
|
||||||
// The process will be killed when the context expires.
|
// The process will be killed when the context expires.
|
||||||
go copyDstSubProc.spawnWorker()
|
go copyDstSubProc.start()
|
||||||
|
|
||||||
fp := filepath.Join(cia.DstDir, cia.DstFile)
|
fp := filepath.Join(cia.DstDir, cia.DstFile)
|
||||||
replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copyDstSubProc.processName, node, fp, subProcessName)
|
replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copyDstSubProc.processName, node, fp, subProcessName)
|
||||||
|
|
|
@ -69,7 +69,7 @@ func methodOpProcessStart(proc process, message Message, node string) ([]byte, e
|
||||||
// Create the process and start it.
|
// Create the process and start it.
|
||||||
sub := newSubject(method, proc.configuration.NodeName)
|
sub := newSubject(method, proc.configuration.NodeName)
|
||||||
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber)
|
procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber)
|
||||||
go procNew.spawnWorker()
|
go procNew.start()
|
||||||
|
|
||||||
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||||
er := fmt.Errorf("%v", txt)
|
er := fmt.Errorf("%v", txt)
|
||||||
|
|
|
@ -600,7 +600,7 @@ func (s *server) routeMessagesToProcess() {
|
||||||
proc = newProcess(s.ctx, s, sub, processKindPublisher)
|
proc = newProcess(s.ctx, s, sub, processKindPublisher)
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.spawnWorker()
|
proc.start()
|
||||||
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
|
er = fmt.Errorf("info: processNewMessages: new process started, subject: %v, processID: %v", subjName, proc.processID)
|
||||||
s.errorKernel.logDebug(er)
|
s.errorKernel.logDebug(er)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue