mirror of
https://github.com/postmannen/ctrl.git
synced 2025-04-09 10:24:17 +00:00
changed config directories handling
This commit is contained in:
parent
02ccc9f208
commit
0b44f1d339
4 changed files with 48 additions and 10 deletions
|
@ -79,6 +79,10 @@ func (f *flagNodeSlice) Parse() error {
|
|||
type Configuration struct {
|
||||
// The configuration folder on disk
|
||||
ConfigFolder string
|
||||
// The folder where the socket file should live
|
||||
SocketFolder string
|
||||
// The folder where the database should live
|
||||
DatabaseFolder string
|
||||
// some unique string to identify this Edge unit
|
||||
NodeName string
|
||||
// the address of the message broker
|
||||
|
@ -137,6 +141,8 @@ func NewConfiguration() *Configuration {
|
|||
func newConfigurationDefaults() Configuration {
|
||||
c := Configuration{
|
||||
ConfigFolder: "./etc",
|
||||
SocketFolder: "./tmp",
|
||||
DatabaseFolder: "./var/lib",
|
||||
BrokerAddress: "127.0.0.1:4222",
|
||||
ProfilingPort: "",
|
||||
PromHostAndPort: "",
|
||||
|
@ -178,6 +184,8 @@ func (c *Configuration) CheckFlags() error {
|
|||
*c = fc
|
||||
|
||||
flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "folder who contains the config file. Defaults to ./etc/. If other folder is used this flag must be specified at startup.")
|
||||
flag.StringVar(&c.SocketFolder, "socketFolder", fc.SocketFolder, "folder who contains the socket file. Defaults to ./tmp/. If other folder is used this flag must be specified at startup.")
|
||||
flag.StringVar(&c.DatabaseFolder, "databaseFolder", fc.DatabaseFolder, "folder who contains the database file. Defaults to ./var/lib/. If other folder is used this flag must be specified at startup.")
|
||||
flag.StringVar(&c.NodeName, "nodeName", fc.NodeName, "some unique string to identify this Edge unit")
|
||||
flag.StringVar(&c.BrokerAddress, "brokerAddress", fc.BrokerAddress, "the address of the message broker")
|
||||
flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port")
|
||||
|
@ -250,7 +258,7 @@ func (c *Configuration) ReadConfigFile() (Configuration, error) {
|
|||
// directory for the config file does not exist it will be created.
|
||||
func (c *Configuration) WriteConfigFile() error {
|
||||
if _, err := os.Stat(c.ConfigFolder); os.IsNotExist(err) {
|
||||
err := os.Mkdir(c.ConfigFolder, 0700)
|
||||
err := os.MkdirAll(c.ConfigFolder, 0700)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error: failed to create directory %v: %v", c.ConfigFolder, err)
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
@ -42,10 +43,25 @@ type ringBuffer struct {
|
|||
}
|
||||
|
||||
// newringBuffer is a push/pop storage for values.
|
||||
func newringBuffer(size int, dbFileName string, nodeName node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
|
||||
db, err := bolt.Open(dbFileName, 0600, nil)
|
||||
func newringBuffer(c Configuration, size int, dbFileName string, nodeName node, newMessagesCh chan []subjectAndMessage) *ringBuffer {
|
||||
// ---
|
||||
// Check if socket folder exists, if not create it
|
||||
if _, err := os.Stat(c.DatabaseFolder); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(c.DatabaseFolder, 0700)
|
||||
if err != nil {
|
||||
log.Printf("error: failed to create database directory %v: %v\n", c.DatabaseFolder, err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
DatabaseFilepath := filepath.Join(c.DatabaseFolder, dbFileName)
|
||||
|
||||
// ---
|
||||
|
||||
db, err := bolt.Open(DatabaseFilepath, 0600, nil)
|
||||
if err != nil {
|
||||
log.Printf("error: failed to open db: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
return &ringBuffer{
|
||||
bufData: make(chan samDBValue, size),
|
||||
|
|
28
server.go
28
server.go
|
@ -6,6 +6,7 @@ import (
|
|||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -92,13 +93,26 @@ func NewServer(c *Configuration) (*server, error) {
|
|||
}
|
||||
|
||||
// Prepare the connection to the socket file
|
||||
err = os.Remove("steward.sock")
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: could not delete sock file: %v", err)
|
||||
return nil, er
|
||||
|
||||
// Check if socket folder exists, if not create it
|
||||
if _, err := os.Stat(c.SocketFolder); os.IsNotExist(err) {
|
||||
err := os.MkdirAll(c.SocketFolder, 0700)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error: failed to create directory %v: %v", c.SocketFolder, err)
|
||||
}
|
||||
}
|
||||
|
||||
nl, err := net.Listen("unix", "steward.sock")
|
||||
socketFilepath := filepath.Join(c.SocketFolder, "steward.sock")
|
||||
|
||||
if _, err := os.Stat(socketFilepath); !os.IsNotExist(err) {
|
||||
err = os.Remove(socketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: could not delete sock file: %v", err)
|
||||
return nil, er
|
||||
}
|
||||
}
|
||||
|
||||
nl, err := net.Listen("unix", socketFilepath)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: failed to open socket: %v", err)
|
||||
return nil, er
|
||||
|
@ -161,7 +175,7 @@ func (s *server) Start() {
|
|||
s.processes.printProcessesMap()
|
||||
|
||||
// Start the processing of new messages from an input channel.
|
||||
s.routeMessagesToProcess("./incommmingBuffer.db", s.toRingbufferCh)
|
||||
s.routeMessagesToProcess("./incomingBuffer.db", s.toRingbufferCh)
|
||||
|
||||
select {}
|
||||
|
||||
|
@ -224,7 +238,7 @@ func createErrorMsgContent(FromNode node, theError error) subjectAndMessage {
|
|||
func (s *server) routeMessagesToProcess(dbFileName string, newSAM chan []subjectAndMessage) {
|
||||
// Prepare and start a new ring buffer
|
||||
const bufferSize int = 1000
|
||||
rb := newringBuffer(bufferSize, dbFileName, node(s.nodeName), s.toRingbufferCh)
|
||||
rb := newringBuffer(*s.configuration, bufferSize, dbFileName, node(s.nodeName), s.toRingbufferCh)
|
||||
inCh := make(chan subjectAndMessage)
|
||||
ringBufferOutCh := make(chan samDBValue)
|
||||
// start the ringbuffer.
|
||||
|
|
Loading…
Add table
Reference in a new issue