diff --git a/process.go b/process.go index aa72490..01fc359 100644 --- a/process.go +++ b/process.go @@ -84,11 +84,14 @@ type process struct { // startup holds the startup functions for starting up publisher // or subscriber processes startup *startup + + // Signatures + signatures *signatures } // prepareNewProcess will set the the provided values and the default // values for a process. -func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errorEvent, processKind processKind, procFunc func() error) process { +func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errorEvent, processKind processKind, procFunc func() error, signatures *signatures) process { // create the initial configuration for a sessions communicating with 1 host process. processes.lastProcessID++ @@ -110,7 +113,7 @@ func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, proc natsConn: natsConn, ctx: ctx, ctxCancel: cancel, - startup: newStartup(metrics), + startup: newStartup(metrics, signatures), } return proc diff --git a/processes.go b/processes.go index 86a90f9..af7ddb0 100644 --- a/processes.go +++ b/processes.go @@ -45,27 +45,23 @@ type processes struct { // Full path to public signing key. SignKeyPublicKeyPath string - // Signatures holds all the signatures, - // and the public keys - Signatures signatures - // private key for ed25519 signing. SignPrivateKey []byte // public key for ed25519 signing. SignPublicKey []byte + + // Signatures + Signatures *signatures } // newProcesses will prepare and return a *processes which // is map containing all the currently running processes. -func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *errorKernel, configuration *Configuration) *processes { +func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *errorKernel, configuration *Configuration, signatures *signatures) *processes { p := processes{ active: *newProcsMap(), tui: tui, errorKernel: errorKernel, configuration: configuration, - Signatures: signatures{ - allowed: make(map[signature]struct{}), - }, } // Prepare the parent context for the subscribers. @@ -91,17 +87,6 @@ func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel * // ---------------------- -type signature string - -// allowedSignatures is the structure for reading and writing from -// the signatures map. It holds a mutex to use when interacting with -// the map. -type signatures struct { - // allowed is a map for holding all the allowed signatures. - allowed map[signature]struct{} - mu sync.Mutex -} - // loadSigningKeys will try to load the ed25519 signing keys. If the // files are not found new keys will be generated and written to disk. func (p *processes) loadSigningKeys(initProc process) error { @@ -249,21 +234,21 @@ func (p *processes) Start(proc process) { { log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node) sub := newSubject(REQOpProcessList, string(proc.node)) - proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) + proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, p.Signatures) go proc.spawnWorker(proc.processes, proc.natsConn) } { log.Printf("Starting REQOpProcessStart subscriber: %#v\n", proc.node) sub := newSubject(REQOpProcessStart, string(proc.node)) - proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) + proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, p.Signatures) go proc.spawnWorker(proc.processes, proc.natsConn) } { log.Printf("Starting REQOpProcessStop subscriber: %#v\n", proc.node) sub := newSubject(REQOpProcessStop, string(proc.node)) - proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) + proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, p.Signatures) go proc.spawnWorker(proc.processes, proc.natsConn) } @@ -361,10 +346,11 @@ func (p *processes) Stop() { // Startup holds all the startup methods for subscribers. type startup struct { - metrics *metrics + metrics *metrics + Signatures *signatures } -func newStartup(metrics *metrics) *startup { +func newStartup(metrics *metrics, signatures *signatures) *startup { s := startup{metrics: metrics} return &s @@ -374,7 +360,7 @@ func (s startup) subREQHttpGet(p process) { log.Printf("Starting Http Get subscriber: %#v\n", p.node) sub := newSubject(REQHttpGet, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, s.Signatures) go proc.spawnWorker(p.processes, p.natsConn) @@ -384,7 +370,7 @@ func (s startup) pubREQHello(p process) { log.Printf("Starting Hello Publisher: %#v\n", p.node) sub := newSubject(REQHello, p.configuration.CentralNodeName) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, nil, s.Signatures) // Define the procFunc to be used for the process. proc.procFunc = procFunc( @@ -429,42 +415,42 @@ func (s startup) pubREQHello(p process) { func (s startup) subREQToConsole(p process) { log.Printf("Starting Text To Console subscriber: %#v\n", p.node) sub := newSubject(REQToConsole, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQTuiToConsole(p process) { log.Printf("Starting Tui To Console subscriber: %#v\n", p.node) sub := newSubject(REQTuiToConsole, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQCliCommand(p process) { log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node) sub := newSubject(REQCliCommand, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQPong(p process) { log.Printf("Starting Pong subscriber: %#v\n", p.node) sub := newSubject(REQPong, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQPing(p process) { log.Printf("Starting Ping Request subscriber: %#v\n", p.node) sub := newSubject(REQPing, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } func (s startup) subREQErrorLog(p process) { log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node) sub := newSubject(REQErrorLog, "errorCentral") - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } @@ -478,7 +464,7 @@ func (s startup) subREQErrorLog(p process) { func (s startup) subREQHello(p process) { log.Printf("Starting Hello subscriber: %#v\n", p.node) sub := newSubject(REQHello, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) // The reason for running the say hello subscriber as a procFunc is that // a handler are not able to hold state, and we need to hold the state @@ -515,7 +501,7 @@ func (s startup) subREQHello(p process) { func (s startup) subREQToFile(p process) { log.Printf("Starting text to file subscriber: %#v\n", p.node) sub := newSubject(REQToFile, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } @@ -523,7 +509,7 @@ func (s startup) subREQToFile(p process) { func (s startup) subREQCopyFileFrom(p process) { log.Printf("Starting copy file from subscriber: %#v\n", p.node) sub := newSubject(REQCopyFileFrom, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } @@ -531,7 +517,7 @@ func (s startup) subREQCopyFileFrom(p process) { func (s startup) subREQCopyFileTo(p process) { log.Printf("Starting copy file to subscriber: %#v\n", p.node) sub := newSubject(REQCopyFileTo, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } @@ -539,7 +525,7 @@ func (s startup) subREQCopyFileTo(p process) { func (s startup) subREQToFileAppend(p process) { log.Printf("Starting text logging subscriber: %#v\n", p.node) sub := newSubject(REQToFileAppend, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } @@ -547,7 +533,7 @@ func (s startup) subREQToFileAppend(p process) { func (s startup) subREQTailFile(p process) { log.Printf("Starting tail log files subscriber: %#v\n", p.node) sub := newSubject(REQTailFile, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } @@ -555,7 +541,7 @@ func (s startup) subREQTailFile(p process) { func (s startup) subREQCliCommandCont(p process) { log.Printf("Starting cli command with continous delivery: %#v\n", p.node) sub := newSubject(REQCliCommandCont, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } @@ -564,7 +550,7 @@ func (s startup) subREQRelay(p process) { nodeWithRelay := fmt.Sprintf("*.%v", p.node) log.Printf("Starting Relay: %#v\n", nodeWithRelay) sub := newSubject(REQRelay, string(nodeWithRelay)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } @@ -572,7 +558,7 @@ func (s startup) subREQRelay(p process) { func (s startup) subREQRelayInitial(p process) { log.Printf("Starting Relay Initial: %#v\n", p.node) sub := newSubject(REQRelayInitial, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } @@ -580,7 +566,7 @@ func (s startup) subREQRelayInitial(p process) { func (s startup) subREQToSocket(p process) { log.Printf("Starting write to socket subscriber: %#v\n", p.node) sub := newSubject(REQToSocket, string(p.node)) - proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil) + proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures) go proc.spawnWorker(p.processes, p.natsConn) } diff --git a/requests.go b/requests.go index ecd9922..31f8d48 100644 --- a/requests.go +++ b/requests.go @@ -459,7 +459,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str // Create the process and start it. sub := newSubject(method, proc.configuration.NodeName) - procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil) + procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, proc.signatures) go procNew.spawnWorker(proc.processes, proc.natsConn) txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode) diff --git a/server.go b/server.go index dd17368..838317f 100644 --- a/server.go +++ b/server.go @@ -59,6 +59,10 @@ type server struct { tui *tui // processInitial is the initial process that all other processes are tied to. processInitial process + + // Signatures holds all the signatures, + // and the public keys + Signatures *signatures } // newServer will prepare and return a server type @@ -136,6 +140,8 @@ func NewServer(c *Configuration, version string) (*server, error) { } + signatures := newSignatures() + s := &server{ ctx: ctx, cancel: cancel, @@ -143,12 +149,13 @@ func NewServer(c *Configuration, version string) (*server, error) { nodeName: c.NodeName, natsConn: conn, StewardSocket: stewardSocket, - processes: newProcesses(ctx, metrics, tuiClient, errorKernel, c), + processes: newProcesses(ctx, metrics, tuiClient, errorKernel, c, signatures), ringBufferBulkInCh: make(chan []subjectAndMessage), metrics: metrics, version: version, tui: tuiClient, errorKernel: errorKernel, + Signatures: signatures, } // Create the default data folder for where subscribers should @@ -246,7 +253,7 @@ func (s *server) Start() { // // NB: The context of the initial process are set in processes.Start. sub := newSubject(REQInitial, s.nodeName) - s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, "", nil) + s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, "", nil, s.Signatures) // Start all wanted subscriber processes. s.processes.Start(s.processInitial) @@ -479,7 +486,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) { // log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName) sub := newSubject(sam.Subject.Method, sam.Subject.ToNode) - proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil) + proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, s.Signatures) proc.spawnWorker(s.processes, s.natsConn) // log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID) diff --git a/signatures.go b/signatures.go new file mode 100644 index 0000000..8bc1598 --- /dev/null +++ b/signatures.go @@ -0,0 +1,22 @@ +package steward + +import "sync" + +type signature string + +// allowedSignatures is the structure for reading and writing from +// the signatures map. It holds a mutex to use when interacting with +// the map. +type signatures struct { + // allowed is a map for holding all the allowed signatures. + allowed map[signature]struct{} + mu sync.Mutex +} + +func newSignatures() *signatures { + s := signatures{ + allowed: make(map[signature]struct{}), + } + + return &s +}