1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

added -1 as methodTimeout for long running request

This commit is contained in:
postmannen 2022-01-26 15:35:31 +01:00
parent 88672f1e35
commit 32aeed6c18
2 changed files with 46 additions and 18 deletions

View file

@ -231,9 +231,19 @@ func (m Method) getHandler(method Method) methodHandler {
return mh return mh
} }
// The structure that works as a reference for all the methods and if // getContextForMethodTimeout, will return a context with cancel function
// they are of the command or event type, and also if it is a ACK or // with the timeout set to the method timeout in the message.
// NACK message. // If the value of timeout is set to -1, we don't want it to stop, so we
// return a context with a timeout set to 200 years.
func getContextForMethodTimeout(ctx context.Context, message Message) (context.Context, context.CancelFunc) {
// If methodTimeout == -1, which means we don't want a timeout, set the
// time out to 200 years.
if message.MethodTimeout == -1 {
return context.WithTimeout(ctx, time.Hour*time.Duration(8760*200))
}
return context.WithTimeout(ctx, time.Second*time.Duration(message.MethodTimeout))
}
// ---- // ----
@ -716,7 +726,8 @@ func (m methodREQCopyFileFrom) handler(proc process, message Message, node strin
DstNode := message.MethodArgs[1] DstNode := message.MethodArgs[1]
DstFilePath := message.MethodArgs[2] DstFilePath := message.MethodArgs[2]
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) // Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
defer cancel() defer cancel()
outCh := make(chan []byte) outCh := make(chan []byte)
@ -812,7 +823,7 @@ func copyFileFrom(ctx context.Context, wg *sync.WaitGroup, SrcFilePath string, e
select { select {
case outCh <- b: case outCh <- b:
fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n") // fmt.Printf(" * DEBUG: after io.ReadAll: outCh <- b\n")
case <-ctx.Done(): case <-ctx.Done():
return return
} }
@ -841,7 +852,8 @@ func (m methodREQCopyFileTo) handler(proc process, message Message, node string)
go func() { go func() {
defer proc.processes.wg.Done() defer proc.processes.wg.Done()
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) // Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
defer cancel() defer cancel()
// Put data that should be the result of the action done in the inner // Put data that should be the result of the action done in the inner
@ -1194,7 +1206,8 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
c := message.MethodArgs[0] c := message.MethodArgs[0]
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) // Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
outCh := make(chan []byte) outCh := make(chan []byte)
@ -1236,6 +1249,7 @@ func (m methodREQCliCommand) handler(proc process, message Message, node string)
er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String()) er := fmt.Errorf("error: methodREQCliCommand: cmd.Run failed : %v, methodArgs: %v, error_output: %v", err, message.MethodArgs, stderr.String())
proc.processes.errorKernel.errSend(proc, message, er) proc.processes.errorKernel.errSend(proc, message, er)
} }
select { select {
case outCh <- out.Bytes(): case outCh <- out.Bytes():
case <-ctx.Done(): case <-ctx.Done():
@ -1360,7 +1374,8 @@ func (m methodREQHttpGet) handler(proc process, message Message, node string) ([
Timeout: time.Duration(message.MethodTimeout), Timeout: time.Duration(message.MethodTimeout),
} }
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) // Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil { if err != nil {
@ -1456,14 +1471,18 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) (
fp := message.MethodArgs[0] fp := message.MethodArgs[0]
var ctx context.Context // var ctx context.Context
var cancel context.CancelFunc // var cancel context.CancelFunc
if message.MethodTimeout != 0 { // Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
} else {
ctx, cancel = context.WithCancel(proc.ctx) // Note: Replacing the 0 timeout with specific timeout.
} // if message.MethodTimeout != 0 {
// ctx, cancel = context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout))
// } else {
// ctx, cancel = context.WithCancel(proc.ctx)
// }
outCh := make(chan []byte) outCh := make(chan []byte)
t, err := tail.TailFile(fp, tail.Config{Follow: true, Location: &tail.SeekInfo{ t, err := tail.TailFile(fp, tail.Config{Follow: true, Location: &tail.SeekInfo{
@ -1540,6 +1559,10 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
go func() { go func() {
defer proc.processes.wg.Done() defer proc.processes.wg.Done()
defer func() {
fmt.Printf(" * DONE *\n")
}()
var a []string var a []string
switch { switch {
@ -1554,7 +1577,10 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
c := message.MethodArgs[0] c := message.MethodArgs[0]
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) // Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
// deadline, _ := ctx.Deadline()
// fmt.Printf(" * DEBUG * deadline : %v\n", deadline)
outCh := make(chan []byte) outCh := make(chan []byte)
errCh := make(chan string) errCh := make(chan string)
@ -1628,6 +1654,7 @@ func (m methodREQCliCommandCont) handler(proc process, message Message, node str
proc.processes.errorKernel.infoSend(proc, message, er) proc.processes.errorKernel.infoSend(proc, message, er)
return return
case out := <-outCh: case out := <-outCh:
fmt.Printf(" * out: %v\n", string(out))
newReplyMessage(proc, message, out) newReplyMessage(proc, message, out)
case out := <-errCh: case out := <-errCh:
newReplyMessage(proc, message, []byte(out)) newReplyMessage(proc, message, []byte(out))
@ -1678,7 +1705,8 @@ func (m methodREQRelayInitial) handler(proc process, message Message, node strin
go func() { go func() {
defer proc.processes.wg.Done() defer proc.processes.wg.Done()
ctx, cancel := context.WithTimeout(proc.ctx, time.Second*time.Duration(message.MethodTimeout)) // Get a context with the timeout specified in message.MethodTimeout.
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
defer cancel() defer cancel()
outCh := make(chan []byte) outCh := make(chan []byte)

View file

@ -180,7 +180,7 @@ func (r *ringBuffer) fillBuffer(ctx context.Context, inCh chan subjectAndMessage
if v.Message.Retries < 1 { if v.Message.Retries < 1 {
v.Message.Retries = r.configuration.DefaultMessageRetries v.Message.Retries = r.configuration.DefaultMessageRetries
} }
if v.Message.MethodTimeout < 1 { if v.Message.MethodTimeout < 1 && v.Message.MethodTimeout != -1 {
v.Message.MethodTimeout = r.configuration.DefaultMethodTimeout v.Message.MethodTimeout = r.configuration.DefaultMethodTimeout
} }