From 0d822b075168c761ffd5b2afee9082924899f5e2 Mon Sep 17 00:00:00 2001 From: postmannen Date: Mon, 29 Mar 2021 06:53:34 +0200 Subject: [PATCH] moved canceling of context, and comments --- example/toShip1-CLICommandRequest1.json | 4 +-- subscriber_method_types.go | 40 +++++++++++++++++-------- 2 files changed, 29 insertions(+), 15 deletions(-) diff --git a/example/toShip1-CLICommandRequest1.json b/example/toShip1-CLICommandRequest1.json index e87eb3f..e8a67d5 100644 --- a/example/toShip1-CLICommandRequest1.json +++ b/example/toShip1-CLICommandRequest1.json @@ -2,10 +2,10 @@ { "toNode": "ship1", - "data": ["bash","-c","sleep 5 & echo 'apekatt'"], + "data": ["bash","-c","sleep 3 & echo 'apekatt'"], "method":"CLICommandRequest", "timeout":10, "retries":3, - "MethodTimeout": 3 + "MethodTimeout": 7 } ] \ No newline at end of file diff --git a/subscriber_method_types.go b/subscriber_method_types.go index 3b2bc4a..fd683e1 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -197,6 +197,9 @@ func (m methodCLICommand) getKind() CommandOrEvent { return m.commandOrEvent } +// handler to run a CLI command with timeout context. The handler will +// return the output of the command run back to the calling publisher +// in the ack message. func (m methodCLICommand) handler(proc process, message Message, node string) ([]byte, error) { out := []byte{} @@ -204,7 +207,6 @@ func (m methodCLICommand) handler(proc process, message Message, node string) ([ a := message.Data[1:] ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) - defer cancel() outCh := make(chan []byte) @@ -219,11 +221,11 @@ func (m methodCLICommand) handler(proc process, message Message, node string) ([ select { case <-ctx.Done(): - fmt.Printf(" ** Before\n") - er := fmt.Errorf("error: method timed out %v", proc) + cancel() + er := fmt.Errorf("error: method timed out %v", message) sendErrorLogMessage(proc.newMessagesCh, proc.node, er) - fmt.Printf(" ** After\n") case out = <-outCh: + cancel() } ackMsg := []byte(fmt.Sprintf("confirmed from node: %v: messageID: %v\n---\n%s---", node, message.ID, out)) @@ -365,15 +367,21 @@ func (m methodCLICommandRequest) getKind() CommandOrEvent { return m.commandOrEvent } +// handler to run a CLI command with timeout context. The handler will +// return the output of the command run back to the calling publisher +// as a new message. func (m methodCLICommandRequest) handler(proc process, message Message, node string) ([]byte, error) { - log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data) + log.Printf("<--- CLICommandREQUEST received from: %v, containing: %v", message.FromNode, message.Data) + // Execute the CLI command in it's own go routine, so we are able + // to return immediately with an ack reply that the messag was + // received, and we create a new message to send back to the calling + // node for the out put of the actual command. go func() { c := message.Data[0] a := message.Data[1:] ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) - defer cancel() outCh := make(chan []byte) @@ -388,11 +396,11 @@ func (m methodCLICommandRequest) handler(proc process, message Message, node str select { case <-ctx.Done(): - fmt.Printf(" ** Before\n") - er := fmt.Errorf("error: method timed out %v", proc) + cancel() + er := fmt.Errorf("error: method timed out %v", message) sendErrorLogMessage(proc.newMessagesCh, proc.node, er) - fmt.Printf(" ** After\n") case out := <-outCh: + cancel() // Create a new message for the reply, and put it on the // ringbuffer to be published. @@ -429,18 +437,24 @@ func (m methodCLICommandRequestNOSEQ) getKind() CommandOrEvent { return m.commandOrEvent } +// handler to run a CLI command with timeout context. The handler will +// return the output of the command run back to the calling publisher +// as a new message. // The NOSEQ method will process messages as they are recived, // and the reply back will be sent as soon as the process is // done. No order are preserved. func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, node string) ([]byte, error) { log.Printf("<--- CLICommand REQUEST received from: %v, containing: %v", message.FromNode, message.Data) + // Execute the CLI command in it's own go routine, so we are able + // to return immediately with an ack reply that the messag was + // received, and we create a new message to send back to the calling + // node for the out put of the actual command. go func() { c := message.Data[0] a := message.Data[1:] ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) - defer cancel() outCh := make(chan []byte) @@ -455,11 +469,11 @@ func (m methodCLICommandRequestNOSEQ) handler(proc process, message Message, nod select { case <-ctx.Done(): - fmt.Printf(" ** Before\n") - er := fmt.Errorf("error: method timed out %v", proc) + cancel() + er := fmt.Errorf("error: method timed out %v", message) sendErrorLogMessage(proc.newMessagesCh, proc.node, er) - fmt.Printf(" ** After\n") case out := <-outCh: + cancel() // Create a new message for the reply, and put it on the // ringbuffer to be published.