mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-07 04:49:17 +00:00
Squashed commit of the following:
commit 2a5cb441ff8c3f3f4404c49188c16afdb701cd42 Author: postmannen <postmannen@gmail.com> Date: Tue Nov 19 06:46:58 2024 +0100 updated doc commit a97d2dd4fd162a45dcf8f80d833a31bdc7e0b817 Author: postmannen <postmannen@gmail.com> Date: Tue Nov 19 06:25:57 2024 +0100 renamed the various naming for channels where new messages are put to newMessagesCh commit 3c840eb718358bb55d9e1c58692517e4bbbb5f4d Author: postmannen <postmannen@gmail.com> Date: Tue Nov 19 05:08:55 2024 +0100 Restructured configuration, and added a StartProcesses struct for the flags that are for starting processes
This commit is contained in:
parent
b1a5406598
commit
6c615591a6
10 changed files with 89 additions and 76 deletions
|
@ -88,8 +88,7 @@ type Configuration struct {
|
||||||
EnableSignatureCheck bool `comment:"EnableSignatureCheck to enable signature checking"`
|
EnableSignatureCheck bool `comment:"EnableSignatureCheck to enable signature checking"`
|
||||||
// EnableAclCheck to enable ACL checking
|
// EnableAclCheck to enable ACL checking
|
||||||
EnableAclCheck bool `comment:"EnableAclCheck to enable ACL checking"`
|
EnableAclCheck bool `comment:"EnableAclCheck to enable ACL checking"`
|
||||||
// IsCentralAuth, enable to make this instance take the role as the central auth server
|
|
||||||
IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"`
|
|
||||||
// EnableDebug will also enable printing all the messages received in the errorKernel to STDERR.
|
// EnableDebug will also enable printing all the messages received in the errorKernel to STDERR.
|
||||||
EnableDebug bool `comment:"EnableDebug will also enable printing all the messages received in the errorKernel to STDERR."`
|
EnableDebug bool `comment:"EnableDebug will also enable printing all the messages received in the errorKernel to STDERR."`
|
||||||
// LogLevel
|
// LogLevel
|
||||||
|
@ -102,6 +101,10 @@ type Configuration struct {
|
||||||
// it have not received any messages for the given amount of time.
|
// it have not received any messages for the given amount of time.
|
||||||
KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."`
|
KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."`
|
||||||
|
|
||||||
|
StartProcesses StartProcesses
|
||||||
|
}
|
||||||
|
|
||||||
|
type StartProcesses struct {
|
||||||
// StartPubHello, sets the interval in seconds for how often we send hello messages to central server
|
// StartPubHello, sets the interval in seconds for how often we send hello messages to central server
|
||||||
StartPubHello int `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"`
|
StartPubHello int `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"`
|
||||||
// Enable the updates of public keys
|
// Enable the updates of public keys
|
||||||
|
@ -132,6 +135,9 @@ type Configuration struct {
|
||||||
StartSubTailFile bool `comment:"Start subscriber for tailing log files"`
|
StartSubTailFile bool `comment:"Start subscriber for tailing log files"`
|
||||||
// Start subscriber for continously delivery of output from cli commands.
|
// Start subscriber for continously delivery of output from cli commands.
|
||||||
StartSubCliCommandCont bool `comment:"Start subscriber for continously delivery of output from cli commands."`
|
StartSubCliCommandCont bool `comment:"Start subscriber for continously delivery of output from cli commands."`
|
||||||
|
|
||||||
|
// IsCentralAuth, enable to make this instance take the role as the central auth server
|
||||||
|
IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConfiguration will return a *Configuration.
|
// NewConfiguration will return a *Configuration.
|
||||||
|
@ -140,7 +146,7 @@ func NewConfiguration() *Configuration {
|
||||||
|
|
||||||
err := godotenv.Load()
|
err := godotenv.Load()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error loading .env file: %v\n", err)
|
log.Printf("info: no .env file found, will only use env vars or flags: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
//flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "Defaults to ./usr/local/ctrl/etc/. *NB* This flag is not used, if your config file are located somwhere else than default set the location in an env variable named CONFIGFOLDER")
|
//flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "Defaults to ./usr/local/ctrl/etc/. *NB* This flag is not used, if your config file are located somwhere else than default set the location in an env variable named CONFIGFOLDER")
|
||||||
|
@ -177,7 +183,7 @@ func NewConfiguration() *Configuration {
|
||||||
flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file")
|
flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file")
|
||||||
flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.")
|
flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.")
|
||||||
flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", CheckEnv("ENABLE_ACL_CHECK", c.EnableAclCheck).(bool), "true/false *TESTING* enable Acl checking.")
|
flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", CheckEnv("ENABLE_ACL_CHECK", c.EnableAclCheck).(bool), "true/false *TESTING* enable Acl checking.")
|
||||||
flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", CheckEnv("IS_CENTRAL_AUTH", c.IsCentralAuth).(bool), "true/false, *TESTING* is this the central auth server")
|
flag.BoolVar(&c.StartProcesses.IsCentralAuth, "isCentralAuth", CheckEnv("IS_CENTRAL_AUTH", c.StartProcesses.IsCentralAuth).(bool), "true/false, *TESTING* is this the central auth server")
|
||||||
flag.BoolVar(&c.EnableDebug, "enableDebug", CheckEnv("ENABLE_DEBUG", c.EnableDebug).(bool), "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR")
|
flag.BoolVar(&c.EnableDebug, "enableDebug", CheckEnv("ENABLE_DEBUG", c.EnableDebug).(bool), "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR")
|
||||||
flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none")
|
flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none")
|
||||||
flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr")
|
flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr")
|
||||||
|
@ -185,23 +191,23 @@ func NewConfiguration() *Configuration {
|
||||||
|
|
||||||
// Start of Request publishers/subscribers
|
// Start of Request publishers/subscribers
|
||||||
|
|
||||||
flag.IntVar(&c.StartPubHello, "startPubHello", CheckEnv("START_PUB_HELLO", c.StartPubHello).(int), "Make the current node send hello messages to central at given interval in seconds")
|
flag.IntVar(&c.StartProcesses.StartPubHello, "startPubHello", CheckEnv("START_PUB_HELLO", c.StartProcesses.StartPubHello).(int), "Make the current node send hello messages to central at given interval in seconds")
|
||||||
|
|
||||||
flag.BoolVar(&c.EnableKeyUpdates, "EnableKeyUpdates", CheckEnv("ENABLE_KEY_UPDATES", c.EnableKeyUpdates).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.EnableKeyUpdates, "EnableKeyUpdates", CheckEnv("ENABLE_KEY_UPDATES", c.StartProcesses.EnableKeyUpdates).(bool), "true/false")
|
||||||
|
|
||||||
flag.BoolVar(&c.EnableAclUpdates, "EnableAclUpdates", CheckEnv("ENABLE_ACL_UPDATES", c.EnableAclUpdates).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.EnableAclUpdates, "EnableAclUpdates", CheckEnv("ENABLE_ACL_UPDATES", c.StartProcesses.EnableAclUpdates).(bool), "true/false")
|
||||||
|
|
||||||
flag.BoolVar(&c.IsCentralErrorLogger, "isCentralErrorLogger", CheckEnv("IS_CENTRAL_ERROR_LOGGER", c.IsCentralErrorLogger).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.IsCentralErrorLogger, "isCentralErrorLogger", CheckEnv("IS_CENTRAL_ERROR_LOGGER", c.StartProcesses.IsCentralErrorLogger).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubHello, "startSubHello", CheckEnv("START_SUB_HELLO", c.StartSubHello).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubHello, "startSubHello", CheckEnv("START_SUB_HELLO", c.StartProcesses.StartSubHello).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubFileAppend, "startSubFileAppend", CheckEnv("START_SUB_FILE_APPEND", c.StartSubFileAppend).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubFileAppend, "startSubFileAppend", CheckEnv("START_SUB_FILE_APPEND", c.StartProcesses.StartSubFileAppend).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubFile, "startSubFile", CheckEnv("START_SUB_FILE", c.StartSubFile).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubFile, "startSubFile", CheckEnv("START_SUB_FILE", c.StartProcesses.StartSubFile).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubCopySrc, "startSubCopySrc", CheckEnv("START_SUB_COPY_SRC", c.StartSubCopySrc).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubCopySrc, "startSubCopySrc", CheckEnv("START_SUB_COPY_SRC", c.StartProcesses.StartSubCopySrc).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubCopyDst, "startSubCopyDst", CheckEnv("START_SUB_COPY_DST", c.StartSubCopyDst).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubCopyDst, "startSubCopyDst", CheckEnv("START_SUB_COPY_DST", c.StartProcesses.StartSubCopyDst).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubCliCommand, "startSubCliCommand", CheckEnv("START_SUB_CLI_COMMAND", c.StartSubCliCommand).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubCliCommand, "startSubCliCommand", CheckEnv("START_SUB_CLI_COMMAND", c.StartProcesses.StartSubCliCommand).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubConsole, "startSubConsole", CheckEnv("START_SUB_CONSOLE", c.StartSubConsole).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubConsole, "startSubConsole", CheckEnv("START_SUB_CONSOLE", c.StartProcesses.StartSubConsole).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubHttpGet, "startSubHttpGet", CheckEnv("START_SUB_HTTP_GET", c.StartSubHttpGet).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubHttpGet, "startSubHttpGet", CheckEnv("START_SUB_HTTP_GET", c.StartProcesses.StartSubHttpGet).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubTailFile, "startSubTailFile", CheckEnv("START_SUB_TAIL_FILE", c.StartSubTailFile).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubTailFile, "startSubTailFile", CheckEnv("START_SUB_TAIL_FILE", c.StartProcesses.StartSubTailFile).(bool), "true/false")
|
||||||
flag.BoolVar(&c.StartSubCliCommandCont, "startSubCliCommandCont", CheckEnv("START_SUB_CLI_COMMAND_CONT", c.StartSubCliCommandCont).(bool), "true/false")
|
flag.BoolVar(&c.StartProcesses.StartSubCliCommandCont, "startSubCliCommandCont", CheckEnv("START_SUB_CLI_COMMAND_CONT", c.StartProcesses.StartSubCliCommandCont).(bool), "true/false")
|
||||||
|
|
||||||
// Check that mandatory flag values have been set.
|
// Check that mandatory flag values have been set.
|
||||||
switch {
|
switch {
|
||||||
|
@ -254,12 +260,12 @@ func newConfigurationDefaults() Configuration {
|
||||||
EnableSocket: true,
|
EnableSocket: true,
|
||||||
EnableSignatureCheck: false,
|
EnableSignatureCheck: false,
|
||||||
EnableAclCheck: false,
|
EnableAclCheck: false,
|
||||||
IsCentralAuth: false,
|
|
||||||
EnableDebug: false,
|
EnableDebug: false,
|
||||||
LogLevel: "debug",
|
LogLevel: "debug",
|
||||||
LogConsoleTimestamps: false,
|
LogConsoleTimestamps: false,
|
||||||
KeepPublishersAliveFor: 10,
|
KeepPublishersAliveFor: 10,
|
||||||
|
|
||||||
|
StartProcesses: StartProcesses{
|
||||||
StartPubHello: 30,
|
StartPubHello: 30,
|
||||||
EnableKeyUpdates: false,
|
EnableKeyUpdates: false,
|
||||||
EnableAclUpdates: false,
|
EnableAclUpdates: false,
|
||||||
|
@ -274,6 +280,8 @@ func newConfigurationDefaults() Configuration {
|
||||||
StartSubHttpGet: true,
|
StartSubHttpGet: true,
|
||||||
StartSubTailFile: true,
|
StartSubTailFile: true,
|
||||||
StartSubCliCommandCont: true,
|
StartSubCliCommandCont: true,
|
||||||
|
IsCentralAuth: false,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,8 @@ method : cliCommand
|
||||||
|
|
||||||
What request method type to use, like cliCommand, httpGet..
|
What request method type to use, like cliCommand, httpGet..
|
||||||
|
|
||||||
|
All [methods](./core_request_methods.md).
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
methodArgs :
|
methodArgs :
|
||||||
- "bash"
|
- "bash"
|
||||||
|
@ -96,6 +98,7 @@ methodTimeout : 10
|
||||||
```
|
```
|
||||||
|
|
||||||
Timeout for how long a method should be allowed to run before it is timed out.
|
Timeout for how long a method should be allowed to run before it is timed out.
|
||||||
|
If `methodTimeout : -1` the method will not time out.
|
||||||
|
|
||||||
```yaml
|
```yaml
|
||||||
replyMethodTimeout : 10
|
replyMethodTimeout : 10
|
||||||
|
|
|
@ -211,7 +211,7 @@ func (s *server) readSocket() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
s.samToSendCh <- sams
|
s.newMessagesCh <- sams
|
||||||
s.auditLogCh <- sams
|
s.auditLogCh <- sams
|
||||||
|
|
||||||
}(conn)
|
}(conn)
|
||||||
|
@ -293,7 +293,7 @@ func (s *server) readFolder() {
|
||||||
s.errorKernel.logDebug(er)
|
s.errorKernel.logDebug(er)
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
s.samToSendCh <- sams
|
s.newMessagesCh <- sams
|
||||||
s.auditLogCh <- sams
|
s.auditLogCh <- sams
|
||||||
|
|
||||||
// Delete the file.
|
// Delete the file.
|
||||||
|
@ -386,7 +386,7 @@ func (s *server) readTCPListener() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
s.samToSendCh <- sams
|
s.newMessagesCh <- sams
|
||||||
s.auditLogCh <- sams
|
s.auditLogCh <- sams
|
||||||
|
|
||||||
}(conn)
|
}(conn)
|
||||||
|
@ -431,7 +431,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the SAM struct to be picked up by the ring buffer.
|
// Send the SAM struct to be picked up by the ring buffer.
|
||||||
s.samToSendCh <- sams
|
s.newMessagesCh <- sams
|
||||||
s.auditLogCh <- sams
|
s.auditLogCh <- sams
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -88,7 +88,7 @@ type process struct {
|
||||||
// copy of the configuration from server
|
// copy of the configuration from server
|
||||||
configuration *Configuration
|
configuration *Configuration
|
||||||
// The new messages channel copied from *Server
|
// The new messages channel copied from *Server
|
||||||
toRingbufferCh chan<- []subjectAndMessage
|
newMessagesCh chan<- []subjectAndMessage
|
||||||
// The structure who holds all processes information
|
// The structure who holds all processes information
|
||||||
processes *processes
|
processes *processes
|
||||||
// nats connection
|
// nats connection
|
||||||
|
@ -143,7 +143,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
||||||
processID: pid,
|
processID: pid,
|
||||||
processKind: processKind,
|
processKind: processKind,
|
||||||
methodsAvailable: method.GetMethodsAvailable(),
|
methodsAvailable: method.GetMethodsAvailable(),
|
||||||
toRingbufferCh: server.samToSendCh,
|
newMessagesCh: server.newMessagesCh,
|
||||||
configuration: server.configuration,
|
configuration: server.configuration,
|
||||||
processes: server.processes,
|
processes: server.processes,
|
||||||
natsConn: server.natsConn,
|
natsConn: server.natsConn,
|
||||||
|
|
40
processes.go
40
processes.go
|
@ -97,23 +97,23 @@ func (p *processes) Start(proc process) {
|
||||||
proc.startup.subscriber(proc, OpProcessStop, nil)
|
proc.startup.subscriber(proc, OpProcessStop, nil)
|
||||||
proc.startup.subscriber(proc, Test, nil)
|
proc.startup.subscriber(proc, Test, nil)
|
||||||
|
|
||||||
if proc.configuration.StartSubFileAppend {
|
if proc.configuration.StartProcesses.StartSubFileAppend {
|
||||||
proc.startup.subscriber(proc, FileAppend, nil)
|
proc.startup.subscriber(proc, FileAppend, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubFile {
|
if proc.configuration.StartProcesses.StartSubFile {
|
||||||
proc.startup.subscriber(proc, File, nil)
|
proc.startup.subscriber(proc, File, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubCopySrc {
|
if proc.configuration.StartProcesses.StartSubCopySrc {
|
||||||
proc.startup.subscriber(proc, CopySrc, nil)
|
proc.startup.subscriber(proc, CopySrc, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubCopyDst {
|
if proc.configuration.StartProcesses.StartSubCopyDst {
|
||||||
proc.startup.subscriber(proc, CopyDst, nil)
|
proc.startup.subscriber(proc, CopyDst, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubHello {
|
if proc.configuration.StartProcesses.StartSubHello {
|
||||||
// subREQHello is the handler that is triggered when we are receiving a hello
|
// subREQHello is the handler that is triggered when we are receiving a hello
|
||||||
// message. To keep the state of all the hello's received from nodes we need
|
// message. To keep the state of all the hello's received from nodes we need
|
||||||
// to also start a procFunc that will live as a go routine tied to this process,
|
// to also start a procFunc that will live as a go routine tied to this process,
|
||||||
|
@ -152,21 +152,21 @@ func (p *processes) Start(proc process) {
|
||||||
proc.startup.subscriber(proc, Hello, pf)
|
proc.startup.subscriber(proc, Hello, pf)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.IsCentralErrorLogger {
|
if proc.configuration.StartProcesses.IsCentralErrorLogger {
|
||||||
proc.startup.subscriber(proc, ErrorLog, nil)
|
proc.startup.subscriber(proc, ErrorLog, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubCliCommand {
|
if proc.configuration.StartProcesses.StartSubCliCommand {
|
||||||
proc.startup.subscriber(proc, CliCommand, nil)
|
proc.startup.subscriber(proc, CliCommand, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubConsole {
|
if proc.configuration.StartProcesses.StartSubConsole {
|
||||||
proc.startup.subscriber(proc, Console, nil)
|
proc.startup.subscriber(proc, Console, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartPubHello != 0 {
|
if proc.configuration.StartProcesses.StartPubHello != 0 {
|
||||||
pf := func(ctx context.Context, procFuncCh chan Message) error {
|
pf := func(ctx context.Context, procFuncCh chan Message) error {
|
||||||
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubHello))
|
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartProcesses.StartPubHello))
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
for {
|
for {
|
||||||
|
|
||||||
|
@ -191,7 +191,7 @@ func (p *processes) Start(proc process) {
|
||||||
er := fmt.Errorf("error: ProcessesStart: %v", err)
|
er := fmt.Errorf("error: ProcessesStart: %v", err)
|
||||||
p.errorKernel.errSend(proc, m, er, logError)
|
p.errorKernel.errSend(proc, m, er, logError)
|
||||||
}
|
}
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
@ -206,7 +206,7 @@ func (p *processes) Start(proc process) {
|
||||||
proc.startup.publisher(proc, Hello, pf)
|
proc.startup.publisher(proc, Hello, pf)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.EnableKeyUpdates {
|
if proc.configuration.StartProcesses.EnableKeyUpdates {
|
||||||
// Define the startup of a publisher that will send KeysRequestUpdate
|
// Define the startup of a publisher that will send KeysRequestUpdate
|
||||||
// to central server and ask for publics keys, and to get them deliver back with a request
|
// to central server and ask for publics keys, and to get them deliver back with a request
|
||||||
// of type KeysDeliverUpdate.
|
// of type KeysDeliverUpdate.
|
||||||
|
@ -241,7 +241,7 @@ func (p *processes) Start(proc process) {
|
||||||
// In theory the system should drop the message before it reaches here.
|
// In theory the system should drop the message before it reaches here.
|
||||||
p.errorKernel.errSend(proc, m, err, logError)
|
p.errorKernel.errSend(proc, m, err, logError)
|
||||||
}
|
}
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
@ -257,7 +257,7 @@ func (p *processes) Start(proc process) {
|
||||||
proc.startup.subscriber(proc, KeysDeliverUpdate, nil)
|
proc.startup.subscriber(proc, KeysDeliverUpdate, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.EnableAclUpdates {
|
if proc.configuration.StartProcesses.EnableAclUpdates {
|
||||||
pf := func(ctx context.Context, procFuncCh chan Message) error {
|
pf := func(ctx context.Context, procFuncCh chan Message) error {
|
||||||
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.AclUpdateInterval))
|
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.AclUpdateInterval))
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
@ -290,7 +290,7 @@ func (p *processes) Start(proc process) {
|
||||||
p.errorKernel.errSend(proc, m, err, logError)
|
p.errorKernel.errSend(proc, m, err, logError)
|
||||||
log.Printf("error: ProcessesStart: %v\n", err)
|
log.Printf("error: ProcessesStart: %v\n", err)
|
||||||
}
|
}
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
@ -306,7 +306,7 @@ func (p *processes) Start(proc process) {
|
||||||
proc.startup.subscriber(proc, AclDeliverUpdate, nil)
|
proc.startup.subscriber(proc, AclDeliverUpdate, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.IsCentralAuth {
|
if proc.configuration.StartProcesses.IsCentralAuth {
|
||||||
proc.startup.subscriber(proc, KeysRequestUpdate, nil)
|
proc.startup.subscriber(proc, KeysRequestUpdate, nil)
|
||||||
proc.startup.subscriber(proc, KeysAllow, nil)
|
proc.startup.subscriber(proc, KeysAllow, nil)
|
||||||
proc.startup.subscriber(proc, KeysDelete, nil)
|
proc.startup.subscriber(proc, KeysDelete, nil)
|
||||||
|
@ -324,15 +324,15 @@ func (p *processes) Start(proc process) {
|
||||||
proc.startup.subscriber(proc, AclImport, nil)
|
proc.startup.subscriber(proc, AclImport, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubHttpGet {
|
if proc.configuration.StartProcesses.StartSubHttpGet {
|
||||||
proc.startup.subscriber(proc, HttpGet, nil)
|
proc.startup.subscriber(proc, HttpGet, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubTailFile {
|
if proc.configuration.StartProcesses.StartSubTailFile {
|
||||||
proc.startup.subscriber(proc, TailFile, nil)
|
proc.startup.subscriber(proc, TailFile, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubCliCommandCont {
|
if proc.configuration.StartProcesses.StartSubCliCommandCont {
|
||||||
proc.startup.subscriber(proc, CliCommandCont, nil)
|
proc.startup.subscriber(proc, CliCommandCont, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -388,6 +388,8 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p
|
||||||
go proc.spawnWorker()
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// publisher will start a publisher process. It takes the initial process, request method,
|
||||||
|
// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil.
|
||||||
func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
|
func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) {
|
||||||
er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
|
er := fmt.Errorf("starting %v publisher: %#v", m, p.node)
|
||||||
p.errorKernel.logDebug(er)
|
p.errorKernel.logDebug(er)
|
||||||
|
|
|
@ -359,7 +359,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
|
||||||
proc.errorKernel.errSend(proc, message, er, logError)
|
proc.errorKernel.errSend(proc, message, er, logError)
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectFileNaming will figure out the correct naming of the file
|
// selectFileNaming will figure out the correct naming of the file
|
||||||
|
|
|
@ -92,7 +92,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
|
||||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message)
|
return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message)
|
||||||
}
|
}
|
||||||
|
@ -267,7 +267,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) {
|
||||||
cancel()
|
cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName)
|
replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName)
|
||||||
|
|
||||||
|
@ -566,7 +566,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
return er
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
resendRetries = 0
|
resendRetries = 0
|
||||||
|
|
||||||
|
@ -636,7 +636,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel
|
||||||
return er
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
resendRetries++
|
resendRetries++
|
||||||
|
|
||||||
|
@ -699,7 +699,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
return er
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open a tmp folder for where to write the received chunks
|
// Open a tmp folder for where to write the received chunks
|
||||||
|
@ -801,7 +801,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
return er
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
case copyResendLast:
|
case copyResendLast:
|
||||||
// The csa already contains copyStatus copyResendLast when reached here,
|
// The csa already contains copyStatus copyResendLast when reached here,
|
||||||
|
@ -834,7 +834,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
return er
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
case copySrcDone:
|
case copySrcDone:
|
||||||
err := func() error {
|
err := func() error {
|
||||||
|
@ -988,7 +988,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc
|
||||||
return er
|
return er
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
}
|
}
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
|
|
|
@ -334,7 +334,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
||||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode)
|
er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode)
|
||||||
proc.errorKernel.logDebug(er)
|
proc.errorKernel.logDebug(er)
|
||||||
|
@ -381,7 +381,7 @@ func pushKeys(proc process, message Message, nodes []Node) error {
|
||||||
proc.errorKernel.errSend(proc, message, er, logWarning)
|
proc.errorKernel.errSend(proc, message, er, logWarning)
|
||||||
}
|
}
|
||||||
|
|
||||||
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
proc.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode)
|
er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode)
|
||||||
proc.errorKernel.logDebug(er)
|
proc.errorKernel.logDebug(er)
|
||||||
|
|
|
@ -76,8 +76,8 @@ func newServerForTesting(addressAndPort string, testFolder string) (*server, *Co
|
||||||
conf.SocketFolder = testFolder
|
conf.SocketFolder = testFolder
|
||||||
conf.SubscribersDataFolder = testFolder
|
conf.SubscribersDataFolder = testFolder
|
||||||
conf.DatabaseFolder = testFolder
|
conf.DatabaseFolder = testFolder
|
||||||
conf.IsCentralErrorLogger = true
|
conf.StartProcesses.IsCentralErrorLogger = true
|
||||||
conf.IsCentralAuth = true
|
conf.StartProcesses.IsCentralAuth = true
|
||||||
conf.EnableDebug = false
|
conf.EnableDebug = false
|
||||||
conf.LogLevel = "none"
|
conf.LogLevel = "none"
|
||||||
|
|
||||||
|
@ -289,7 +289,7 @@ func TestRequest(t *testing.T) {
|
||||||
t.Fatalf("newSubjectAndMessage failed: %v\n", err)
|
t.Fatalf("newSubjectAndMessage failed: %v\n", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
tstSrv.samToSendCh <- []subjectAndMessage{sam}
|
tstSrv.newMessagesCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
case viaSocket:
|
case viaSocket:
|
||||||
msgs := []Message{tt.message}
|
msgs := []Message{tt.message}
|
||||||
|
|
|
@ -48,7 +48,7 @@ type server struct {
|
||||||
//
|
//
|
||||||
// In general the ringbuffer will read this
|
// In general the ringbuffer will read this
|
||||||
// channel, unfold each slice, and put single messages on the buffer.
|
// channel, unfold each slice, and put single messages on the buffer.
|
||||||
samToSendCh chan []subjectAndMessage
|
newMessagesCh chan []subjectAndMessage
|
||||||
// directSAMSCh
|
// directSAMSCh
|
||||||
samSendLocalCh chan []subjectAndMessage
|
samSendLocalCh chan []subjectAndMessage
|
||||||
// errorKernel is doing all the error handling like what to do if
|
// errorKernel is doing all the error handling like what to do if
|
||||||
|
@ -217,7 +217,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
|
||||||
nodeName: configuration.NodeName,
|
nodeName: configuration.NodeName,
|
||||||
natsConn: conn,
|
natsConn: conn,
|
||||||
ctrlSocket: ctrlSocket,
|
ctrlSocket: ctrlSocket,
|
||||||
samToSendCh: make(chan []subjectAndMessage),
|
newMessagesCh: make(chan []subjectAndMessage),
|
||||||
samSendLocalCh: make(chan []subjectAndMessage),
|
samSendLocalCh: make(chan []subjectAndMessage),
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
version: version,
|
version: version,
|
||||||
|
@ -292,7 +292,7 @@ func (s *server) Start() {
|
||||||
s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)})
|
s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)})
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
err := s.errorKernel.start(s.samToSendCh)
|
err := s.errorKernel.start(s.newMessagesCh)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("%v\n", err)
|
log.Printf("%v\n", err)
|
||||||
}
|
}
|
||||||
|
@ -485,7 +485,7 @@ func (s *server) routeMessagesToProcess() {
|
||||||
methodsAvailable := method.GetMethodsAvailable()
|
methodsAvailable := method.GetMethodsAvailable()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
for samSlice := range s.samToSendCh {
|
for samSlice := range s.newMessagesCh {
|
||||||
for _, sam := range samSlice {
|
for _, sam := range samSlice {
|
||||||
|
|
||||||
go func(sam subjectAndMessage) {
|
go func(sam subjectAndMessage) {
|
||||||
|
|
Loading…
Reference in a new issue