1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

initial implementation clicommand as reply method

This commit is contained in:
postmannen 2021-09-22 15:25:40 +02:00
parent 21f532c128
commit 8d0b4420c9
2 changed files with 27 additions and 10 deletions

View file

@ -227,7 +227,7 @@ func (p process) messageDeliverNats(natsConn *nats.Conn, message Message) {
// Create a subscriber for the reply message.
subReply, err := natsConn.SubscribeSync(msg.Reply)
if err != nil {
er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message: %v", err)
er := fmt.Errorf("error: nc.SubscribeSync failed: failed to create reply message for subject: %v, error: %v", msg.Reply, err)
// sendErrorLogMessage(p.toRingbufferCh, node(p.node), er)
log.Printf("%v, waiting %ds before retrying\n", er, subscribeSyncTimer)
time.Sleep(time.Second * subscribeSyncTimer)

View file

@ -270,16 +270,22 @@ func newReplyMessage(proc process, message Message, outData []byte) {
message.ReplyMethod = REQToFileAppend
}
fmt.Printf("\n * DEBUG * newReplyMessage: message.FromNode contains: %v\n", message.FromNode)
fmt.Printf("\n * DEBUG * newReplyMessage: message.ToNode contains: %v\n", message.ToNode)
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
FromNode: message.ToNode,
Data: []string{string(outData)},
Method: message.ReplyMethod,
MethodArgs: message.ReplyMethodArgs,
MethodTimeout: message.ReplyMethodTimeout,
ACKTimeout: message.ReplyACKTimeout,
Retries: message.ReplyRetries,
Directory: message.Directory,
FileName: message.FileName,
// Put in a copy of the initial request message, so we can use it's properties if
// needed to for example create the file structure naming on the subscriber.
@ -1068,20 +1074,24 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
go func() {
defer proc.processes.wg.Done()
// Check if {{data}} is defined in the arguments. If found put the
// Check if {{data}} is defined in the method arguments. If found put the
// data payload there.
var foundEnvData bool
var envData string
for i, v := range message.MethodArgs {
if strings.Contains(v, "{{data}}") {
var s string
for _, vv := range message.Data {
s = s + vv
}
message.MethodArgs[i] = s
if strings.Contains(v, "{{STEWARD_DATA}}") {
foundEnvData = true
// Replace the found env variable placeholder with an actual env variable
message.MethodArgs[i] = strings.Replace(message.MethodArgs[i], "{{STEWARD_DATA}}", "$STEWARD_DATA", -1)
// Put all the data which is a slice of string into a single
// string so we can put it in a single env variable.
envData = strings.Join(message.Data, "")
}
}
fmt.Printf("* DEBUG * handler: received message contains : %#v\n", message)
c := message.MethodArgs[0]
a := message.MethodArgs[1:]
@ -1095,6 +1105,13 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
cmd := exec.CommandContext(ctx, c, a...)
// Check for the use of env variable for STEWARD_DATA, and set env if found.
if foundEnvData {
envData = fmt.Sprintf("STEWARD_DATA=%v", envData)
cmd.Env = append(cmd.Env, envData)
fmt.Printf("\n * DEBUG * cmd.Env contains: %v\n\n", cmd.Env)
}
var out bytes.Buffer
var stderr bytes.Buffer
cmd.Stdout = &out
@ -1102,7 +1119,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
err := cmd.Run()
if err != nil {
if err != nil {
log.Printf("error: failed to io.ReadAll of stderr: %v\n", err)
log.Printf("error: failed cmd.Run: %v\n", err)
}
er := fmt.Errorf("error: methodREQCliCommand: cmd.Output : %v, message: %v, error_output: %v", err, message, stderr.String())