diff --git a/process.go b/process.go index af000c7..cf51f18 100644 --- a/process.go +++ b/process.go @@ -124,7 +124,10 @@ type process struct { // values for a process. func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process { // create the initial configuration for a sessions communicating with 1 host process. + server.processes.mu.Lock() server.processes.lastProcessID++ + pid := server.processes.lastProcessID + server.processes.mu.Unlock() ctx, cancel := context.WithCancel(ctx) @@ -135,7 +138,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin messageID: 0, subject: subject, node: Node(server.configuration.NodeName), - processID: server.processes.lastProcessID, + processID: pid, processKind: processKind, methodsAvailable: method.GetMethodsAvailable(), toRingbufferCh: server.toRingBufferCh, diff --git a/processes.go b/processes.go index ffad931..c901936 100644 --- a/processes.go +++ b/processes.go @@ -12,6 +12,8 @@ import ( // processes holds all the information about running processes type processes struct { + // mutex for processes + mu sync.Mutex // The main context for subscriber processes. ctx context.Context // cancel func to send cancel signal to the subscriber processes context. diff --git a/requests_copy.go b/requests_copy.go index 0400e9e..657410e 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -416,7 +416,9 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel status = copySrcDone } - lastReadChunk = b[:n] + lastReadChunk = make([]byte, len(b[:n])) + copy(lastReadChunk, b[:n]) + //lastReadChunk = b[:n] // Create a hash of the bytes. hash := sha256.Sum256(b[:n])