From d348ab156fa90ef39113da2f06449a59a0e47372 Mon Sep 17 00:00:00 2001 From: postmannen Date: Sun, 8 Jan 2023 08:32:58 +0100 Subject: [PATCH] added initial readfolder --- configuration_flags.go | 19 ++++++++ go.mod | 2 +- go.sum | 3 ++ message_readers.go | 99 ++++++++++++++++++++++++++++++++++++++++++ server.go | 5 +++ 5 files changed, 127 insertions(+), 1 deletion(-) diff --git a/configuration_flags.go b/configuration_flags.go index 101f80d..140bd8d 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -26,6 +26,10 @@ type Configuration struct { ConfigFolder string `comment:"ConfigFolder, the location for the configuration folder on disk"` // The folder where the socket file should live SocketFolder string `comment:"The folder where the socket file should live"` + // The folder where the readfolder should live + ReadFolder string `comment:"The folder where the readfolder file should live"` + // EnableReadFolder for enabling the read messages api from readfolder + EnableReadFolder bool `comment:"EnableReadFolder for enabling the read messages api from readfolder"` // TCP Listener for sending messages to the system, : TCPListener string `comment:"TCP Listener for sending messages to the system, :"` // HTTP Listener for sending messages to the system, : @@ -148,6 +152,8 @@ type ConfigurationFromFile struct { RingBufferPersistStore *bool RingBufferSize *int SocketFolder *string + ReadFolder *string + EnableReadFolder *bool TCPListener *string HTTPListener *string DatabaseFolder *string @@ -215,6 +221,8 @@ func newConfigurationDefaults() Configuration { RingBufferPersistStore: true, RingBufferSize: 1000, SocketFolder: "./tmp", + ReadFolder: "./readfolder", + EnableReadFolder: true, TCPListener: "", HTTPListener: "", DatabaseFolder: "./var/lib", @@ -297,6 +305,16 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration { } else { conf.SocketFolder = *cf.SocketFolder } + if cf.ReadFolder == nil { + conf.ReadFolder = cd.ReadFolder + } else { + conf.ReadFolder = *cf.ReadFolder + } + if cf.EnableReadFolder == nil { + conf.EnableReadFolder = cd.EnableReadFolder + } else { + conf.EnableReadFolder = *cf.EnableReadFolder + } if cf.TCPListener == nil { conf.TCPListener = cd.TCPListener } else { @@ -593,6 +611,7 @@ func (c *Configuration) CheckFlags() error { flag.BoolVar(&c.RingBufferPersistStore, "ringBufferPersistStore", fc.RingBufferPersistStore, "true/false for enabling the persisting of ringbuffer to disk") flag.IntVar(&c.RingBufferSize, "ringBufferSize", fc.RingBufferSize, "size of the ringbuffer") flag.StringVar(&c.SocketFolder, "socketFolder", fc.SocketFolder, "folder who contains the socket file. Defaults to ./tmp/. If other folder is used this flag must be specified at startup.") + flag.StringVar(&c.ReadFolder, "readfolder", fc.ReadFolder, "folder who contains the readfolder. Defaults to ./readfolder/. If other folder is used this flag must be specified at startup.") flag.StringVar(&c.TCPListener, "tcpListener", fc.TCPListener, "start up a TCP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost") flag.StringVar(&c.HTTPListener, "httpListener", fc.HTTPListener, "start up a HTTP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost") flag.StringVar(&c.DatabaseFolder, "databaseFolder", fc.DatabaseFolder, "folder who contains the database file. Defaults to ./var/lib/. If other folder is used this flag must be specified at startup.") diff --git a/go.mod b/go.mod index b561dac..6cdf3eb 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/RaaLabs/steward go 1.18 require ( - github.com/fsnotify/fsnotify v1.4.9 + github.com/fsnotify/fsnotify v1.6.0 github.com/fxamacker/cbor/v2 v2.4.0 github.com/gdamore/tcell/v2 v2.5.3 github.com/go-playground/validator/v10 v10.10.1 diff --git a/go.sum b/go.sum index cf716ef..fdf84ec 100644 --- a/go.sum +++ b/go.sum @@ -63,6 +63,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88= github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo= github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdko= @@ -435,6 +437,7 @@ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220318055525-2edf467146b5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU= golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/message_readers.go b/message_readers.go index 523f498..812b9c0 100644 --- a/message_readers.go +++ b/message_readers.go @@ -10,6 +10,8 @@ import ( "os" "path/filepath" + "github.com/fsnotify/fsnotify" + "gopkg.in/yaml.v3" ) @@ -202,6 +204,103 @@ func (s *server) readSocket() { } } +// readFolder +func (s *server) readFolder() { + // Check if the startup folder exist. + if _, err := os.Stat(s.configuration.ReadFolder); os.IsNotExist(err) { + err := os.MkdirAll(s.configuration.ReadFolder, 0700) + if err != nil { + er := fmt.Errorf("error: failed to create readfolder folder: %v", err) + log.Printf("%v\n", er) + os.Exit(1) + } + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + log.Printf("main: failed to create new logWatcher: %v\n", err) + os.Exit(1) + } + + // Start listening for events. + go func() { + for { + select { + case event, ok := <-watcher.Events: + if !ok { + return + } + + if event.Op == fsnotify.Create || event.Op == fsnotify.Chmod { + fmt.Printf(" *** got *** : %v, op: %v\n", event.Name, event.Op) + } + + func() { + fh, err := os.Open(event.Name) + if err != nil { + log.Printf("error: failed to open readFile from readFolder: %v\n", err) + return + } + + b, err := io.ReadAll(fh) + if err != nil { + log.Printf("error: failed to readall from readFolder: %v\n", err) + fh.Close() + return + } + fh.Close() + + b = bytes.Trim(b, "\x00") + + // unmarshal the JSON into a struct + sams, err := s.convertBytesToSAMs(b) + if err != nil { + er := fmt.Errorf("error: malformed json received on socket: %s\n %v", b, err) + s.errorKernel.errSend(s.processInitial, Message{}, er) + return + } + + for i := range sams { + + // Fill in the value for the FromNode field, so the receiver + // can check this field to know where it came from. + sams[i].Message.FromNode = Node(s.nodeName) + + // Send an info message to the central about the message picked + // for auditing. + er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message) + s.errorKernel.errSend(s.processInitial, Message{}, er) + } + + // Send the SAM struct to be picked up by the ring buffer. + s.toRingBufferCh <- sams + + // Delete the file. + err = os.Remove(event.Name) + if err != nil { + log.Printf("error: failed to remove readFile from readFolder: %v\n", err) + return + } + + }() + + case err, ok := <-watcher.Errors: + if !ok { + return + } + log.Println("error:", err) + } + } + }() + + // Add a path. + err = watcher.Add(s.configuration.ReadFolder) + if err != nil { + log.Printf("startLogsWatcher: failed to add watcher: %v\n", err) + os.Exit(1) + } +} + // readTCPListener wait and read messages delivered on the TCP // port if started. // It will take a channel of []byte as input, and it is in this diff --git a/server.go b/server.go index 76cfdc6..95f96f3 100644 --- a/server.go +++ b/server.go @@ -270,6 +270,11 @@ func (s *server) Start() { go s.readSocket() } + // Start the checking the readfolder for new messages from operator. + if s.configuration.EnableReadFolder { + go s.readFolder() + } + // Check if we should start the tcp listener for new messages from operator. if s.configuration.TCPListener != "" { go s.readTCPListener()