mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
added initial readfolder
This commit is contained in:
parent
bcc5db7c04
commit
d348ab156f
5 changed files with 127 additions and 1 deletions
|
@ -26,6 +26,10 @@ type Configuration struct {
|
|||
ConfigFolder string `comment:"ConfigFolder, the location for the configuration folder on disk"`
|
||||
// The folder where the socket file should live
|
||||
SocketFolder string `comment:"The folder where the socket file should live"`
|
||||
// The folder where the readfolder should live
|
||||
ReadFolder string `comment:"The folder where the readfolder file should live"`
|
||||
// EnableReadFolder for enabling the read messages api from readfolder
|
||||
EnableReadFolder bool `comment:"EnableReadFolder for enabling the read messages api from readfolder"`
|
||||
// TCP Listener for sending messages to the system, <host>:<port>
|
||||
TCPListener string `comment:"TCP Listener for sending messages to the system, <host>:<port>"`
|
||||
// HTTP Listener for sending messages to the system, <host>:<port>
|
||||
|
@ -148,6 +152,8 @@ type ConfigurationFromFile struct {
|
|||
RingBufferPersistStore *bool
|
||||
RingBufferSize *int
|
||||
SocketFolder *string
|
||||
ReadFolder *string
|
||||
EnableReadFolder *bool
|
||||
TCPListener *string
|
||||
HTTPListener *string
|
||||
DatabaseFolder *string
|
||||
|
@ -215,6 +221,8 @@ func newConfigurationDefaults() Configuration {
|
|||
RingBufferPersistStore: true,
|
||||
RingBufferSize: 1000,
|
||||
SocketFolder: "./tmp",
|
||||
ReadFolder: "./readfolder",
|
||||
EnableReadFolder: true,
|
||||
TCPListener: "",
|
||||
HTTPListener: "",
|
||||
DatabaseFolder: "./var/lib",
|
||||
|
@ -297,6 +305,16 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
|
|||
} else {
|
||||
conf.SocketFolder = *cf.SocketFolder
|
||||
}
|
||||
if cf.ReadFolder == nil {
|
||||
conf.ReadFolder = cd.ReadFolder
|
||||
} else {
|
||||
conf.ReadFolder = *cf.ReadFolder
|
||||
}
|
||||
if cf.EnableReadFolder == nil {
|
||||
conf.EnableReadFolder = cd.EnableReadFolder
|
||||
} else {
|
||||
conf.EnableReadFolder = *cf.EnableReadFolder
|
||||
}
|
||||
if cf.TCPListener == nil {
|
||||
conf.TCPListener = cd.TCPListener
|
||||
} else {
|
||||
|
@ -593,6 +611,7 @@ func (c *Configuration) CheckFlags() error {
|
|||
flag.BoolVar(&c.RingBufferPersistStore, "ringBufferPersistStore", fc.RingBufferPersistStore, "true/false for enabling the persisting of ringbuffer to disk")
|
||||
flag.IntVar(&c.RingBufferSize, "ringBufferSize", fc.RingBufferSize, "size of the ringbuffer")
|
||||
flag.StringVar(&c.SocketFolder, "socketFolder", fc.SocketFolder, "folder who contains the socket file. Defaults to ./tmp/. If other folder is used this flag must be specified at startup.")
|
||||
flag.StringVar(&c.ReadFolder, "readfolder", fc.ReadFolder, "folder who contains the readfolder. Defaults to ./readfolder/. If other folder is used this flag must be specified at startup.")
|
||||
flag.StringVar(&c.TCPListener, "tcpListener", fc.TCPListener, "start up a TCP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost")
|
||||
flag.StringVar(&c.HTTPListener, "httpListener", fc.HTTPListener, "start up a HTTP listener in addition to the Unix Socket, to give messages to the system. e.g. localhost:8888. No value means not to start the listener, which is default. NB: You probably don't want to start this on any other interface than localhost")
|
||||
flag.StringVar(&c.DatabaseFolder, "databaseFolder", fc.DatabaseFolder, "folder who contains the database file. Defaults to ./var/lib/. If other folder is used this flag must be specified at startup.")
|
||||
|
|
2
go.mod
2
go.mod
|
@ -3,7 +3,7 @@ module github.com/RaaLabs/steward
|
|||
go 1.18
|
||||
|
||||
require (
|
||||
github.com/fsnotify/fsnotify v1.4.9
|
||||
github.com/fsnotify/fsnotify v1.6.0
|
||||
github.com/fxamacker/cbor/v2 v2.4.0
|
||||
github.com/gdamore/tcell/v2 v2.5.3
|
||||
github.com/go-playground/validator/v10 v10.10.1
|
||||
|
|
3
go.sum
3
go.sum
|
@ -63,6 +63,8 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
|
|||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
|
||||
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
|
||||
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
|
||||
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
|
||||
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
|
||||
github.com/fxamacker/cbor/v2 v2.4.0 h1:ri0ArlOR+5XunOP8CRUowT0pSJOwhW098ZCUyskZD88=
|
||||
github.com/fxamacker/cbor/v2 v2.4.0/go.mod h1:TA1xS00nchWmaBnEIxPSE5oHLuJBAVvqrtAnWBwBCVo=
|
||||
github.com/gdamore/encoding v1.0.0 h1:+7OoQ1Bc6eTm5niUzBa0Ctsh6JbMW6Ra+YNuAtDBdko=
|
||||
|
@ -435,6 +437,7 @@ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20220318055525-2edf467146b5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c h1:aFV+BgZ4svzjfabn8ERpuB4JI4N6/rdy1iusx77G3oU=
|
||||
golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.3.0 h1:w8ZOecv6NaNa/zC8944JTU3vz4u6Lagfk4RPQxv92NQ=
|
||||
golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
|
|
|
@ -10,6 +10,8 @@ import (
|
|||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/fsnotify/fsnotify"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
|
@ -202,6 +204,103 @@ func (s *server) readSocket() {
|
|||
}
|
||||
}
|
||||
|
||||
// readFolder
|
||||
func (s *server) readFolder() {
|
||||
// Check if the startup folder exist.
|
||||
if _, err := os.Stat(s.configuration.ReadFolder); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(s.configuration.ReadFolder, 0700)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to create readfolder folder: %v", err)
|
||||
log.Printf("%v\n", er)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
log.Printf("main: failed to create new logWatcher: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
// Start listening for events.
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case event, ok := <-watcher.Events:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if event.Op == fsnotify.Create || event.Op == fsnotify.Chmod {
|
||||
fmt.Printf(" *** got *** : %v, op: %v\n", event.Name, event.Op)
|
||||
}
|
||||
|
||||
func() {
|
||||
fh, err := os.Open(event.Name)
|
||||
if err != nil {
|
||||
log.Printf("error: failed to open readFile from readFolder: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
b, err := io.ReadAll(fh)
|
||||
if err != nil {
|
||||
log.Printf("error: failed to readall from readFolder: %v\n", err)
|
||||
fh.Close()
|
||||
return
|
||||
}
|
||||
fh.Close()
|
||||
|
||||
b = bytes.Trim(b, "\x00")
|
||||
|
||||
// unmarshal the JSON into a struct
|
||||
sams, err := s.convertBytesToSAMs(b)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: malformed json received on socket: %s\n %v", b, err)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
return
|
||||
}
|
||||
|
||||
for i := range sams {
|
||||
|
||||
// Fill in the value for the FromNode field, so the receiver
|
||||
// can check this field to know where it came from.
|
||||
sams[i].Message.FromNode = Node(s.nodeName)
|
||||
|
||||
// Send an info message to the central about the message picked
|
||||
// for auditing.
|
||||
er := fmt.Errorf("info: message read from socket on %v: %v", s.nodeName, sams[i].Message)
|
||||
s.errorKernel.errSend(s.processInitial, Message{}, er)
|
||||
}
|
||||
|
||||
// Send the SAM struct to be picked up by the ring buffer.
|
||||
s.toRingBufferCh <- sams
|
||||
|
||||
// Delete the file.
|
||||
err = os.Remove(event.Name)
|
||||
if err != nil {
|
||||
log.Printf("error: failed to remove readFile from readFolder: %v\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
case err, ok := <-watcher.Errors:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
log.Println("error:", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Add a path.
|
||||
err = watcher.Add(s.configuration.ReadFolder)
|
||||
if err != nil {
|
||||
log.Printf("startLogsWatcher: failed to add watcher: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// readTCPListener wait and read messages delivered on the TCP
|
||||
// port if started.
|
||||
// It will take a channel of []byte as input, and it is in this
|
||||
|
|
|
@ -270,6 +270,11 @@ func (s *server) Start() {
|
|||
go s.readSocket()
|
||||
}
|
||||
|
||||
// Start the checking the readfolder for new messages from operator.
|
||||
if s.configuration.EnableReadFolder {
|
||||
go s.readFolder()
|
||||
}
|
||||
|
||||
// Check if we should start the tcp listener for new messages from operator.
|
||||
if s.configuration.TCPListener != "" {
|
||||
go s.readTCPListener()
|
||||
|
|
Loading…
Add table
Reference in a new issue