1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2025-01-18 21:59:30 +00:00

Implemented use of context for all cli commands

This commit is contained in:
postmannen 2021-03-26 16:47:37 +01:00
parent 3fdf7e40b9
commit 03da2e18e7
3 changed files with 67 additions and 34 deletions

View file

@ -2,9 +2,10 @@
{ {
"toNode": "ship1", "toNode": "ship1",
"data": ["bash","-c","netstat -an|grep -i listen"], "data": ["bash","-c","sleep 5 && netstat -an|grep -i listen"],
"method":"CLICommand", "method":"CLICommand",
"timeout":3, "timeout":3,
"retries":3 "retries":3,
"MethodTimeout": 7
} }
] ]

View file

@ -6,6 +6,6 @@
"method":"CLICommandRequest", "method":"CLICommandRequest",
"timeout":10, "timeout":10,
"retries":3, "retries":3,
"MethodTimeout": 7 "MethodTimeout": 3
} }
] ]

View file

@ -198,13 +198,32 @@ func (m methodCLICommand) getKind() CommandOrEvent {
} }
func (m methodCLICommand) handler(proc process, message Message, node string) ([]byte, error) { func (m methodCLICommand) handler(proc process, message Message, node string) ([]byte, error) {
out := []byte{}
c := message.Data[0] c := message.Data[0]
a := message.Data[1:] a := message.Data[1:]
cmd := exec.Command(c, a...)
//cmd.Stdout = os.Stdout ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
out, err := cmd.CombinedOutput() defer cancel()
if err != nil {
log.Printf("error: execution of command failed: %v\n", err) outCh := make(chan []byte)
go func() {
cmd := exec.CommandContext(ctx, c, a...)
out, err := cmd.Output()
if err != nil {
log.Printf("error: %v\n", err)
}
outCh <- out
}()
select {
case <-ctx.Done():
fmt.Printf(" ** Before\n")
er := fmt.Errorf("error: method timed out %v", proc)
sendErrorLogMessage(proc.newMessagesCh, proc.node, er)
fmt.Printf(" ** After\n")
case out = <-outCh:
} }
ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out)) ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out))
@ -349,10 +368,9 @@ func (m methodCLICommandRequest) getKind() CommandOrEvent {
func (m methodCLICommandRequest) handler(proc process, message Message, node string) ([]byte, error) { func (m methodCLICommandRequest) handler(proc process, message Message, node string) ([]byte, error) {
log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data) log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
c := message.Data[0]
a := message.Data[1:]
go func() { go func() {
c := message.Data[0]
a := message.Data[1:]
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
defer cancel() defer cancel()
@ -418,35 +436,49 @@ func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, nod
log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data) log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data)
go func() { go func() {
c := message.Data[0] c := message.Data[0]
a := message.Data[1:] a := message.Data[1:]
cmd := exec.Command(c, a...)
//cmd.Stdout = os.Stdout
out, err := cmd.CombinedOutput()
if err != nil {
log.Printf("error: execution of command failed: %v\n", err)
}
time.Sleep(time.Second * time.Duration(message.MethodTimeout)) ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout))
defer cancel()
// Create a new message for the reply, and put it on the outCh := make(chan []byte)
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
Data: []string{string(out)},
Method: CLICommandReply,
Timeout: 3,
Retries: 3,
}
fmt.Printf("** %#v\n", newMsg)
nSAM, err := newSAM(newMsg) go func() {
if err != nil { cmd := exec.CommandContext(ctx, c, a...)
// In theory the system should drop the message before it reaches here. out, err := cmd.Output()
log.Printf("error: methodCLICommandRequestNOSEQ: %v\n", err) if err != nil {
log.Printf("error: %v\n", err)
}
outCh <- out
}()
select {
case <-ctx.Done():
fmt.Printf(" ** Before\n")
er := fmt.Errorf("error: method timed out %v", proc)
sendErrorLogMessage(proc.newMessagesCh, proc.node, er)
fmt.Printf(" ** After\n")
case out := <-outCh:
// Create a new message for the reply, and put it on the
// ringbuffer to be published.
newMsg := Message{
ToNode: message.FromNode,
Data: []string{string(out)},
Method: CLICommandReply,
Timeout: 3,
Retries: 3,
}
fmt.Printf("** %#v\n", newMsg)
nSAM, err := newSAM(newMsg)
if err != nil {
// In theory the system should drop the message before it reaches here.
log.Printf("error: methodCLICommandRequest: %v\n", err)
}
proc.newMessagesCh <- []subjectAndMessage{nSAM}
} }
proc.newMessagesCh <- []subjectAndMessage{nSAM}
}() }()