From 3859ae6b9b7f67c83017d984424260df86c6a658 Mon Sep 17 00:00:00 2001 From: postmannen Date: Fri, 16 Apr 2021 23:58:43 +0200 Subject: [PATCH] fixed closed channel issue, and tail timeout ctx --- example/toShip2-REQTailFile.json | 4 ++-- subscriber_method_types.go | 18 ++++++++++++++---- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/example/toShip2-REQTailFile.json b/example/toShip2-REQTailFile.json index 4f23161..43a5b77 100644 --- a/example/toShip2-REQTailFile.json +++ b/example/toShip2-REQTailFile.json @@ -3,10 +3,10 @@ "directory": "./mine-tail-filer/", "fileExtension": ".log", "toNode": "ship2", - "data": ["./test.log"], + "data": ["/var/log/system.log"], "method":"REQTailFile", "ACKTimeout":5, "retries":3, - "methodTimeout": 200 + "methodTimeout": 10 } ] \ No newline at end of file diff --git a/subscriber_method_types.go b/subscriber_method_types.go index e793e11..9f6e61e 100644 --- a/subscriber_method_types.go +++ b/subscriber_method_types.go @@ -900,10 +900,20 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( go func() { fp := message.Data[0] - ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + var ctx context.Context + var cancel context.CancelFunc + + if message.MethodTimeout != 0 { + ctx, cancel = context.WithTimeout(context.Background(), time.Second*time.Duration(message.MethodTimeout)) + } else { + ctx, cancel = context.WithCancel(context.Background()) + } outCh := make(chan []byte) - t, err := tail.TailFile(fp, tail.Config{Follow: true}) + t, err := tail.TailFile(fp, tail.Config{Follow: true, Location: &tail.SeekInfo{ + Offset: 0, + Whence: os.SEEK_END, + }}) if err != nil { er := fmt.Errorf("error: tailFile: %v", err) log.Printf("%v\n", er) @@ -922,8 +932,8 @@ func (m methodREQTailFile) handler(proc process, message Message, node string) ( cancel() // Close the lines channel so we exit the reading lines // go routine. - close(t.Lines) - er := fmt.Errorf("error: method timed out %v", message) + // close(t.Lines) + er := fmt.Errorf("info: method timeout reached, canceling: %v", message) sendErrorLogMessage(proc.toRingbufferCh, proc.node, er) return case out := <-outCh: