diff --git a/process.go b/process.go index d34954b..0442cd0 100644 --- a/process.go +++ b/process.go @@ -227,7 +227,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) { // Create a subscriber for the reply message. subReply, err := natsConn.SubscribeSync(msg.Reply) if err != nil { - er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message: %v", err) + er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err) // sendErrorLogMessage(p.toRingbufferCh, node(p.node), er) log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer) time.Sleep(time.Second * subscribeSyncTimer) diff --git a/requests.go b/requests.go index eb68f03..335d18a 100644 --- a/requests.go +++ b/requests.go @@ -270,16 +270,22 @@ func newReplyMessage(proc process, message Message, outData []byte) { message.ReplyMethod = REQToFileAppend } + fmt.Printf("\n * DEBUG * newReplyMessage: message.FromNode contains: %v\n", message.FromNode) + fmt.Printf("\n * DEBUG * newReplyMessage: message.ToNode contains: %v\n", message.ToNode) + // Create a new message for the reply, and put it on the // ringbuffer to be published. newMsg := Message{ ToNode: message.FromNode, + FromNode: message.ToNode, Data: []string{string(outData)}, Method: message.ReplyMethod, MethodArgs: message.ReplyMethodArgs, MethodTimeout: message.ReplyMethodTimeout, ACKTimeout: message.ReplyACKTimeout, Retries: message.ReplyRetries, + Directory: message.Directory, + FileName: message.FileName, // Put in a copy of the initial request message, so we can use it's properties if // needed to for example create the file structure naming on the subscriber. @@ -1068,20 +1074,24 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) go func() { defer proc.processes.wg.Done() - // Check if {{data}} is defined in the arguments. If found put the + // Check if {{data}} is defined in the method arguments. If found put the // data payload there. + var foundEnvData bool + var envData string for i, v := range message.MethodArgs { - if strings.Contains(v, "{{data}}") { - var s string - for _, vv := range message.Data { - s = s + vv - } - - message.MethodArgs[i] = s + if strings.Contains(v, "{{STEWARD_DATA}}") { + foundEnvData = true + // Replace the found env variable placeholder with an actual env variable + message.MethodArgs[i] = strings.Replace(message.MethodArgs[i], "{{STEWARD_DATA}}", "$STEWARD_DATA", -1) + // Put all the data which is a slice of string into a single + // string so we can put it in a single env variable. + envData = strings.Join(message.Data, "") } } + fmt.Printf("* DEBUG * handler: received message contains : %#v\n", message) + c := message.MethodArgs[0] a := message.MethodArgs[1:] @@ -1095,6 +1105,13 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) cmd := exec.CommandContext(ctx, c, a...) + // Check for the use of env variable for STEWARD_DATA, and set env if found. + if foundEnvData { + envData = fmt.Sprintf("STEWARD_DATA=%v", envData) + cmd.Env = append(cmd.Env, envData) + fmt.Printf("\n * DEBUG * cmd.Env contains: %v\n\n", cmd.Env) + } + var out bytes.Buffer var stderr bytes.Buffer cmd.Stdout = &out @@ -1102,7 +1119,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) err := cmd.Run() if err != nil { if err != nil { - log.Printf("error: failed to io.ReadAll of stderr: %v\n", err) + log.Printf("error: failed cmd.Run: %v\n", err) } er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v, error_output: %v", err, message, stderr.String())