From c8886121b98585a5f7c67edbf15cc705ee42e440 Mon Sep 17 00:00:00 2001 From: postmannen Date: Tue, 2 Feb 2021 13:06:37 +0100 Subject: [PATCH] checking file for updates and reading content --- file.txt | 0 getmessagefromfile.go | 107 ++++++++++++++++++++++++++++++++++++++++++ go.mod | 1 + go.sum | 4 ++ inmsg.txt | 0 publisher.go | 33 +++++++++++++ 6 files changed, 145 insertions(+) create mode 100644 file.txt create mode 100644 getmessagefromfile.go create mode 100644 inmsg.txt diff --git a/file.txt b/file.txt new file mode 100644 index 0000000..e69de29 diff --git a/getmessagefromfile.go b/getmessagefromfile.go new file mode 100644 index 0000000..3dd6695 --- /dev/null +++ b/getmessagefromfile.go @@ -0,0 +1,107 @@ +package steward + +import ( + "bufio" + "fmt" + "io" + "log" + "os" + + "github.com/fsnotify/fsnotify" +) + +// getMessagesFromFile will start a file watcher for the given directory +// and filename. It will take a channel of []byte as input, and it is +// in this channel the content of a file that has changed is returned. +func getMessagesFromFile(directoryToCheck string, fileName string, fileContentCh chan []byte) { + fileUpdated := make(chan bool) + go fileWatcherStart(directoryToCheck, fileUpdated) + + for { + select { + case <-fileUpdated: + //load file, read it's content + b, err := readTruncateMessageFile(fileName) + if err != nil { + log.Printf("error: reading file: %v", err) + } + + fileContentCh <- b + fmt.Printf("File content read: %s\n", b) + } + } + +} + +// readTruncateMessageFile, will read all the messages in the given +// file, and truncate the file after read. +// A []byte will be returned with the content read. +func readTruncateMessageFile(fileName string) ([]byte, error) { + + f, err := os.OpenFile(fileName, os.O_APPEND|os.O_RDWR, os.ModeAppend) + if err != nil { + log.Printf("Failed to open file %v\n", err) + return nil, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + + lines := []byte{} + + for scanner.Scan() { + lines = append(lines, scanner.Bytes()...) + } + + fmt.Printf("*** DEBUG : %s\n", lines) + + fmt.Printf("read: %s\n", lines) + + // empty the file after all is read + ret, err := f.Seek(0, io.SeekStart) + if err != nil { + return nil, fmt.Errorf("f.Seek failed: %v", err) + } + fmt.Printf("** ret=%v\n", ret) + + err = f.Truncate(0) + if err != nil { + fmt.Printf("******* %#v\n", err) + return nil, fmt.Errorf("f.Truncate failed: %v", err) + } + + return lines, nil +} + +func fileWatcherStart(directoryToCheck string, fileUpdated chan bool) { + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Println("Failed fsnotify.NewWatcher") + return + } + defer watcher.Close() + + done := make(chan bool) + go func() { + //Give a true value to updated so it reads the file the first time. + fileUpdated <- true + for { + select { + case event := <-watcher.Events: + if event.Op&fsnotify.Write == fsnotify.Write { + log.Println("modified file:", event.Name) + //testing with an update chan to get updates + fileUpdated <- true + } + case err := <-watcher.Errors: + log.Println("error:", err) + } + } + }() + + err = watcher.Add(directoryToCheck) + if err != nil { + log.Printf("error: watcher add: %v\n", err) + } + <-done +} diff --git a/go.mod b/go.mod index 09e4830..d77e095 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/RaaLabs/steward go 1.15 require ( + github.com/fsnotify/fsnotify v1.4.9 github.com/golang/protobuf v1.4.3 // indirect github.com/nats-io/nats-server/v2 v2.1.9 // indirect github.com/nats-io/nats.go v1.10.0 diff --git a/go.sum b/go.sum index 5a36776..519afdb 100644 --- a/go.sum +++ b/go.sum @@ -4,6 +4,8 @@ github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -58,6 +60,8 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e h1:D5TXcfTk7xF7hvieo4QErS3qqCB4teTffacDWr7CI+0= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9 h1:L2auWcuQIvxz9xSEqzESnV/QN/gNRXNApHi3fYwl2w0= +golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/inmsg.txt b/inmsg.txt new file mode 100644 index 0000000..e69de29 diff --git a/publisher.go b/publisher.go index 96492de..2049a73 100644 --- a/publisher.go +++ b/publisher.go @@ -92,6 +92,21 @@ func NewServer(brokerAddress string, nodeName string) (*server, error) { } func (s *server) RunPublisher() { + // start the checking of files for input messages + fileReadCh := make((chan []byte)) + go getMessagesFromFile("./", "inmsg.txt", fileReadCh) + + // TODO: For now we just print content of the files read. + // Replace this whit a broker function that will know how + // send it on to the correct publisher. + go func() { + for b := range fileReadCh { + // Check if there are new content read from file input + fmt.Printf("received: %s\n", b) + + } + }() + proc := s.prepareNewProcess("btship1") // fmt.Printf("*** %#v\n", proc) go s.spawnProcess(proc) @@ -118,6 +133,18 @@ type process struct { // errorCh is used to report errors from a process // NB: Implementing this as an int to report for testing errorCh chan string + // subject +} + +type subject struct { + // node, the name of the node + node string + // messageType, command/event + messageType string + // method, what is this message doing, etc. shellcommand, syslog, etc. + method string + // description, usefu + description string } // prepareNewProcess will set the the provided values and the default @@ -145,6 +172,12 @@ func (s *server) spawnProcess(proc process) { // Loop creating one new message every second to simulate getting new // messages to deliver. + // + // TODO: I think it makes most sense that the messages would come to + // here from some other message-pickup-process, and that process will + // give the message to the correct publisher process. A channel that + // is listened on in the for loop below could be used to receive the + // messages from the message-pickup-process. for { m := getMessageToDeliver() m.ID = s.processes[proc.node].messageID