diff --git a/doc/concept/auth/auth.md b/doc/concept/auth/auth.md index 5867d9c..a91f97e 100644 --- a/doc/concept/auth/auth.md +++ b/doc/concept/auth/auth.md @@ -40,6 +40,8 @@ NB: Nodes that don't have hello messages enabled and are not present in the hell If a node is registered in the auth db but not present in the network we should throw a log message to the errorKernel so operators would be aware of such nodes. +DECIDE: Hello messages should contain the public key ? + ### Public Keys #### Central Store diff --git a/message_readers.go b/message_readers.go index f22481d..3d13e65 100644 --- a/message_readers.go +++ b/message_readers.go @@ -211,7 +211,7 @@ func (s *server) readSocket() { } // Send the SAM struct to be picked up by the ring buffer. - s.ringBufferBulkInCh <- sams + s.toRingBufferCh <- sams }(conn) } @@ -276,7 +276,7 @@ func (s *server) readTCPListener() { } // Send the SAM struct to be picked up by the ring buffer. - s.ringBufferBulkInCh <- sam + s.toRingBufferCh <- sam }(conn) } @@ -320,7 +320,7 @@ func (s *server) readHTTPlistenerHandler(w http.ResponseWriter, r *http.Request) } // Send the SAM struct to be picked up by the ring buffer. - s.ringBufferBulkInCh <- sam + s.toRingBufferCh <- sam } diff --git a/process.go b/process.go index 8528f28..a18c15f 100644 --- a/process.go +++ b/process.go @@ -32,6 +32,9 @@ const ( // process holds all the logic to handle a message type and it's // method, subscription/publishin messages for a subject, and more. type process struct { + // server + server *server + // messageID messageID int // the subject used for the specific process. One process // can contain only one sender on a message bus, hence @@ -103,29 +106,30 @@ type process struct { // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, processKind processKind, procFunc func() error, signatures *signatures) process { +func newProcess(ctx context.Context, server *server, subject Subject, processKind processKind, procFunc func() error) process { // create the initial configuration for a sessions communicating with 1 host process. - processes.lastProcessID++ + server.processes.lastProcessID++ ctx, cancel := context.WithCancel(ctx) var method Method proc := process{ + server: server, messageID: 0, subject: subject, - node: Node(configuration.NodeName), - processID: processes.lastProcessID, + node: Node(server.configuration.NodeName), + processID: server.processes.lastProcessID, processKind: processKind, methodsAvailable: method.GetMethodsAvailable(), - toRingbufferCh: toRingbufferCh, - configuration: configuration, - processes: processes, - natsConn: natsConn, + toRingbufferCh: server.toRingBufferCh, + configuration: server.configuration, + processes: server.processes, + natsConn: server.natsConn, ctx: ctx, ctxCancel: cancel, - startup: newStartup(metrics, signatures), - signatures: signatures, + startup: newStartup(server), + signatures: server.signatures, } return proc @@ -138,7 +142,7 @@ func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, proc // // It will give the process the next available ID, and also add the // process to the processes map in the server structure. -func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { +func (p process) spawnWorker() { // We use the full name of the subject to identify a unique // process. We can do that since a process can only handle // one message queue. @@ -153,7 +157,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { processName := processNameGet(p.subject.name(), p.processKind) // Add prometheus metrics for the process. - p.processes.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)}) + p.server.metrics.promProcessesAllRunning.With(prometheus.Labels{"processName": string(processName)}) // Start a publisher worker, which will start a go routine (process) // That will take care of all the messages for the subject it owns. @@ -176,7 +180,7 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { }() } - go p.publishMessages(natsConn) + go p.publishMessages(p.natsConn) } // Start a subscriber worker, which will start a go routine (process) @@ -205,9 +209,9 @@ func (p process) spawnWorker(procs *processes, natsConn *nats.Conn) { p.processName = pn // Add information about the new process to the started processes map. - procs.active.mu.Lock() - procs.active.procNames[pn] = p - procs.active.mu.Unlock() + p.server.processes.active.mu.Lock() + p.server.processes.active.procNames[pn] = p + p.server.processes.active.mu.Unlock() } // messageDeliverNats will create the Nats message with headers and payload. @@ -242,7 +246,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He log.Printf("%v\n", er) return } - p.processes.metrics.promNatsDeliveredTotal.Inc() + p.server.metrics.promNatsDeliveredTotal.Inc() return } @@ -309,7 +313,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He subReply.Unsubscribe() - p.processes.metrics.promNatsMessagesFailedACKsTotal.Inc() + p.server.metrics.promNatsMessagesFailedACKsTotal.Inc() return default: @@ -317,7 +321,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He er := fmt.Errorf("max retries for message not reached, retrying sending of message with ID %v", message.ID) p.processes.errorKernel.logConsoleOnlyIfDebug(er, p.configuration) - p.processes.metrics.promNatsMessagesMissedACKsTotal.Inc() + p.server.metrics.promNatsMessagesMissedACKsTotal.Inc() subReply.Unsubscribe() continue @@ -328,7 +332,7 @@ func (p process) messageDeliverNats(natsMsgPayload []byte, natsMsgHeader nats.He subReply.Unsubscribe() - p.processes.metrics.promNatsDeliveredTotal.Inc() + p.server.metrics.promNatsDeliveredTotal.Inc() return } diff --git a/processes.go b/processes.go index 5dfd24a..e3ff886 100644 --- a/processes.go +++ b/processes.go @@ -17,6 +17,8 @@ type processes struct { // cancel func to send cancel signal to the subscriber processes context. cancel context.CancelFunc // The active spawned processes + // server + server *server active procsMap // mutex to lock the map // mu sync.RWMutex @@ -39,13 +41,14 @@ type processes struct { // newProcesses will prepare and return a *processes which // is map containing all the currently running processes. -func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *errorKernel, configuration *Configuration, signatures *signatures) *processes { +func newProcesses(ctx context.Context, server *server) *processes { p := processes{ + server: server, active: *newProcsMap(), - tui: tui, - errorKernel: errorKernel, - configuration: configuration, - Signatures: signatures, + tui: server.tui, + errorKernel: server.errorKernel, + configuration: server.configuration, + Signatures: server.signatures, } // Prepare the parent context for the subscribers. @@ -59,8 +62,6 @@ func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel * p.ctx = ctx p.cancel = cancel - p.metrics = metrics - return &p } @@ -94,22 +95,22 @@ func (p *processes) Start(proc process) { { log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node) sub := newSubject(REQOpProcessList, string(proc.node)) - proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, p.Signatures) - go proc.spawnWorker(proc.processes, proc.natsConn) + proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() } { log.Printf("Starting REQOpProcessStart subscriber: %#v\n", proc.node) sub := newSubject(REQOpProcessStart, string(proc.node)) - proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, p.Signatures) - go proc.spawnWorker(proc.processes, proc.natsConn) + proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() } { log.Printf("Starting REQOpProcessStop subscriber: %#v\n", proc.node) sub := newSubject(REQOpProcessStop, string(proc.node)) - proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, p.Signatures) - go proc.spawnWorker(proc.processes, proc.natsConn) + proc := newProcess(proc.ctx, p.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() } // Start a subscriber for textLogging messages @@ -217,12 +218,11 @@ func (p *processes) Stop() { // Startup holds all the startup methods for subscribers. type startup struct { - metrics *metrics - Signatures *signatures + server *server } -func newStartup(metrics *metrics, signatures *signatures) *startup { - s := startup{metrics: metrics, Signatures: signatures} +func newStartup(server *server) *startup { + s := startup{server} return &s } @@ -231,9 +231,9 @@ func (s startup) subREQHttpGet(p process) { log.Printf("Starting Http Get subscriber: %#v\n", p.node) sub := newSubject(REQHttpGet, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, s.Signatures) + proc := newProcess(p.ctx, p.processes.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } @@ -241,9 +241,9 @@ func (s startup) subREQHttpGetScheduled(p process) { log.Printf("Starting Http Get Scheduled subscriber: %#v\n", p.node) sub := newSubject(REQHttpGetScheduled, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, s.Signatures) + proc := newProcess(p.ctx, p.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } @@ -251,7 +251,7 @@ func (s startup) pubREQHello(p process) { log.Printf("Starting Hello Publisher: %#v\n", p.node) sub := newSubject(REQHello, p.configuration.CentralNodeName) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindPublisher, nil, s.Signatures) + proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil) // Define the procFunc to be used for the process. proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error { @@ -289,49 +289,49 @@ func (s startup) pubREQHello(p process) { } } } - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQToConsole(p process) { log.Printf("Starting Text To Console subscriber: %#v\n", p.node) sub := newSubject(REQToConsole, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) - go proc.spawnWorker(p.processes, p.natsConn) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() } func (s startup) subREQTuiToConsole(p process) { log.Printf("Starting Tui To Console subscriber: %#v\n", p.node) sub := newSubject(REQTuiToConsole, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) - go proc.spawnWorker(p.processes, p.natsConn) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() } func (s startup) subREQCliCommand(p process) { log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node) sub := newSubject(REQCliCommand, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) - go proc.spawnWorker(p.processes, p.natsConn) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() } func (s startup) subREQPong(p process) { log.Printf("Starting Pong subscriber: %#v\n", p.node) sub := newSubject(REQPong, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) - go proc.spawnWorker(p.processes, p.natsConn) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() } func (s startup) subREQPing(p process) { log.Printf("Starting Ping Request subscriber: %#v\n", p.node) sub := newSubject(REQPing, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) - go proc.spawnWorker(p.processes, p.natsConn) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() } func (s startup) subREQErrorLog(p process) { log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node) sub := newSubject(REQErrorLog, "errorCentral") - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) - go proc.spawnWorker(p.processes, p.natsConn) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() } // subREQHello is the handler that is triggered when we are receiving a hello @@ -344,7 +344,7 @@ func (s startup) subREQErrorLog(p process) { func (s startup) subREQHello(p process) { log.Printf("Starting Hello subscriber: %#v\n", p.node) sub := newSubject(REQHello, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) // The reason for running the say hello subscriber as a procFunc is that // a handler are not able to hold state, and we need to hold the state @@ -370,101 +370,101 @@ func (s startup) subREQHello(p process) { sayHelloNodes[m.FromNode] = struct{}{} // update the prometheus metrics - s.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes))) - s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() + s.server.metrics.promHelloNodesTotal.Set(float64(len(sayHelloNodes))) + s.server.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() } } - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQToFile(p process) { log.Printf("Starting text to file subscriber: %#v\n", p.node) sub := newSubject(REQToFile, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQToFileNACK(p process) { log.Printf("Starting text to file subscriber: %#v\n", p.node) sub := newSubject(REQToFileNACK, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQCopyFileFrom(p process) { log.Printf("Starting copy file from subscriber: %#v\n", p.node) sub := newSubject(REQCopyFileFrom, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQCopyFileTo(p process) { log.Printf("Starting copy file to subscriber: %#v\n", p.node) sub := newSubject(REQCopyFileTo, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQToFileAppend(p process) { log.Printf("Starting text logging subscriber: %#v\n", p.node) sub := newSubject(REQToFileAppend, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQTailFile(p process) { log.Printf("Starting tail log files subscriber: %#v\n", p.node) sub := newSubject(REQTailFile, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQCliCommandCont(p process) { log.Printf("Starting cli command with continous delivery: %#v\n", p.node) sub := newSubject(REQCliCommandCont, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQRelay(p process) { nodeWithRelay := fmt.Sprintf("*.%v", p.node) log.Printf("Starting Relay: %#v\n", nodeWithRelay) sub := newSubject(REQRelay, string(nodeWithRelay)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQRelayInitial(p process) { log.Printf("Starting Relay Initial: %#v\n", p.node) sub := newSubject(REQRelayInitial, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQToSocket(p process) { log.Printf("Starting write to socket subscriber: %#v\n", p.node) sub := newSubject(REQToSocket, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } func (s startup) subREQPublicKey(p process) { log.Printf("Starting get Public Key subscriber: %#v\n", p.node) sub := newSubject(REQPublicKey, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, processKindSubscriber, nil, p.signatures) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) - go proc.spawnWorker(p.processes, p.natsConn) + go proc.spawnWorker() } // --------------------------------------------------------------- @@ -480,7 +480,7 @@ func (p *processes) printProcessesMap() { log.Printf("* proc - pub/sub: %v, procName in map: %v , id: %v, subject: %v\n", proc.processKind, pName, proc.processID, proc.subject.name()) } - p.metrics.promProcessesTotal.Set(float64(len(p.active.procNames))) + p.server.metrics.promProcessesTotal.Set(float64(len(p.active.procNames))) p.active.mu.Unlock() } diff --git a/requests.go b/requests.go index fdbce99..e14b9ef 100644 --- a/requests.go +++ b/requests.go @@ -473,8 +473,8 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str // Create the process and start it. sub := newSubject(method, proc.configuration.NodeName) - procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, processKindSubscriber, nil, proc.signatures) - go procNew.spawnWorker(proc.processes, proc.natsConn) + procNew := newProcess(proc.ctx, proc.server, sub, processKindSubscriber, nil) + go procNew.spawnWorker() txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) er := fmt.Errorf(txt) @@ -564,7 +564,7 @@ func (m methodREQOpProcessStop) handler(proc process, message Message, node stri } // Remove the prometheus label - proc.processes.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)}) + proc.server.metrics.promProcessesAllRunning.Delete(prometheus.Labels{"processName": string(processName)}) txt := fmt.Sprintf("info: OpProcessStop: process stopped id: %v, method: %v on: %v", toStopProc.processID, sub, message.ToNode) er := fmt.Errorf(txt) @@ -1011,7 +1011,7 @@ func (m methodREQErrorLog) getKind() Event { // Handle the writing of error logs. func (m methodREQErrorLog) handler(proc process, message Message, node string) ([]byte, error) { - proc.processes.metrics.promErrorMessagesReceivedTotal.Inc() + proc.server.metrics.promErrorMessagesReceivedTotal.Inc() // If it was a request type message we want to check what the initial messages // method, so we can use that in creating the file name to store the data. diff --git a/server.go b/server.go index c279ea8..fa6b610 100644 --- a/server.go +++ b/server.go @@ -40,12 +40,12 @@ type server struct { processes *processes // The name of the node nodeName string - // ringBufferBulkInCh are the channel where new messages in a bulk + // toRingBufferCh are the channel where new messages in a bulk // format (slice) are put into the system. // // In general the ringbuffer will read this // channel, unfold each slice, and put single messages on the buffer. - ringBufferBulkInCh chan []subjectAndMessage + toRingBufferCh chan []subjectAndMessage // errorKernel is doing all the error handling like what to do if // an error occurs. errorKernel *errorKernel @@ -148,24 +148,25 @@ func NewServer(configuration *Configuration, version string) (*server, error) { signatures := newSignatures(configuration, errorKernel) // fmt.Printf(" * DEBUG: newServer: signatures contains: %+v\n", signatures) - s := &server{ - ctx: ctx, - cancel: cancel, - configuration: configuration, - nodeName: configuration.NodeName, - natsConn: conn, - StewardSocket: stewardSocket, - processes: newProcesses(ctx, metrics, tuiClient, errorKernel, configuration, signatures), - ringBufferBulkInCh: make(chan []subjectAndMessage), - metrics: metrics, - version: version, - tui: tuiClient, - errorKernel: errorKernel, - signatures: signatures, - helloRegister: newHelloRegister(), - centralAuth: newCentralAuth(), + s := server{ + ctx: ctx, + cancel: cancel, + configuration: configuration, + nodeName: configuration.NodeName, + natsConn: conn, + StewardSocket: stewardSocket, + toRingBufferCh: make(chan []subjectAndMessage), + metrics: metrics, + version: version, + tui: tuiClient, + errorKernel: errorKernel, + signatures: signatures, + helloRegister: newHelloRegister(), + centralAuth: newCentralAuth(), } + s.processes = newProcesses(ctx, &s) + // Create the default data folder for where subscribers should // write it's data, check if data folder exist, and create it if needed. if _, err := os.Stat(configuration.SubscribersDataFolder); os.IsNotExist(err) { @@ -181,7 +182,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) { s.errorKernel.logConsoleOnlyIfDebug(er, s.configuration) } - return s, nil + return &s, nil } @@ -235,7 +236,7 @@ func (s *server) Start() { s.metrics.promVersion.With(prometheus.Labels{"version": string(s.version)}) go func() { - err := s.errorKernel.start(s.ringBufferBulkInCh) + err := s.errorKernel.start(s.toRingBufferCh) if err != nil { log.Printf("%v\n", err) } @@ -272,7 +273,7 @@ func (s *server) Start() { // // NB: The context of the initial process are set in processes.Start. sub := newSubject(REQInitial, s.nodeName) - s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, "", nil, s.signatures) + s.processInitial = newProcess(context.TODO(), s, sub, "", nil) // Start all wanted subscriber processes. s.processes.Start(s.processInitial) @@ -287,7 +288,7 @@ func (s *server) Start() { if s.configuration.EnableTUI { go func() { - err := s.tui.Start(s.ctx, s.ringBufferBulkInCh) + err := s.tui.Start(s.ctx, s.toRingBufferCh) if err != nil { log.Printf("%v\n", err) os.Exit(1) @@ -392,7 +393,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) { const samValueBucket string = "samValueBucket" const indexValueBucket string = "indexValueBucket" - s.ringBuffer = newringBuffer(s.ctx, s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.ringBufferBulkInCh, samValueBucket, indexValueBucket, s.errorKernel, s.processInitial) + s.ringBuffer = newringBuffer(s.ctx, s.metrics, s.configuration, bufferSize, dbFileName, Node(s.nodeName), s.toRingBufferCh, samValueBucket, indexValueBucket, s.errorKernel, s.processInitial) ringBufferInCh := make(chan subjectAndMessage) ringBufferOutCh := make(chan samDBValueAndDelivered) @@ -405,7 +406,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // we loop here, unfold the slice, and put single subjectAndMessages's on // the channel to the ringbuffer. go func() { - for sams := range s.ringBufferBulkInCh { + for sams := range s.toRingBufferCh { for _, sam := range sams { ringBufferInCh <- sam } @@ -497,9 +498,9 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) - proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, processKindPublisher, nil, s.signatures) + proc := newProcess(s.ctx, s, sub, processKindPublisher, nil) - proc.spawnWorker(s.processes, s.natsConn) + proc.spawnWorker() // log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID) // Now when the process is spawned we continue, diff --git a/steward_test.go b/steward_test.go index e4eb7a1..2a34760 100644 --- a/steward_test.go +++ b/steward_test.go @@ -235,7 +235,7 @@ func checkREQErrorLogTest(stewardServer *server, conf *Configuration, t *testing ToNode: "somenode", } - p := newProcess(stewardServer.ctx, stewardServer.metrics, stewardServer.natsConn, stewardServer.processes, stewardServer.processInitial.toRingbufferCh, stewardServer.configuration, Subject{}, processKindSubscriber, nil, stewardServer.signatures) + p := newProcess(stewardServer.ctx, stewardServer, Subject{}, processKindSubscriber, nil) stewardServer.errorKernel.errSend(p, m, fmt.Errorf("some error"))