diff --git a/configuration_flags.go b/configuration_flags.go index 5334db1..4898a30 100644 --- a/configuration_flags.go +++ b/configuration_flags.go @@ -88,8 +88,7 @@ type Configuration struct { EnableSignatureCheck bool `comment:"EnableSignatureCheck to enable signature checking"` // EnableAclCheck to enable ACL checking EnableAclCheck bool `comment:"EnableAclCheck to enable ACL checking"` - // IsCentralAuth, enable to make this instance take the role as the central auth server - IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"` + // EnableDebug will also enable printing all the messages received in the errorKernel to STDERR. EnableDebug bool `comment:"EnableDebug will also enable printing all the messages received in the errorKernel to STDERR."` // LogLevel @@ -102,6 +101,10 @@ type Configuration struct { // it have not received any messages for the given amount of time. KeepPublishersAliveFor int `comment:"KeepPublishersAliveFor number of seconds Timer that will be used for when to remove the sub process publisher. The timer is reset each time a message is published with the process, so the sub process publisher will not be removed until it have not received any messages for the given amount of time."` + StartProcesses StartProcesses +} + +type StartProcesses struct { // StartPubHello, sets the interval in seconds for how often we send hello messages to central server StartPubHello int `comment:"StartPubHello, sets the interval in seconds for how often we send hello messages to central server"` // Enable the updates of public keys @@ -132,6 +135,9 @@ type Configuration struct { StartSubTailFile bool `comment:"Start subscriber for tailing log files"` // Start subscriber for continously delivery of output from cli commands. StartSubCliCommandCont bool `comment:"Start subscriber for continously delivery of output from cli commands."` + + // IsCentralAuth, enable to make this instance take the role as the central auth server + IsCentralAuth bool `comment:"IsCentralAuth, enable to make this instance take the role as the central auth server"` } // NewConfiguration will return a *Configuration. @@ -140,7 +146,7 @@ func NewConfiguration() *Configuration { err := godotenv.Load() if err != nil { - log.Printf("Error loading .env file: %v\n", err) + log.Printf("info: no .env file found, will only use env vars or flags: %v\n", err) } //flag.StringVar(&c.ConfigFolder, "configFolder", fc.ConfigFolder, "Defaults to ./usr/local/ctrl/etc/. *NB* This flag is not used, if your config file are located somwhere else than default set the location in an env variable named CONFIGFOLDER") @@ -177,7 +183,7 @@ func NewConfiguration() *Configuration { flag.BoolVar(&c.EnableSocket, "enableSocket", CheckEnv("ENABLE_SOCKET", c.EnableSocket).(bool), "true/false, for enabling the creation of ctrl.sock file") flag.BoolVar(&c.EnableSignatureCheck, "enableSignatureCheck", CheckEnv("ENABLE_SIGNATURE_CHECK", c.EnableSignatureCheck).(bool), "true/false *TESTING* enable signature checking.") flag.BoolVar(&c.EnableAclCheck, "enableAclCheck", CheckEnv("ENABLE_ACL_CHECK", c.EnableAclCheck).(bool), "true/false *TESTING* enable Acl checking.") - flag.BoolVar(&c.IsCentralAuth, "isCentralAuth", CheckEnv("IS_CENTRAL_AUTH", c.IsCentralAuth).(bool), "true/false, *TESTING* is this the central auth server") + flag.BoolVar(&c.StartProcesses.IsCentralAuth, "isCentralAuth", CheckEnv("IS_CENTRAL_AUTH", c.StartProcesses.IsCentralAuth).(bool), "true/false, *TESTING* is this the central auth server") flag.BoolVar(&c.EnableDebug, "enableDebug", CheckEnv("ENABLE_DEBUG", c.EnableDebug).(bool), "true/false, will enable debug logging so all messages sent to the errorKernel will also be printed to STDERR") flag.StringVar(&c.LogLevel, "logLevel", CheckEnv("LOG_LEVEL", c.LogLevel).(string), "error/info/warning/debug/none") flag.BoolVar(&c.LogConsoleTimestamps, "LogConsoleTimestamps", CheckEnv("LOG_CONSOLE_TIMESTAMPS", c.LogConsoleTimestamps).(bool), "true/false for enabling or disabling timestamps when printing errors and information to stderr") @@ -185,23 +191,23 @@ func NewConfiguration() *Configuration { // Start of Request publishers/subscribers - flag.IntVar(&c.StartPubHello, "startPubHello", CheckEnv("START_PUB_HELLO", c.StartPubHello).(int), "Make the current node send hello messages to central at given interval in seconds") + flag.IntVar(&c.StartProcesses.StartPubHello, "startPubHello", CheckEnv("START_PUB_HELLO", c.StartProcesses.StartPubHello).(int), "Make the current node send hello messages to central at given interval in seconds") - flag.BoolVar(&c.EnableKeyUpdates, "EnableKeyUpdates", CheckEnv("ENABLE_KEY_UPDATES", c.EnableKeyUpdates).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.EnableKeyUpdates, "EnableKeyUpdates", CheckEnv("ENABLE_KEY_UPDATES", c.StartProcesses.EnableKeyUpdates).(bool), "true/false") - flag.BoolVar(&c.EnableAclUpdates, "EnableAclUpdates", CheckEnv("ENABLE_ACL_UPDATES", c.EnableAclUpdates).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.EnableAclUpdates, "EnableAclUpdates", CheckEnv("ENABLE_ACL_UPDATES", c.StartProcesses.EnableAclUpdates).(bool), "true/false") - flag.BoolVar(&c.IsCentralErrorLogger, "isCentralErrorLogger", CheckEnv("IS_CENTRAL_ERROR_LOGGER", c.IsCentralErrorLogger).(bool), "true/false") - flag.BoolVar(&c.StartSubHello, "startSubHello", CheckEnv("START_SUB_HELLO", c.StartSubHello).(bool), "true/false") - flag.BoolVar(&c.StartSubFileAppend, "startSubFileAppend", CheckEnv("START_SUB_FILE_APPEND", c.StartSubFileAppend).(bool), "true/false") - flag.BoolVar(&c.StartSubFile, "startSubFile", CheckEnv("START_SUB_FILE", c.StartSubFile).(bool), "true/false") - flag.BoolVar(&c.StartSubCopySrc, "startSubCopySrc", CheckEnv("START_SUB_COPY_SRC", c.StartSubCopySrc).(bool), "true/false") - flag.BoolVar(&c.StartSubCopyDst, "startSubCopyDst", CheckEnv("START_SUB_COPY_DST", c.StartSubCopyDst).(bool), "true/false") - flag.BoolVar(&c.StartSubCliCommand, "startSubCliCommand", CheckEnv("START_SUB_CLI_COMMAND", c.StartSubCliCommand).(bool), "true/false") - flag.BoolVar(&c.StartSubConsole, "startSubConsole", CheckEnv("START_SUB_CONSOLE", c.StartSubConsole).(bool), "true/false") - flag.BoolVar(&c.StartSubHttpGet, "startSubHttpGet", CheckEnv("START_SUB_HTTP_GET", c.StartSubHttpGet).(bool), "true/false") - flag.BoolVar(&c.StartSubTailFile, "startSubTailFile", CheckEnv("START_SUB_TAIL_FILE", c.StartSubTailFile).(bool), "true/false") - flag.BoolVar(&c.StartSubCliCommandCont, "startSubCliCommandCont", CheckEnv("START_SUB_CLI_COMMAND_CONT", c.StartSubCliCommandCont).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.IsCentralErrorLogger, "isCentralErrorLogger", CheckEnv("IS_CENTRAL_ERROR_LOGGER", c.StartProcesses.IsCentralErrorLogger).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubHello, "startSubHello", CheckEnv("START_SUB_HELLO", c.StartProcesses.StartSubHello).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubFileAppend, "startSubFileAppend", CheckEnv("START_SUB_FILE_APPEND", c.StartProcesses.StartSubFileAppend).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubFile, "startSubFile", CheckEnv("START_SUB_FILE", c.StartProcesses.StartSubFile).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubCopySrc, "startSubCopySrc", CheckEnv("START_SUB_COPY_SRC", c.StartProcesses.StartSubCopySrc).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubCopyDst, "startSubCopyDst", CheckEnv("START_SUB_COPY_DST", c.StartProcesses.StartSubCopyDst).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubCliCommand, "startSubCliCommand", CheckEnv("START_SUB_CLI_COMMAND", c.StartProcesses.StartSubCliCommand).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubConsole, "startSubConsole", CheckEnv("START_SUB_CONSOLE", c.StartProcesses.StartSubConsole).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubHttpGet, "startSubHttpGet", CheckEnv("START_SUB_HTTP_GET", c.StartProcesses.StartSubHttpGet).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubTailFile, "startSubTailFile", CheckEnv("START_SUB_TAIL_FILE", c.StartProcesses.StartSubTailFile).(bool), "true/false") + flag.BoolVar(&c.StartProcesses.StartSubCliCommandCont, "startSubCliCommandCont", CheckEnv("START_SUB_CLI_COMMAND_CONT", c.StartProcesses.StartSubCliCommandCont).(bool), "true/false") // Check that mandatory flag values have been set. switch { @@ -254,26 +260,28 @@ func newConfigurationDefaults() Configuration { EnableSocket: true, EnableSignatureCheck: false, EnableAclCheck: false, - IsCentralAuth: false, EnableDebug: false, LogLevel: "debug", LogConsoleTimestamps: false, KeepPublishersAliveFor: 10, - StartPubHello: 30, - EnableKeyUpdates: false, - EnableAclUpdates: false, - IsCentralErrorLogger: false, - StartSubHello: true, - StartSubFileAppend: true, - StartSubFile: true, - StartSubCopySrc: true, - StartSubCopyDst: true, - StartSubCliCommand: true, - StartSubConsole: true, - StartSubHttpGet: true, - StartSubTailFile: true, - StartSubCliCommandCont: true, + StartProcesses: StartProcesses{ + StartPubHello: 30, + EnableKeyUpdates: false, + EnableAclUpdates: false, + IsCentralErrorLogger: false, + StartSubHello: true, + StartSubFileAppend: true, + StartSubFile: true, + StartSubCopySrc: true, + StartSubCopyDst: true, + StartSubCliCommand: true, + StartSubConsole: true, + StartSubHttpGet: true, + StartSubTailFile: true, + StartSubCliCommandCont: true, + IsCentralAuth: false, + }, } return c } diff --git a/doc/src/core_messaging_message_fields.md b/doc/src/core_messaging_message_fields.md index 5c36fe9..0a2d4f8 100644 --- a/doc/src/core_messaging_message_fields.md +++ b/doc/src/core_messaging_message_fields.md @@ -24,6 +24,8 @@ method : cliCommand What request method type to use, like cliCommand, httpGet.. +All [methods](./core_request_methods.md). + ```yaml methodArgs : - "bash" @@ -96,6 +98,7 @@ methodTimeout : 10 ``` Timeout for how long a method should be allowed to run before it is timed out. +If `methodTimeout : -1` the method will not time out. ```yaml replyMethodTimeout : 10 diff --git a/message_readers.go b/message_readers.go index dad8f9c..4e80b43 100644 --- a/message_readers.go +++ b/message_readers.go @@ -211,7 +211,7 @@ func (s *server) readSocket() { } // Send the SAM struct to be picked up by the ring buffer. - s.samToSendCh <- sams + s.newMessagesCh <- sams s.auditLogCh <- sams }(conn) @@ -293,7 +293,7 @@ func (s *server) readFolder() { s.errorKernel.logDebug(er) // Send the SAM struct to be picked up by the ring buffer. - s.samToSendCh <- sams + s.newMessagesCh <- sams s.auditLogCh <- sams // Delete the file. @@ -386,7 +386,7 @@ func (s *server) readTCPListener() { } // Send the SAM struct to be picked up by the ring buffer. - s.samToSendCh <- sams + s.newMessagesCh <- sams s.auditLogCh <- sams }(conn) @@ -431,7 +431,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) } // Send the SAM struct to be picked up by the ring buffer. - s.samToSendCh <- sams + s.newMessagesCh <- sams s.auditLogCh <- sams } diff --git a/process.go b/process.go index c67e51a..058ddb5 100644 --- a/process.go +++ b/process.go @@ -88,7 +88,7 @@ type process struct { // copy of the configuration from server configuration *Configuration // The new messages channel copied from *Server - toRingbufferCh chan<- []subjectAndMessage + newMessagesCh chan<- []subjectAndMessage // The structure who holds all processes information processes *processes // nats connection @@ -143,7 +143,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin processID: pid, processKind: processKind, methodsAvailable: method.GetMethodsAvailable(), - toRingbufferCh: server.samToSendCh, + newMessagesCh: server.newMessagesCh, configuration: server.configuration, processes: server.processes, natsConn: server.natsConn, diff --git a/processes.go b/processes.go index ffd86eb..49745b7 100644 --- a/processes.go +++ b/processes.go @@ -97,23 +97,23 @@ func (p *processes) Start(proc process) { proc.startup.subscriber(proc, OpProcessStop, nil) proc.startup.subscriber(proc, Test, nil) - if proc.configuration.StartSubFileAppend { + if proc.configuration.StartProcesses.StartSubFileAppend { proc.startup.subscriber(proc, FileAppend, nil) } - if proc.configuration.StartSubFile { + if proc.configuration.StartProcesses.StartSubFile { proc.startup.subscriber(proc, File, nil) } - if proc.configuration.StartSubCopySrc { + if proc.configuration.StartProcesses.StartSubCopySrc { proc.startup.subscriber(proc, CopySrc, nil) } - if proc.configuration.StartSubCopyDst { + if proc.configuration.StartProcesses.StartSubCopyDst { proc.startup.subscriber(proc, CopyDst, nil) } - if proc.configuration.StartSubHello { + if proc.configuration.StartProcesses.StartSubHello { // subREQHello is the handler that is triggered when we are receiving a hello // message. To keep the state of all the hello's received from nodes we need // to also start a procFunc that will live as a go routine tied to this process, @@ -152,21 +152,21 @@ func (p *processes) Start(proc process) { proc.startup.subscriber(proc, Hello, pf) } - if proc.configuration.IsCentralErrorLogger { + if proc.configuration.StartProcesses.IsCentralErrorLogger { proc.startup.subscriber(proc, ErrorLog, nil) } - if proc.configuration.StartSubCliCommand { + if proc.configuration.StartProcesses.StartSubCliCommand { proc.startup.subscriber(proc, CliCommand, nil) } - if proc.configuration.StartSubConsole { + if proc.configuration.StartProcesses.StartSubConsole { proc.startup.subscriber(proc, Console, nil) } - if proc.configuration.StartPubHello != 0 { + if proc.configuration.StartProcesses.StartPubHello != 0 { pf := func(ctx context.Context, procFuncCh chan Message) error { - ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartPubHello)) + ticker := time.NewTicker(time.Second * time.Duration(p.configuration.StartProcesses.StartPubHello)) defer ticker.Stop() for { @@ -191,7 +191,7 @@ func (p *processes) Start(proc process) { er := fmt.Errorf("error: ProcessesStart: %v", err) p.errorKernel.errSend(proc, m, er, logError) } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} select { case <-ticker.C: @@ -206,7 +206,7 @@ func (p *processes) Start(proc process) { proc.startup.publisher(proc, Hello, pf) } - if proc.configuration.EnableKeyUpdates { + if proc.configuration.StartProcesses.EnableKeyUpdates { // Define the startup of a publisher that will send KeysRequestUpdate // to central server and ask for publics keys, and to get them deliver back with a request // of type KeysDeliverUpdate. @@ -241,7 +241,7 @@ func (p *processes) Start(proc process) { // In theory the system should drop the message before it reaches here. p.errorKernel.errSend(proc, m, err, logError) } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} select { case <-ticker.C: @@ -257,7 +257,7 @@ func (p *processes) Start(proc process) { proc.startup.subscriber(proc, KeysDeliverUpdate, nil) } - if proc.configuration.EnableAclUpdates { + if proc.configuration.StartProcesses.EnableAclUpdates { pf := func(ctx context.Context, procFuncCh chan Message) error { ticker := time.NewTicker(time.Second * time.Duration(p.configuration.AclUpdateInterval)) defer ticker.Stop() @@ -290,7 +290,7 @@ func (p *processes) Start(proc process) { p.errorKernel.errSend(proc, m, err, logError) log.Printf("error: ProcessesStart: %v\n", err) } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} select { case <-ticker.C: @@ -306,7 +306,7 @@ func (p *processes) Start(proc process) { proc.startup.subscriber(proc, AclDeliverUpdate, nil) } - if proc.configuration.IsCentralAuth { + if proc.configuration.StartProcesses.IsCentralAuth { proc.startup.subscriber(proc, KeysRequestUpdate, nil) proc.startup.subscriber(proc, KeysAllow, nil) proc.startup.subscriber(proc, KeysDelete, nil) @@ -324,15 +324,15 @@ func (p *processes) Start(proc process) { proc.startup.subscriber(proc, AclImport, nil) } - if proc.configuration.StartSubHttpGet { + if proc.configuration.StartProcesses.StartSubHttpGet { proc.startup.subscriber(proc, HttpGet, nil) } - if proc.configuration.StartSubTailFile { + if proc.configuration.StartProcesses.StartSubTailFile { proc.startup.subscriber(proc, TailFile, nil) } - if proc.configuration.StartSubCliCommandCont { + if proc.configuration.StartProcesses.StartSubCliCommandCont { proc.startup.subscriber(proc, CliCommandCont, nil) } @@ -388,6 +388,8 @@ func (s *startup) subscriber(p process, m Method, pf func(ctx context.Context, p go proc.spawnWorker() } +// publisher will start a publisher process. It takes the initial process, request method, +// and a procFunc as it's input arguments. If a procFunc is not needed, use the value nil. func (s *startup) publisher(p process, m Method, pf func(ctx context.Context, procFuncCh chan Message) error) { er := fmt.Errorf("starting %v publisher: %#v", m, p.node) p.errorKernel.logDebug(er) diff --git a/requests.go b/requests.go index ff690ea..505236c 100644 --- a/requests.go +++ b/requests.go @@ -359,7 +359,7 @@ func newReplyMessage(proc process, message Message, outData []byte) { proc.errorKernel.errSend(proc, message, er, logError) } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} } // selectFileNaming will figure out the correct naming of the file diff --git a/requests_copy.go b/requests_copy.go index 4c23b28..8733b80 100644 --- a/requests_copy.go +++ b/requests_copy.go @@ -92,7 +92,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { proc.errorKernel.errSend(proc, message, er, logWarning) } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} return nil, fmt.Errorf("info: the copy message was forwarded to %v message, %v", message.ToNode, message) } @@ -267,7 +267,7 @@ func methodCopySrc(proc process, message Message, node string) ([]byte, error) { cancel() } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} replyData := fmt.Sprintf("info: succesfully initiated copy source process: procName=%v, srcNode=%v, srcPath=%v, dstNode=%v, dstPath=%v, starting sub process=%v for the actual copying", copySrcSubProc.processName, node, SrcFilePath, DstNode, DstFilePath, subProcessName) @@ -566,7 +566,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel return er } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} resendRetries = 0 @@ -636,7 +636,7 @@ func copySrcSubProcFunc(proc process, cia copyInitialData, cancel context.Cancel return er } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} resendRetries++ @@ -699,7 +699,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return er } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} } // Open a tmp folder for where to write the received chunks @@ -801,7 +801,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return er } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} case copyResendLast: // The csa already contains copyStatus copyResendLast when reached here, @@ -834,7 +834,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return er } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} case copySrcDone: err := func() error { @@ -988,7 +988,7 @@ func copyDstSubProcFunc(proc process, cia copyInitialData, message Message, canc return er } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} } cancel() diff --git a/requests_keys.go b/requests_keys.go index 105a8c5..5062976 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -334,7 +334,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { proc.errorKernel.errSend(proc, message, er, logWarning) } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} er = fmt.Errorf("----> methodKeysAllow: SENDING KEYS TO NODE=%v", message.FromNode) proc.errorKernel.logDebug(er) @@ -381,7 +381,7 @@ func pushKeys(proc process, message Message, nodes []Node) error { proc.errorKernel.errSend(proc, message, er, logWarning) } - proc.toRingbufferCh <- []subjectAndMessage{sam} + proc.newMessagesCh <- []subjectAndMessage{sam} er = fmt.Errorf("----> methodKeysAllow: sending keys update to node=%v", message.FromNode) proc.errorKernel.logDebug(er) diff --git a/requests_test.go b/requests_test.go index a351bc3..c6c7e9e 100644 --- a/requests_test.go +++ b/requests_test.go @@ -76,8 +76,8 @@ func newServerForTesting(addressAndPort string, testFolder string) (*server, *Co conf.SocketFolder = testFolder conf.SubscribersDataFolder = testFolder conf.DatabaseFolder = testFolder - conf.IsCentralErrorLogger = true - conf.IsCentralAuth = true + conf.StartProcesses.IsCentralErrorLogger = true + conf.StartProcesses.IsCentralAuth = true conf.EnableDebug = false conf.LogLevel = "none" @@ -289,7 +289,7 @@ func TestRequest(t *testing.T) { t.Fatalf("newSubjectAndMessage failed: %v\n", err) } - tstSrv.samToSendCh <- []subjectAndMessage{sam} + tstSrv.newMessagesCh <- []subjectAndMessage{sam} case viaSocket: msgs := []Message{tt.message} diff --git a/server.go b/server.go index 25749f4..fe5d294 100644 --- a/server.go +++ b/server.go @@ -48,7 +48,7 @@ type server struct { // // In general the ringbuffer will read this // channel, unfold each slice, and put single messages on the buffer. - samToSendCh chan []subjectAndMessage + newMessagesCh chan []subjectAndMessage // directSAMSCh samSendLocalCh chan []subjectAndMessage // errorKernel is doing all the error handling like what to do if @@ -217,7 +217,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) { nodeName: configuration.NodeName, natsConn: conn, ctrlSocket: ctrlSocket, - samToSendCh: make(chan []subjectAndMessage), + newMessagesCh: make(chan []subjectAndMessage), samSendLocalCh: make(chan []subjectAndMessage), metrics: metrics, version: version, @@ -292,7 +292,7 @@ func (s *server) Start() { s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)}) go func() { - err := s.errorKernel.start(s.samToSendCh) + err := s.errorKernel.start(s.newMessagesCh) if err != nil { log.Printf("%v\n", err) } @@ -485,7 +485,7 @@ func (s *server) routeMessagesToProcess() { methodsAvailable := method.GetMethodsAvailable() go func() { - for samSlice := range s.samToSendCh { + for samSlice := range s.newMessagesCh { for _, sam := range samSlice { go func(sam subjectAndMessage) {