From 8d017c072df18730f30ceb2c3832f58e3c3ff6ab Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 19 Dec 2022 07:21:53 +0100 Subject: [PATCH] discard copy messages when more procs are started for the same copySrc request --- requests_copy.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/requests_copy.go b/requests_copy.go index 28378cc..4548ac5 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -326,6 +326,28 @@ func (m methodREQCopyDst) handler(proc process, message Message, node string) ([ // Create a new sub process that will do the actual file copying. copyDstSubProc := newSubProcess(ctx, proc.server, sub, processKindSubscriber, nil) + // Check if we already got a sub process registered and started with + // the processName. If true, return here and don't start up another + // process for that file. + // + // NB: This check is put in here if a message for some reason are + // received more than once. The reason that this might happen can be + // that a message for the same copy request was received earlier, but + // was unable to start up within the timeout specified. The Sender of + // that request will then resend the message, but at the time that + // second message is received the subscriber process started for the + // previous message is then fully up and running, so we just discard + // that second message in those cases. + + proc.processes.active.mu.Lock() + _, ok := proc.processes.active.procNames[copyDstSubProc.processName] + proc.processes.active.mu.Unlock() + + if ok { + log.Printf(" * * * DEBUG: subprocesses already existed, will not start another subscriber for %v\n", copyDstSubProc.processName) + return + } + // Give the sub process a procFunc so we do the actual copying within a procFunc, // and not directly within the handler. copyDstSubProc.procFunc = copyDstSubProcFunc(copyDstSubProc, cia, message, cancel)