mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-05 06:46:48 +00:00
removed startup struct
This commit is contained in:
parent
8065eb248b
commit
693a6b819e
3 changed files with 55 additions and 59 deletions
|
@ -94,12 +94,9 @@ type Configuration struct {
|
|||
// it have not received any messages for the given amount of time.
|
||||
KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."`
|
||||
|
||||
StartProcesses StartProcesses
|
||||
// Comma separated list of additional streams to consume from.
|
||||
JetstreamsConsume string
|
||||
}
|
||||
|
||||
type StartProcesses struct {
|
||||
StartPubHello int `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"`
|
||||
EnableKeyUpdates bool `comment:"Enable the updates of public keys"`
|
||||
|
||||
|
@ -164,7 +161,7 @@ func NewConfiguration() *Configuration {
|
|||
flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file")
|
||||
flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.")
|
||||
flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", CheckEnv("ENABLE_ACL_CHECK", c.EnableAclCheck).(bool), "true/false *TESTING* enable Acl checking.")
|
||||
flag.BoolVar(&c.StartProcesses.IsCentralAuth, "isCentralAuth", CheckEnv("IS_CENTRAL_AUTH", c.StartProcesses.IsCentralAuth).(bool), "true/false, *TESTING* is this the central auth server")
|
||||
flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", CheckEnv("IS_CENTRAL_AUTH", c.IsCentralAuth).(bool), "true/false, *TESTING* is this the central auth server")
|
||||
flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none")
|
||||
flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr")
|
||||
flag.IntVar(&c.KeepPublishersAliveFor, "keepPublishersAliveFor", CheckEnv("KEEP_PUBLISHERS_ALIVE_FOR", c.KeepPublishersAliveFor).(int), "The amount of time we allow a publisher to stay alive without receiving any messages to publish")
|
||||
|
@ -173,26 +170,26 @@ func NewConfiguration() *Configuration {
|
|||
|
||||
// Start of Request publishers/subscribers
|
||||
|
||||
flag.IntVar(&c.StartProcesses.StartPubHello, "startPubHello", CheckEnv("START_PUB_HELLO", c.StartProcesses.StartPubHello).(int), "Make the current node send hello messages to central at given interval in seconds")
|
||||
flag.IntVar(&c.StartPubHello, "startPubHello", CheckEnv("START_PUB_HELLO", c.StartPubHello).(int), "Make the current node send hello messages to central at given interval in seconds")
|
||||
|
||||
flag.BoolVar(&c.StartProcesses.EnableKeyUpdates, "EnableKeyUpdates", CheckEnv("ENABLE_KEY_UPDATES", c.StartProcesses.EnableKeyUpdates).(bool), "true/false")
|
||||
flag.BoolVar(&c.EnableKeyUpdates, "EnableKeyUpdates", CheckEnv("ENABLE_KEY_UPDATES", c.EnableKeyUpdates).(bool), "true/false")
|
||||
|
||||
flag.BoolVar(&c.StartProcesses.EnableAclUpdates, "EnableAclUpdates", CheckEnv("ENABLE_ACL_UPDATES", c.StartProcesses.EnableAclUpdates).(bool), "true/false")
|
||||
flag.BoolVar(&c.EnableAclUpdates, "EnableAclUpdates", CheckEnv("ENABLE_ACL_UPDATES", c.EnableAclUpdates).(bool), "true/false")
|
||||
|
||||
flag.BoolVar(&c.StartProcesses.IsCentralErrorLogger, "isCentralErrorLogger", CheckEnv("IS_CENTRAL_ERROR_LOGGER", c.StartProcesses.IsCentralErrorLogger).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubHello, "startSubHello", CheckEnv("START_SUB_HELLO", c.StartProcesses.StartSubHello).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubFileAppend, "startSubFileAppend", CheckEnv("START_SUB_FILE_APPEND", c.StartProcesses.StartSubFileAppend).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubFile, "startSubFile", CheckEnv("START_SUB_FILE", c.StartProcesses.StartSubFile).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubCopySrc, "startSubCopySrc", CheckEnv("START_SUB_COPY_SRC", c.StartProcesses.StartSubCopySrc).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubCopyDst, "startSubCopyDst", CheckEnv("START_SUB_COPY_DST", c.StartProcesses.StartSubCopyDst).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubCliCommand, "startSubCliCommand", CheckEnv("START_SUB_CLI_COMMAND", c.StartProcesses.StartSubCliCommand).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubConsole, "startSubConsole", CheckEnv("START_SUB_CONSOLE", c.StartProcesses.StartSubConsole).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubHttpGet, "startSubHttpGet", CheckEnv("START_SUB_HTTP_GET", c.StartProcesses.StartSubHttpGet).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubTailFile, "startSubTailFile", CheckEnv("START_SUB_TAIL_FILE", c.StartProcesses.StartSubTailFile).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartProcesses.StartSubCliCommandCont, "startSubCliCommandCont", CheckEnv("START_SUB_CLI_COMMAND_CONT", c.StartProcesses.StartSubCliCommandCont).(bool), "true/false")
|
||||
flag.BoolVar(&c.IsCentralErrorLogger, "isCentralErrorLogger", CheckEnv("IS_CENTRAL_ERROR_LOGGER", c.IsCentralErrorLogger).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubHello, "startSubHello", CheckEnv("START_SUB_HELLO", c.StartSubHello).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubFileAppend, "startSubFileAppend", CheckEnv("START_SUB_FILE_APPEND", c.StartSubFileAppend).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubFile, "startSubFile", CheckEnv("START_SUB_FILE", c.StartSubFile).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubCopySrc, "startSubCopySrc", CheckEnv("START_SUB_COPY_SRC", c.StartSubCopySrc).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubCopyDst, "startSubCopyDst", CheckEnv("START_SUB_COPY_DST", c.StartSubCopyDst).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubCliCommand, "startSubCliCommand", CheckEnv("START_SUB_CLI_COMMAND", c.StartSubCliCommand).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubConsole, "startSubConsole", CheckEnv("START_SUB_CONSOLE", c.StartSubConsole).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubHttpGet, "startSubHttpGet", CheckEnv("START_SUB_HTTP_GET", c.StartSubHttpGet).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubTailFile, "startSubTailFile", CheckEnv("START_SUB_TAIL_FILE", c.StartSubTailFile).(bool), "true/false")
|
||||
flag.BoolVar(&c.StartSubCliCommandCont, "startSubCliCommandCont", CheckEnv("START_SUB_CLI_COMMAND_CONT", c.StartSubCliCommandCont).(bool), "true/false")
|
||||
|
||||
flag.BoolVar(&c.StartProcesses.StartJetstreamPublisher, "startJetstreamPublisher", CheckEnv("START_JETSTREAM_PUBLISHER", c.StartProcesses.StartJetstreamPublisher).(bool), "Start the nats jetstream publisher")
|
||||
flag.BoolVar(&c.StartProcesses.StartJetstreamConsumer, "StartJetstreamConsumer", CheckEnv("START_JETSTREAM_CONSUMER", c.StartProcesses.StartJetstreamConsumer).(bool), "Start the nats jetstream consumer")
|
||||
flag.BoolVar(&c.StartJetstreamPublisher, "startJetstreamPublisher", CheckEnv("START_JETSTREAM_PUBLISHER", c.StartJetstreamPublisher).(bool), "Start the nats jetstream publisher")
|
||||
flag.BoolVar(&c.StartJetstreamConsumer, "StartJetstreamConsumer", CheckEnv("START_JETSTREAM_CONSUMER", c.StartJetstreamConsumer).(bool), "Start the nats jetstream consumer")
|
||||
|
||||
// Check that mandatory flag values have been set.
|
||||
switch {
|
||||
|
@ -249,25 +246,23 @@ func newConfigurationDefaults() Configuration {
|
|||
KeepPublishersAliveFor: 10,
|
||||
JetstreamsConsume: "",
|
||||
|
||||
StartProcesses: StartProcesses{
|
||||
StartPubHello: 30,
|
||||
EnableKeyUpdates: false,
|
||||
EnableAclUpdates: false,
|
||||
IsCentralErrorLogger: false,
|
||||
StartSubHello: true,
|
||||
StartSubFileAppend: true,
|
||||
StartSubFile: true,
|
||||
StartSubCopySrc: true,
|
||||
StartSubCopyDst: true,
|
||||
StartSubCliCommand: true,
|
||||
StartSubConsole: true,
|
||||
StartSubHttpGet: true,
|
||||
StartSubTailFile: true,
|
||||
StartSubCliCommandCont: true,
|
||||
IsCentralAuth: false,
|
||||
StartJetstreamPublisher: true,
|
||||
StartJetstreamConsumer: true,
|
||||
},
|
||||
StartPubHello: 30,
|
||||
EnableKeyUpdates: false,
|
||||
EnableAclUpdates: false,
|
||||
IsCentralErrorLogger: false,
|
||||
StartSubHello: true,
|
||||
StartSubFileAppend: true,
|
||||
StartSubFile: true,
|
||||
StartSubCopySrc: true,
|
||||
StartSubCopyDst: true,
|
||||
StartSubCliCommand: true,
|
||||
StartSubConsole: true,
|
||||
StartSubHttpGet: true,
|
||||
StartSubTailFile: true,
|
||||
StartSubCliCommandCont: true,
|
||||
IsCentralAuth: false,
|
||||
StartJetstreamPublisher: true,
|
||||
StartJetstreamConsumer: true,
|
||||
}
|
||||
return c
|
||||
}
|
||||
|
|
37
processes.go
37
processes.go
|
@ -99,23 +99,23 @@ func (p *processes) Start(proc process) {
|
|||
proc.startup.subscriber(proc, OpProcessStop, nil)
|
||||
proc.startup.subscriber(proc, Test, nil)
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubFileAppend {
|
||||
if proc.configuration.StartSubFileAppend {
|
||||
proc.startup.subscriber(proc, FileAppend, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubFile {
|
||||
if proc.configuration.StartSubFile {
|
||||
proc.startup.subscriber(proc, File, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubCopySrc {
|
||||
if proc.configuration.StartSubCopySrc {
|
||||
proc.startup.subscriber(proc, CopySrc, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubCopyDst {
|
||||
if proc.configuration.StartSubCopyDst {
|
||||
proc.startup.subscriber(proc, CopyDst, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubHello {
|
||||
if proc.configuration.StartSubHello {
|
||||
// subREQHello is the handler that is triggered when we are receiving a hello
|
||||
// message. To keep the state of all the hello's received from nodes we need
|
||||
// to also start a procFunc that will live as a go routine tied to this process,
|
||||
|
@ -154,21 +154,22 @@ func (p *processes) Start(proc process) {
|
|||
proc.startup.subscriber(proc, Hello, pf)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.IsCentralErrorLogger {
|
||||
fmt.Printf("--------------------------------IsCentralErrorLogger = %v------------------------------\n", proc.configuration.IsCentralErrorLogger)
|
||||
if proc.configuration.IsCentralErrorLogger {
|
||||
proc.startup.subscriber(proc, ErrorLog, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubCliCommand {
|
||||
if proc.configuration.StartSubCliCommand {
|
||||
proc.startup.subscriber(proc, CliCommand, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubConsole {
|
||||
if proc.configuration.StartSubConsole {
|
||||
proc.startup.subscriber(proc, Console, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartPubHello != 0 {
|
||||
if proc.configuration.StartPubHello != 0 {
|
||||
pf := func(ctx context.Context, procFuncCh chan Message) error {
|
||||
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartProcesses.StartPubHello))
|
||||
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubHello))
|
||||
defer ticker.Stop()
|
||||
for {
|
||||
|
||||
|
@ -208,7 +209,7 @@ func (p *processes) Start(proc process) {
|
|||
proc.startup.publisher(proc, Hello, pf)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.EnableKeyUpdates {
|
||||
if proc.configuration.EnableKeyUpdates {
|
||||
// Define the startup of a publisher that will send KeysRequestUpdate
|
||||
// to central server and ask for publics keys, and to get them deliver back with a request
|
||||
// of type KeysDeliverUpdate.
|
||||
|
@ -259,7 +260,7 @@ func (p *processes) Start(proc process) {
|
|||
proc.startup.subscriber(proc, KeysDeliverUpdate, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.EnableAclUpdates {
|
||||
if proc.configuration.EnableAclUpdates {
|
||||
pf := func(ctx context.Context, procFuncCh chan Message) error {
|
||||
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.AclUpdateInterval))
|
||||
defer ticker.Stop()
|
||||
|
@ -308,7 +309,7 @@ func (p *processes) Start(proc process) {
|
|||
proc.startup.subscriber(proc, AclDeliverUpdate, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.IsCentralAuth {
|
||||
if proc.configuration.IsCentralAuth {
|
||||
proc.startup.subscriber(proc, KeysRequestUpdate, nil)
|
||||
proc.startup.subscriber(proc, KeysAllow, nil)
|
||||
proc.startup.subscriber(proc, KeysDelete, nil)
|
||||
|
@ -326,15 +327,15 @@ func (p *processes) Start(proc process) {
|
|||
proc.startup.subscriber(proc, AclImport, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubHttpGet {
|
||||
if proc.configuration.StartSubHttpGet {
|
||||
proc.startup.subscriber(proc, HttpGet, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubTailFile {
|
||||
if proc.configuration.StartSubTailFile {
|
||||
proc.startup.subscriber(proc, TailFile, nil)
|
||||
}
|
||||
|
||||
if proc.configuration.StartProcesses.StartSubCliCommandCont {
|
||||
if proc.configuration.StartSubCliCommandCont {
|
||||
proc.startup.subscriber(proc, CliCommandCont, nil)
|
||||
}
|
||||
|
||||
|
@ -343,7 +344,7 @@ func (p *processes) Start(proc process) {
|
|||
// --------------------------------------------------
|
||||
// ProcFunc for Jetstream publishers.
|
||||
// --------------------------------------------------
|
||||
if proc.configuration.StartProcesses.StartJetstreamPublisher {
|
||||
if proc.configuration.StartJetstreamPublisher {
|
||||
pfJetstreamPublishers := func(ctx context.Context, procFuncCh chan Message) error {
|
||||
js, err := jetstream.New(proc.natsConn)
|
||||
if err != nil {
|
||||
|
@ -393,7 +394,7 @@ func (p *processes) Start(proc process) {
|
|||
// After a jetstream message is picked up from the stream, the steward message
|
||||
// will be extracted from the data field, and the ctrl message will be put
|
||||
// on the local delivery channel, and handled as a normal ctrl message.
|
||||
if proc.configuration.StartProcesses.StartJetstreamConsumer {
|
||||
if proc.configuration.StartJetstreamConsumer {
|
||||
pfJetstreamConsumers := func(ctx context.Context, procFuncCh chan Message) error {
|
||||
js, err := jetstream.New(proc.natsConn)
|
||||
if err != nil {
|
||||
|
|
|
@ -76,8 +76,8 @@ func newServerForTesting(addressAndPort string, testFolder string) (*server, *Co
|
|||
conf.SocketFolder = testFolder
|
||||
conf.SubscribersDataFolder = testFolder
|
||||
conf.DatabaseFolder = testFolder
|
||||
conf.StartProcesses.IsCentralErrorLogger = true
|
||||
conf.StartProcesses.IsCentralAuth = true
|
||||
conf.IsCentralErrorLogger = true
|
||||
conf.IsCentralAuth = true
|
||||
conf.LogLevel = "none"
|
||||
|
||||
ctrlServer, err := NewServer(&conf, "test")
|
||||
|
|
Loading…
Add table
Reference in a new issue