mirror of
https://github.com/postmannen/ctrl.git
synced 2025-03-05 06:46:48 +00:00
moved the ownership of signatures server from processes
This commit is contained in:
parent
fb68379c58
commit
426b98941e
5 changed files with 66 additions and 48 deletions
|
@ -84,11 +84,14 @@ type process struct {
|
||||||
// startup holds the startup functions for starting up publisher
|
// startup holds the startup functions for starting up publisher
|
||||||
// or subscriber processes
|
// or subscriber processes
|
||||||
startup *startup
|
startup *startup
|
||||||
|
|
||||||
|
// Signatures
|
||||||
|
signatures *signatures
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepareNewProcess will set the the provided values and the default
|
// prepareNewProcess will set the the provided values and the default
|
||||||
// values for a process.
|
// values for a process.
|
||||||
func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errorEvent, processKind processKind, procFunc func() error) process {
|
func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, processes *processes, toRingbufferCh chan<- []subjectAndMessage, configuration *Configuration, subject Subject, errCh chan errorEvent, processKind processKind, procFunc func() error, signatures *signatures) process {
|
||||||
// create the initial configuration for a sessions communicating with 1 host process.
|
// create the initial configuration for a sessions communicating with 1 host process.
|
||||||
processes.lastProcessID++
|
processes.lastProcessID++
|
||||||
|
|
||||||
|
@ -110,7 +113,7 @@ func newProcess(ctx context.Context, metrics *metrics, natsConn *nats.Conn, proc
|
||||||
natsConn: natsConn,
|
natsConn: natsConn,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
ctxCancel: cancel,
|
ctxCancel: cancel,
|
||||||
startup: newStartup(metrics),
|
startup: newStartup(metrics, signatures),
|
||||||
}
|
}
|
||||||
|
|
||||||
return proc
|
return proc
|
||||||
|
|
70
processes.go
70
processes.go
|
@ -45,27 +45,23 @@ type processes struct {
|
||||||
// Full path to public signing key.
|
// Full path to public signing key.
|
||||||
SignKeyPublicKeyPath string
|
SignKeyPublicKeyPath string
|
||||||
|
|
||||||
// Signatures holds all the signatures,
|
|
||||||
// and the public keys
|
|
||||||
Signatures signatures
|
|
||||||
|
|
||||||
// private key for ed25519 signing.
|
// private key for ed25519 signing.
|
||||||
SignPrivateKey []byte
|
SignPrivateKey []byte
|
||||||
// public key for ed25519 signing.
|
// public key for ed25519 signing.
|
||||||
SignPublicKey []byte
|
SignPublicKey []byte
|
||||||
|
|
||||||
|
// Signatures
|
||||||
|
Signatures *signatures
|
||||||
}
|
}
|
||||||
|
|
||||||
// newProcesses will prepare and return a *processes which
|
// newProcesses will prepare and return a *processes which
|
||||||
// is map containing all the currently running processes.
|
// is map containing all the currently running processes.
|
||||||
func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *errorKernel, configuration *Configuration) *processes {
|
func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *errorKernel, configuration *Configuration, signatures *signatures) *processes {
|
||||||
p := processes{
|
p := processes{
|
||||||
active: *newProcsMap(),
|
active: *newProcsMap(),
|
||||||
tui: tui,
|
tui: tui,
|
||||||
errorKernel: errorKernel,
|
errorKernel: errorKernel,
|
||||||
configuration: configuration,
|
configuration: configuration,
|
||||||
Signatures: signatures{
|
|
||||||
allowed: make(map[signature]struct{}),
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare the parent context for the subscribers.
|
// Prepare the parent context for the subscribers.
|
||||||
|
@ -91,17 +87,6 @@ func newProcesses(ctx context.Context, metrics *metrics, tui *tui, errorKernel *
|
||||||
|
|
||||||
// ----------------------
|
// ----------------------
|
||||||
|
|
||||||
type signature string
|
|
||||||
|
|
||||||
// allowedSignatures is the structure for reading and writing from
|
|
||||||
// the signatures map. It holds a mutex to use when interacting with
|
|
||||||
// the map.
|
|
||||||
type signatures struct {
|
|
||||||
// allowed is a map for holding all the allowed signatures.
|
|
||||||
allowed map[signature]struct{}
|
|
||||||
mu sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// loadSigningKeys will try to load the ed25519 signing keys. If the
|
// loadSigningKeys will try to load the ed25519 signing keys. If the
|
||||||
// files are not found new keys will be generated and written to disk.
|
// files are not found new keys will be generated and written to disk.
|
||||||
func (p *processes) loadSigningKeys(initProc process) error {
|
func (p *processes) loadSigningKeys(initProc process) error {
|
||||||
|
@ -249,21 +234,21 @@ func (p *processes) Start(proc process) {
|
||||||
{
|
{
|
||||||
log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node)
|
log.Printf("Starting REQOpProcessList subscriber: %#v\n", proc.node)
|
||||||
sub := newSubject(REQOpProcessList, string(proc.node))
|
sub := newSubject(REQOpProcessList, string(proc.node))
|
||||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
|
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, p.Signatures)
|
||||||
go proc.spawnWorker(proc.processes, proc.natsConn)
|
go proc.spawnWorker(proc.processes, proc.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
log.Printf("Starting REQOpProcessStart subscriber: %#v\n", proc.node)
|
log.Printf("Starting REQOpProcessStart subscriber: %#v\n", proc.node)
|
||||||
sub := newSubject(REQOpProcessStart, string(proc.node))
|
sub := newSubject(REQOpProcessStart, string(proc.node))
|
||||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
|
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, p.Signatures)
|
||||||
go proc.spawnWorker(proc.processes, proc.natsConn)
|
go proc.spawnWorker(proc.processes, proc.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
log.Printf("Starting REQOpProcessStop subscriber: %#v\n", proc.node)
|
log.Printf("Starting REQOpProcessStop subscriber: %#v\n", proc.node)
|
||||||
sub := newSubject(REQOpProcessStop, string(proc.node))
|
sub := newSubject(REQOpProcessStop, string(proc.node))
|
||||||
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
|
proc := newProcess(proc.ctx, p.metrics, proc.natsConn, p, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, p.Signatures)
|
||||||
go proc.spawnWorker(proc.processes, proc.natsConn)
|
go proc.spawnWorker(proc.processes, proc.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,10 +346,11 @@ func (p *processes) Stop() {
|
||||||
|
|
||||||
// Startup holds all the startup methods for subscribers.
|
// Startup holds all the startup methods for subscribers.
|
||||||
type startup struct {
|
type startup struct {
|
||||||
metrics *metrics
|
metrics *metrics
|
||||||
|
Signatures *signatures
|
||||||
}
|
}
|
||||||
|
|
||||||
func newStartup(metrics *metrics) *startup {
|
func newStartup(metrics *metrics, signatures *signatures) *startup {
|
||||||
s := startup{metrics: metrics}
|
s := startup{metrics: metrics}
|
||||||
|
|
||||||
return &s
|
return &s
|
||||||
|
@ -374,7 +360,7 @@ func (s startup) subREQHttpGet(p process) {
|
||||||
|
|
||||||
log.Printf("Starting Http Get subscriber: %#v\n", p.node)
|
log.Printf("Starting Http Get subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQHttpGet, string(p.node))
|
sub := newSubject(REQHttpGet, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, s.Signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
|
|
||||||
|
@ -384,7 +370,7 @@ func (s startup) pubREQHello(p process) {
|
||||||
log.Printf("Starting Hello Publisher: %#v\n", p.node)
|
log.Printf("Starting Hello Publisher: %#v\n", p.node)
|
||||||
|
|
||||||
sub := newSubject(REQHello, p.configuration.CentralNodeName)
|
sub := newSubject(REQHello, p.configuration.CentralNodeName)
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindPublisher, nil, s.Signatures)
|
||||||
|
|
||||||
// Define the procFunc to be used for the process.
|
// Define the procFunc to be used for the process.
|
||||||
proc.procFunc = procFunc(
|
proc.procFunc = procFunc(
|
||||||
|
@ -429,42 +415,42 @@ func (s startup) pubREQHello(p process) {
|
||||||
func (s startup) subREQToConsole(p process) {
|
func (s startup) subREQToConsole(p process) {
|
||||||
log.Printf("Starting Text To Console subscriber: %#v\n", p.node)
|
log.Printf("Starting Text To Console subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToConsole, string(p.node))
|
sub := newSubject(REQToConsole, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQTuiToConsole(p process) {
|
func (s startup) subREQTuiToConsole(p process) {
|
||||||
log.Printf("Starting Tui To Console subscriber: %#v\n", p.node)
|
log.Printf("Starting Tui To Console subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQTuiToConsole, string(p.node))
|
sub := newSubject(REQTuiToConsole, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQCliCommand(p process) {
|
func (s startup) subREQCliCommand(p process) {
|
||||||
log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node)
|
log.Printf("Starting CLICommand Request subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQCliCommand, string(p.node))
|
sub := newSubject(REQCliCommand, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQPong(p process) {
|
func (s startup) subREQPong(p process) {
|
||||||
log.Printf("Starting Pong subscriber: %#v\n", p.node)
|
log.Printf("Starting Pong subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQPong, string(p.node))
|
sub := newSubject(REQPong, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQPing(p process) {
|
func (s startup) subREQPing(p process) {
|
||||||
log.Printf("Starting Ping Request subscriber: %#v\n", p.node)
|
log.Printf("Starting Ping Request subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQPing, string(p.node))
|
sub := newSubject(REQPing, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s startup) subREQErrorLog(p process) {
|
func (s startup) subREQErrorLog(p process) {
|
||||||
log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node)
|
log.Printf("Starting REQErrorLog subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQErrorLog, "errorCentral")
|
sub := newSubject(REQErrorLog, "errorCentral")
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -478,7 +464,7 @@ func (s startup) subREQErrorLog(p process) {
|
||||||
func (s startup) subREQHello(p process) {
|
func (s startup) subREQHello(p process) {
|
||||||
log.Printf("Starting Hello subscriber: %#v\n", p.node)
|
log.Printf("Starting Hello subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQHello, string(p.node))
|
sub := newSubject(REQHello, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
// The reason for running the say hello subscriber as a procFunc is that
|
// The reason for running the say hello subscriber as a procFunc is that
|
||||||
// a handler are not able to hold state, and we need to hold the state
|
// a handler are not able to hold state, and we need to hold the state
|
||||||
|
@ -515,7 +501,7 @@ func (s startup) subREQHello(p process) {
|
||||||
func (s startup) subREQToFile(p process) {
|
func (s startup) subREQToFile(p process) {
|
||||||
log.Printf("Starting text to file subscriber: %#v\n", p.node)
|
log.Printf("Starting text to file subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToFile, string(p.node))
|
sub := newSubject(REQToFile, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
@ -523,7 +509,7 @@ func (s startup) subREQToFile(p process) {
|
||||||
func (s startup) subREQCopyFileFrom(p process) {
|
func (s startup) subREQCopyFileFrom(p process) {
|
||||||
log.Printf("Starting copy file from subscriber: %#v\n", p.node)
|
log.Printf("Starting copy file from subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQCopyFileFrom, string(p.node))
|
sub := newSubject(REQCopyFileFrom, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
@ -531,7 +517,7 @@ func (s startup) subREQCopyFileFrom(p process) {
|
||||||
func (s startup) subREQCopyFileTo(p process) {
|
func (s startup) subREQCopyFileTo(p process) {
|
||||||
log.Printf("Starting copy file to subscriber: %#v\n", p.node)
|
log.Printf("Starting copy file to subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQCopyFileTo, string(p.node))
|
sub := newSubject(REQCopyFileTo, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
@ -539,7 +525,7 @@ func (s startup) subREQCopyFileTo(p process) {
|
||||||
func (s startup) subREQToFileAppend(p process) {
|
func (s startup) subREQToFileAppend(p process) {
|
||||||
log.Printf("Starting text logging subscriber: %#v\n", p.node)
|
log.Printf("Starting text logging subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToFileAppend, string(p.node))
|
sub := newSubject(REQToFileAppend, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
@ -547,7 +533,7 @@ func (s startup) subREQToFileAppend(p process) {
|
||||||
func (s startup) subREQTailFile(p process) {
|
func (s startup) subREQTailFile(p process) {
|
||||||
log.Printf("Starting tail log files subscriber: %#v\n", p.node)
|
log.Printf("Starting tail log files subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQTailFile, string(p.node))
|
sub := newSubject(REQTailFile, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
@ -555,7 +541,7 @@ func (s startup) subREQTailFile(p process) {
|
||||||
func (s startup) subREQCliCommandCont(p process) {
|
func (s startup) subREQCliCommandCont(p process) {
|
||||||
log.Printf("Starting cli command with continous delivery: %#v\n", p.node)
|
log.Printf("Starting cli command with continous delivery: %#v\n", p.node)
|
||||||
sub := newSubject(REQCliCommandCont, string(p.node))
|
sub := newSubject(REQCliCommandCont, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
@ -564,7 +550,7 @@ func (s startup) subREQRelay(p process) {
|
||||||
nodeWithRelay := fmt.Sprintf("*.%v", p.node)
|
nodeWithRelay := fmt.Sprintf("*.%v", p.node)
|
||||||
log.Printf("Starting Relay: %#v\n", nodeWithRelay)
|
log.Printf("Starting Relay: %#v\n", nodeWithRelay)
|
||||||
sub := newSubject(REQRelay, string(nodeWithRelay))
|
sub := newSubject(REQRelay, string(nodeWithRelay))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
@ -572,7 +558,7 @@ func (s startup) subREQRelay(p process) {
|
||||||
func (s startup) subREQRelayInitial(p process) {
|
func (s startup) subREQRelayInitial(p process) {
|
||||||
log.Printf("Starting Relay Initial: %#v\n", p.node)
|
log.Printf("Starting Relay Initial: %#v\n", p.node)
|
||||||
sub := newSubject(REQRelayInitial, string(p.node))
|
sub := newSubject(REQRelayInitial, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
@ -580,7 +566,7 @@ func (s startup) subREQRelayInitial(p process) {
|
||||||
func (s startup) subREQToSocket(p process) {
|
func (s startup) subREQToSocket(p process) {
|
||||||
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
|
log.Printf("Starting write to socket subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToSocket, string(p.node))
|
sub := newSubject(REQToSocket, string(p.node))
|
||||||
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil)
|
proc := newProcess(p.ctx, s.metrics, p.natsConn, p.processes, p.toRingbufferCh, p.configuration, sub, p.errorCh, processKindSubscriber, nil, p.signatures)
|
||||||
|
|
||||||
go proc.spawnWorker(p.processes, p.natsConn)
|
go proc.spawnWorker(p.processes, p.natsConn)
|
||||||
}
|
}
|
||||||
|
|
|
@ -459,7 +459,7 @@ func (m methodREQOpProcessStart) handler(proc process, message Message, node str
|
||||||
|
|
||||||
// Create the process and start it.
|
// Create the process and start it.
|
||||||
sub := newSubject(method, proc.configuration.NodeName)
|
sub := newSubject(method, proc.configuration.NodeName)
|
||||||
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil)
|
procNew := newProcess(proc.ctx, proc.processes.metrics, proc.natsConn, proc.processes, proc.toRingbufferCh, proc.configuration, sub, proc.errorCh, processKindSubscriber, nil, proc.signatures)
|
||||||
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
go procNew.spawnWorker(proc.processes, proc.natsConn)
|
||||||
|
|
||||||
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
txt := fmt.Sprintf("info: OpProcessStart: started id: %v, subject: %v: node: %v", procNew.processID, sub, message.ToNode)
|
||||||
|
|
13
server.go
13
server.go
|
@ -59,6 +59,10 @@ type server struct {
|
||||||
tui *tui
|
tui *tui
|
||||||
// processInitial is the initial process that all other processes are tied to.
|
// processInitial is the initial process that all other processes are tied to.
|
||||||
processInitial process
|
processInitial process
|
||||||
|
|
||||||
|
// Signatures holds all the signatures,
|
||||||
|
// and the public keys
|
||||||
|
Signatures *signatures
|
||||||
}
|
}
|
||||||
|
|
||||||
// newServer will prepare and return a server type
|
// newServer will prepare and return a server type
|
||||||
|
@ -136,6 +140,8 @@ func NewServer(c *Configuration, version string) (*server, error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
signatures := newSignatures()
|
||||||
|
|
||||||
s := &server{
|
s := &server{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
|
@ -143,12 +149,13 @@ func NewServer(c *Configuration, version string) (*server, error) {
|
||||||
nodeName: c.NodeName,
|
nodeName: c.NodeName,
|
||||||
natsConn: conn,
|
natsConn: conn,
|
||||||
StewardSocket: stewardSocket,
|
StewardSocket: stewardSocket,
|
||||||
processes: newProcesses(ctx, metrics, tuiClient, errorKernel, c),
|
processes: newProcesses(ctx, metrics, tuiClient, errorKernel, c, signatures),
|
||||||
ringBufferBulkInCh: make(chan []subjectAndMessage),
|
ringBufferBulkInCh: make(chan []subjectAndMessage),
|
||||||
metrics: metrics,
|
metrics: metrics,
|
||||||
version: version,
|
version: version,
|
||||||
tui: tuiClient,
|
tui: tuiClient,
|
||||||
errorKernel: errorKernel,
|
errorKernel: errorKernel,
|
||||||
|
Signatures: signatures,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the default data folder for where subscribers should
|
// Create the default data folder for where subscribers should
|
||||||
|
@ -246,7 +253,7 @@ func (s *server) Start() {
|
||||||
//
|
//
|
||||||
// NB: The context of the initial process are set in processes.Start.
|
// NB: The context of the initial process are set in processes.Start.
|
||||||
sub := newSubject(REQInitial, s.nodeName)
|
sub := newSubject(REQInitial, s.nodeName)
|
||||||
s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, "", nil)
|
s.processInitial = newProcess(context.TODO(), s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, "", nil, s.Signatures)
|
||||||
// Start all wanted subscriber processes.
|
// Start all wanted subscriber processes.
|
||||||
s.processes.Start(s.processInitial)
|
s.processes.Start(s.processInitial)
|
||||||
|
|
||||||
|
@ -479,7 +486,7 @@ func (s *server) routeMessagesToProcess(dbFileName string) {
|
||||||
// log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
|
// log.Printf("info: processNewMessages: did not find that specific subject, starting new process for subject: %v\n", subjName)
|
||||||
|
|
||||||
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
|
sub := newSubject(sam.Subject.Method, sam.Subject.ToNode)
|
||||||
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil)
|
proc := newProcess(s.ctx, s.metrics, s.natsConn, s.processes, s.ringBufferBulkInCh, s.configuration, sub, s.errorKernel.errorCh, processKindPublisher, nil, s.Signatures)
|
||||||
|
|
||||||
proc.spawnWorker(s.processes, s.natsConn)
|
proc.spawnWorker(s.processes, s.natsConn)
|
||||||
// log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)
|
// log.Printf("info: processNewMessages: new process started, subject: %v, processID: %v\n", subjName, proc.processID)
|
||||||
|
|
22
signatures.go
Normal file
22
signatures.go
Normal file
|
@ -0,0 +1,22 @@
|
||||||
|
package steward
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type signature string
|
||||||
|
|
||||||
|
// allowedSignatures is the structure for reading and writing from
|
||||||
|
// the signatures map. It holds a mutex to use when interacting with
|
||||||
|
// the map.
|
||||||
|
type signatures struct {
|
||||||
|
// allowed is a map for holding all the allowed signatures.
|
||||||
|
allowed map[signature]struct{}
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func newSignatures() *signatures {
|
||||||
|
s := signatures{
|
||||||
|
allowed: make(map[signature]struct{}),
|
||||||
|
}
|
||||||
|
|
||||||
|
return &s
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue