1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

moved naming of process into newProcess

This commit is contained in:
postmannen 2022-06-09 10:18:09 +02:00
parent 731f628233
commit 680cb1f4dd
2 changed files with 25 additions and 23 deletions

View file

@ -140,6 +140,17 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
metrics: server.metrics, metrics: server.metrics,
} }
// We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle
// one message queue.
if proc.processKind == processKindPublisher {
proc.processName = processNameGet(proc.subject.name(), processKindPublisher)
}
if proc.processKind == processKindSubscriber {
proc.processName = processNameGet(proc.subject.name(), processKindSubscriber)
}
return proc return proc
} }
@ -151,21 +162,11 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
// It will give the process the next available ID, and also add the // It will give the process the next available ID, and also add the
// process to the processes map in the server structure. // process to the processes map in the server structure.
func (p process) spawnWorker() { func (p process) spawnWorker() {
// We use the full name of the subject to identify a unique
// process. We can do that since a process can only handle
// one message queue.
var pn processName
if p.processKind == processKindPublisher {
pn = processNameGet(p.subject.name(), processKindPublisher)
}
if p.processKind == processKindSubscriber {
pn = processNameGet(p.subject.name(), processKindSubscriber)
}
processName := processNameGet(p.subject.name(), p.processKind) // processName := processNameGet(p.subject.name(), p.processKind)
// Add prometheus metrics for the process. // Add prometheus metrics for the process.
p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)}) p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(p.processName)})
// Start a publisher worker, which will start a go routine (process) // Start a publisher worker, which will start a go routine (process)
// That will take care of all the messages for the subject it owns. // That will take care of all the messages for the subject it owns.
@ -214,12 +215,12 @@ func (p process) spawnWorker() {
p.natsSubscription = p.subscribeMessages() p.natsSubscription = p.subscribeMessages()
} }
p.processName = pn
// Add information about the new process to the started processes map. // Add information about the new process to the started processes map.
p.processes.active.mu.Lock() p.processes.active.mu.Lock()
p.processes.active.procNames[pn] = p p.processes.active.procNames[p.processName] = p
p.processes.active.mu.Unlock() p.processes.active.mu.Unlock()
log.Printf("Successfully started process: %v\n", p.processName)
} }
// messageDeliverNats will create the Nats message with headers and payload. // messageDeliverNats will create the Nats message with headers and payload.

View file

@ -77,9 +77,9 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
return return
} }
SrcFilePath := message.MethodArgs[0] // SrcFilePath := message.MethodArgs[0]
DstNode := message.MethodArgs[1] // DstNode := message.MethodArgs[1]
DstFilePath := message.MethodArgs[2] // DstFilePath := message.MethodArgs[2]
// Get a context with the timeout specified in message.MethodTimeout. // Get a context with the timeout specified in message.MethodTimeout.
// Since the subProc spawned will outlive this method here we do not // Since the subProc spawned will outlive this method here we do not
@ -98,12 +98,13 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
copySrcSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil) copySrcSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil)
// Give the sub process a procFunc so we do the actual copying within a procFunc, // Give the sub process a procFunc so we do the actual copying within a procFunc,
// and not directly within the handler. // and not directly within the handler.
copySrcSubProc.procFunc = copySrcProcFunc() copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc)
// The process will be killed when the context expires. // The process will be killed when the context expires.
go copySrcSubProc.spawnWorker() go copySrcSubProc.spawnWorker()
replyData := fmt.Sprintf("info: succesfully initiated copy: srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying\n", node, SrcFilePath, DstNode, DstFilePath, subProcessName) //replyData := fmt.Sprintf("info: succesfully initiated copy: srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying\n", node, SrcFilePath, DstNode, DstFilePath, subProcessName)
newReplyMessage(proc, message, []byte(replyData)) //
//newReplyMessage(proc, message, []byte(replyData))
}() }()
@ -111,12 +112,12 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([
return ackMsg, nil return ackMsg, nil
} }
func copySrcProcFunc() func(context.Context, chan Message) error { func copySrcProcFunc(proc process) func(context.Context, chan Message) error {
pf := func(ctx context.Context, procFuncCh chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
log.Printf(" * copySrcProcFunc ended\n") log.Printf(" * copySrcProcFunc ended: %v\n", proc.processName)
} }
return nil return nil