From 680cb1f4ddd6d441f66aa4630a1ce3d7ee0ad798 Mon Sep 17 00:00:00 2001 From: postmannen Date: Thu, 9 Jun 2022 10:18:09 +0200 Subject: [PATCH] moved naming of process into newProcess --- process.go | 31 ++++++++++++++++--------------- requests_copy.go | 17 +++++++++-------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/process.go b/process.go index 008f090..1bf8c99 100644 --- a/process.go +++ b/process.go @@ -140,6 +140,17 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin metrics: server.metrics, } + // We use the full name of the subject to identify a unique + // process. We can do that since a process can only handle + // one message queue. + + if proc.processKind == processKindPublisher { + proc.processName = processNameGet(proc.subject.name(), processKindPublisher) + } + if proc.processKind == processKindSubscriber { + proc.processName = processNameGet(proc.subject.name(), processKindSubscriber) + } + return proc } @@ -151,21 +162,11 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin // It will give the process the next available ID, and also add the // process to the processes map in the server structure. func (p process) spawnWorker() { - // We use the full name of the subject to identify a unique - // process. We can do that since a process can only handle - // one message queue. - var pn processName - if p.processKind == processKindPublisher { - pn = processNameGet(p.subject.name(), processKindPublisher) - } - if p.processKind == processKindSubscriber { - pn = processNameGet(p.subject.name(), processKindSubscriber) - } - processName := processNameGet(p.subject.name(), p.processKind) + // processName := processNameGet(p.subject.name(), p.processKind) // Add prometheus metrics for the process. - p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)}) + p.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(p.processName)}) // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. @@ -214,12 +215,12 @@ func (p process) spawnWorker() { p.natsSubscription = p.subscribeMessages() } - p.processName = pn - // Add information about the new process to the started processes map. p.processes.active.mu.Lock() - p.processes.active.procNames[pn] = p + p.processes.active.procNames[p.processName] = p p.processes.active.mu.Unlock() + + log.Printf("Successfully started process: %v\n", p.processName) } // messageDeliverNats will create the Nats message with headers and payload. diff --git a/requests_copy.go b/requests_copy.go index 3a53201..77374d1 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -77,9 +77,9 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ return } - SrcFilePath := message.MethodArgs[0] - DstNode := message.MethodArgs[1] - DstFilePath := message.MethodArgs[2] + // SrcFilePath := message.MethodArgs[0] + // DstNode := message.MethodArgs[1] + // DstFilePath := message.MethodArgs[2] // Get a context with the timeout specified in message.MethodTimeout. // Since the subProc spawned will outlive this method here we do not @@ -98,12 +98,13 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ copySrcSubProc := newProcess(ctx, proc.server, sub, processKindSubscriber, nil) // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. - copySrcSubProc.procFunc = copySrcProcFunc() + copySrcSubProc.procFunc = copySrcProcFunc(copySrcSubProc) // The process will be killed when the context expires. go copySrcSubProc.spawnWorker() - replyData := fmt.Sprintf("info: succesfully initiated copy: srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying\n", node, SrcFilePath, DstNode, DstFilePath, subProcessName) - newReplyMessage(proc, message, []byte(replyData)) + //replyData := fmt.Sprintf("info: succesfully initiated copy: srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying\n", node, SrcFilePath, DstNode, DstFilePath, subProcessName) + // + //newReplyMessage(proc, message, []byte(replyData)) }() @@ -111,12 +112,12 @@ func (m methodREQCopySrc) handler(proc process, message Message, node string) ([ return ackMsg, nil } -func copySrcProcFunc() func(context.Context, chan Message) error { +func copySrcProcFunc(proc process) func(context.Context, chan Message) error { pf := func(ctx context.Context, procFuncCh chan Message) error { select { case <-ctx.Done(): - log.Printf(" * copySrcProcFunc ended\n") + log.Printf(" * copySrcProcFunc ended: %v\n", proc.processName) } return nil