1
0
Fork 0
mirror of https://github.com/postmannen/ctrl.git synced 2024-12-14 12:37:31 +00:00

checking file for updates and reading content

This commit is contained in:
postmannen 2021-02-02 13:06:37 +01:00
parent ec76bd36cd
commit c8886121b9
6 changed files with 145 additions and 0 deletions

0
file.txt Normal file
View file

107
getmessagefromfile.go Normal file
View file

@ -0,0 +1,107 @@
package steward
import (
"bufio"
"fmt"
"io"
"log"
"os"
"github.com/fsnotify/fsnotify"
)
// getMessagesFromFile will start a file watcher for the given directory
// and filename. It will take a channel of []byte as input, and it is
// in this channel the content of a file that has changed is returned.
func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []byte) {
fileUpdated := make(chan bool)
go fileWatcherStart(directoryToCheck, fileUpdated)
for {
select {
case <-fileUpdated:
//load file, read it's content
b, err := readTruncateMessageFile(fileName)
if err != nil {
log.Printf("error: reading file: %v", err)
}
fileContentCh <- b
fmt.Printf("File content read: %s\n", b)
}
}
}
// readTruncateMessageFile, will read all the messages in the given
// file, and truncate the file after read.
// A []byte will be returned with the content read.
func readTruncateMessageFile(fileName string) ([]byte, error) {
f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR, os.ModeAppend)
if err != nil {
log.Printf("Failed to open file %v\n", err)
return nil, err
}
defer f.Close()
scanner := bufio.NewScanner(f)
lines := []byte{}
for scanner.Scan() {
lines = append(lines, scanner.Bytes()...)
}
fmt.Printf("*** DEBUG : %s\n", lines)
fmt.Printf("read: %s\n", lines)
// empty the file after all is read
ret, err := f.Seek(0, io.SeekStart)
if err != nil {
return nil, fmt.Errorf("f.Seek failed: %v", err)
}
fmt.Printf("** ret=%v\n", ret)
err = f.Truncate(0)
if err != nil {
fmt.Printf("******* %#v\n", err)
return nil, fmt.Errorf("f.Truncate failed: %v", err)
}
return lines, nil
}
func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Println("Failed fsnotify.NewWatcher")
return
}
defer watcher.Close()
done := make(chan bool)
go func() {
//Give a true value to updated so it reads the file the first time.
fileUpdated <- true
for {
select {
case event := <-watcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write {
log.Println("modified file:", event.Name)
//testing with an update chan to get updates
fileUpdated <- true
}
case err := <-watcher.Errors:
log.Println("error:", err)
}
}
}()
err = watcher.Add(directoryToCheck)
if err != nil {
log.Printf("error: watcher add: %v\n", err)
}
<-done
}

1
go.mod
View file

@ -3,6 +3,7 @@ module github.com/RaaLabs/steward
go 1.15
require (
github.com/fsnotify/fsnotify v1.4.9
github.com/golang/protobuf v1.4.3 // indirect
github.com/nats-io/nats-server/v2 v2.1.9 // indirect
github.com/nats-io/nats.go v1.10.0

4
go.sum
View file

@ -4,6 +4,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -58,6 +60,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9 h1:L2auWcuQIvxz9xSEqzESnV/QN/gNRXNApHi3fYwl2w0=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=

0
inmsg.txt Normal file
View file

View file

@ -92,6 +92,21 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) {
}
func (s *server) RunPublisher() {
// start the checking of files for input messages
fileReadCh := make((chan []byte))
go getMessagesFromFile("./", "inmsg.txt", fileReadCh)
// TODO: For now we just print content of the files read.
// Replace this whit a broker function that will know how
// send it on to the correct publisher.
go func() {
for b := range fileReadCh {
// Check if there are new content read from file input
fmt.Printf("received: %s\n", b)
}
}()
proc := s.prepareNewProcess("btship1")
// fmt.Printf("*** %#v\n", proc)
go s.spawnProcess(proc)
@ -118,6 +133,18 @@ type process struct {
// errorCh is used to report errors from a process
// NB: Implementing this as an int to report for testing
errorCh chan string
// subject
}
type subject struct {
// node, the name of the node
node string
// messageType, command/event
messageType string
// method, what is this message doing, etc. shellcommand, syslog, etc.
method string
// description, usefu
description string
}
// prepareNewProcess will set the the provided values and the default
@ -145,6 +172,12 @@ func (s *server) spawnProcess(proc process) {
// Loop creating one new message every second to simulate getting new
// messages to deliver.
//
// TODO: I think it makes most sense that the messages would come to
// here from some other message-pickup-process, and that process will
// give the message to the correct publisher process. A channel that
// is listened on in the for loop below could be used to receive the
// messages from the message-pickup-process.
for {
m := getMessageToDeliver()
m.ID = s.processes[proc.node].messageID