From 32aeed6c181ad097f483400bdc4c280ec1ad18a2 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 26 Jan 2022 15:35:31 +0100 Subject: [PATCH] added -1 as methodTimeout for long running request --- requests.go | 62 +++++++++++++++++++++++++++++++++++++-------------- ringbuffer.go | 2 +- 2 files changed, 46 insertions(+), 18 deletions(-) diff --git a/requests.go b/requests.go index 6a4736d..7457765 100644 --- a/requests.go +++ b/requests.go @@ -231,9 +231,19 @@ func (m Method) getHandler(method Method) methodHandler { return mh } -// The structure that works as a reference for all the methods and if -// they are of the command or event type, and also if it is a ACK or -// NACK message. +// getContextForMethodTimeout, will return a context with cancel function +// with the timeout set to the method timeout in the message. +// If the value of timeout is set to -1, we don't want it to stop, so we +// return a context with a timeout set to 200 years. +func getContextForMethodTimeout(ctx context.Context, message Message) (context.Context, context.CancelFunc) { + // If methodTimeout == -1, which means we don't want a timeout, set the + // time out to 200 years. + if message.MethodTimeout == -1 { + return context.WithTimeout(ctx, time.Hour*time.Duration(8760*200)) + } + + return context.WithTimeout(ctx, time.Second*time.Duration(message.MethodTimeout)) +} // ---- @@ -716,7 +726,8 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin DstNode := message.MethodArgs[1] DstFilePath := message.MethodArgs[2] - ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) + // Get a context with the timeout specified in message.MethodTimeout. + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) defer cancel() outCh := make(chan []byte) @@ -812,7 +823,7 @@ func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, e select { case outCh <- b: - fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n") + // fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n") case <-ctx.Done(): return } @@ -841,7 +852,8 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string) go func() { defer proc.processes.wg.Done() - ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) + // Get a context with the timeout specified in message.MethodTimeout. + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) defer cancel() // Put data that should be the result of the action done in the inner @@ -1194,7 +1206,8 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) c := message.MethodArgs[0] - ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) + // Get a context with the timeout specified in message.MethodTimeout. + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) outCh := make(chan []byte) @@ -1236,6 +1249,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string) er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String()) proc.processes.errorKernel.errSend(proc, message, er) } + select { case outCh <- out.Bytes(): case <-ctx.Done(): @@ -1360,7 +1374,8 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([ Timeout: time.Duration(message.MethodTimeout), } - ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) + // Get a context with the timeout specified in message.MethodTimeout. + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) if err != nil { @@ -1456,14 +1471,18 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( fp := message.MethodArgs[0] - var ctx context.Context - var cancel context.CancelFunc + // var ctx context.Context + // var cancel context.CancelFunc - if message.MethodTimeout != 0 { - ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) - } else { - ctx, cancel = context.WithCancel(proc.ctx) - } + // Get a context with the timeout specified in message.MethodTimeout. + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) + + // Note: Replacing the 0 timeout with specific timeout. + // if message.MethodTimeout != 0 { + // ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) + // } else { + // ctx, cancel = context.WithCancel(proc.ctx) + // } outCh := make(chan []byte) t, err := tail.TailFile(fp, tail.Config{Follow: true, Location: &tail.SeekInfo{ @@ -1540,6 +1559,10 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str go func() { defer proc.processes.wg.Done() + defer func() { + fmt.Printf(" * DONE *\n") + }() + var a []string switch { @@ -1554,7 +1577,10 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str c := message.MethodArgs[0] - ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) + // Get a context with the timeout specified in message.MethodTimeout. + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) + // deadline, _ := ctx.Deadline() + // fmt.Printf(" * DEBUG * deadline : %v\n", deadline) outCh := make(chan []byte) errCh := make(chan string) @@ -1628,6 +1654,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str proc.processes.errorKernel.infoSend(proc, message, er) return case out := <-outCh: + fmt.Printf(" * out: %v\n", string(out)) newReplyMessage(proc, message, out) case out := <-errCh: newReplyMessage(proc, message, []byte(out)) @@ -1678,7 +1705,8 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin go func() { defer proc.processes.wg.Done() - ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) + // Get a context with the timeout specified in message.MethodTimeout. + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) defer cancel() outCh := make(chan []byte) diff --git a/ringbuffer.go b/ringbuffer.go index ee0de2c..88eeabf 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -180,7 +180,7 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage if v.Message.Retries < 1 { v.Message.Retries = r.configuration.DefaultMessageRetries } - if v.Message.MethodTimeout < 1 { + if v.Message.MethodTimeout < 1 && v.Message.MethodTimeout != -1 { v.Message.MethodTimeout = r.configuration.DefaultMethodTimeout }