diff --git a/requests.go b/requests.go index 0e9fb6a..117e1e7 100644 --- a/requests.go +++ b/requests.go @@ -1618,19 +1618,76 @@ func (m methodREQRelayInitial) getKind() CommandOrEvent { // Handler to relay messages via a host. func (m methodREQRelayInitial) handler(proc process, message Message, node string) ([]byte, error) { - // relay the message to the actual host here. - message.ToNode = message.RelayOriginalViaNode - message.FromNode = Node(node) - message.Method = REQRelay + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() - sam, err := newSubjectAndMessage(message) - if err != nil { - er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) - sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) - log.Printf("%v\n", er) - } + ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) + defer cancel() - proc.toRingbufferCh <- []subjectAndMessage{sam} + outCh := make(chan []byte) + errCh := make(chan error) + + var out []byte + + // If the actual Method for the message is REQCopyFileFrom we need to + // do the actual file reading here so we can fill the data field of the + // message with the content of the file before relaying it. + switch { + case message.RelayOriginalMethod == REQCopyFileFrom: + switch { + case len(message.MethodArgs) < 3: + er := fmt.Errorf("error: methodREQCliCommand: got <3 number methodArgs: want srcfilePath,dstNode,dstFilePath") + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + return + } + + SrcFilePath := message.MethodArgs[0] + + // Read the file, and put the result on the out channel to be sent when done reading. + proc.processes.wg.Add(1) + go copyFileFrom(ctx, &proc.processes.wg, SrcFilePath, errCh, outCh) + + // Since we now have read the source file we don't need the REQCopyFileFrom + // request method anymore, so we change the original method of the message + // so it will write the data after the relaying. + message.RelayOriginalMethod = REQCopyFileTo + default: + // No request type that need special handling if relayed. + } + + select { + case <-ctx.Done(): + er := fmt.Errorf("error: methodREQRelayInitial: CopyFromFile: got <-ctx.Done(): %v", message.MethodArgs) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + + return + case er := <-errCh: + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + + return + case out = <-outCh: + + } + + // ------- old part of code + + // relay the message to the actual host here. + message.ToNode = message.RelayOriginalViaNode + message.FromNode = Node(node) + message.Method = REQRelay + message.Data = []string{string(out)} + + sam, err := newSubjectAndMessage(message) + if err != nil { + er := fmt.Errorf("error: newSubjectAndMessage : %v, message: %v", err, message) + sendErrorLogMessage(proc.configuration, proc.processes.metrics, proc.toRingbufferCh, proc.node, er) + log.Printf("%v\n", er) + } + + proc.toRingbufferCh <- []subjectAndMessage{sam} + }() // Send back an ACK message. ackMsg := []byte("confirmed REQRelay from: " + node + ": " + fmt.Sprint(message.ID))