mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-18 21:59:30 +00:00
initial key distribution testing
This commit is contained in:
parent
10796f3556
commit
b669dc537c
7 changed files with 247 additions and 21 deletions
|
@ -49,7 +49,7 @@ func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *cen
|
||||||
|
|
||||||
// Only assign from storage to in memory map if the storage contained any values.
|
// Only assign from storage to in memory map if the storage contained any values.
|
||||||
if keys != nil {
|
if keys != nil {
|
||||||
c.nodePublicKeys.keyMap = keys
|
c.nodePublicKeys.KeyMap = keys
|
||||||
for k, v := range keys {
|
for k, v := range keys {
|
||||||
log.Printf("info: public keys db contains: %v, %v\n", k, []byte(v))
|
log.Printf("info: public keys db contains: %v, %v\n", k, []byte(v))
|
||||||
}
|
}
|
||||||
|
@ -64,7 +64,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) {
|
||||||
c.nodePublicKeys.mu.Lock()
|
c.nodePublicKeys.mu.Lock()
|
||||||
|
|
||||||
// Check if a key for the current node already exists in the map.
|
// Check if a key for the current node already exists in the map.
|
||||||
existingKey, ok := c.nodePublicKeys.keyMap[msg.FromNode]
|
existingKey, ok := c.nodePublicKeys.KeyMap[msg.FromNode]
|
||||||
|
|
||||||
if ok && existingKey == string(msg.Data) {
|
if ok && existingKey == string(msg.Data) {
|
||||||
fmt.Printf(" * key value for node %v is the same, doing nothing\n", msg.FromNode)
|
fmt.Printf(" * key value for node %v is the same, doing nothing\n", msg.FromNode)
|
||||||
|
@ -73,7 +73,7 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// New key
|
// New key
|
||||||
c.nodePublicKeys.keyMap[msg.FromNode] = string(msg.Data)
|
c.nodePublicKeys.KeyMap[msg.FromNode] = string(msg.Data)
|
||||||
c.nodePublicKeys.mu.Unlock()
|
c.nodePublicKeys.mu.Unlock()
|
||||||
|
|
||||||
// Add key to persistent storage.
|
// Add key to persistent storage.
|
||||||
|
@ -188,13 +188,13 @@ func (c *centralAuth) dbDumpPublicKey() (map[Node]string, error) {
|
||||||
// The keys will be written to a k/v store for persistence.
|
// The keys will be written to a k/v store for persistence.
|
||||||
type nodePublicKeys struct {
|
type nodePublicKeys struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
keyMap map[Node]string
|
KeyMap map[Node]string
|
||||||
}
|
}
|
||||||
|
|
||||||
// newNodePublicKeys will return a prepared type of nodePublicKeys.
|
// newNodePublicKeys will return a prepared type of nodePublicKeys.
|
||||||
func newNodePublicKeys(configuration *Configuration) *nodePublicKeys {
|
func newNodePublicKeys(configuration *Configuration) *nodePublicKeys {
|
||||||
n := nodePublicKeys{
|
n := nodePublicKeys{
|
||||||
keyMap: make(map[Node]string),
|
KeyMap: make(map[Node]string),
|
||||||
}
|
}
|
||||||
|
|
||||||
return &n
|
return &n
|
||||||
|
|
|
@ -41,6 +41,8 @@ type Configuration struct {
|
||||||
NatsReconnectJitter int
|
NatsReconnectJitter int
|
||||||
// NatsReconnectJitterTLS in seconds
|
// NatsReconnectJitterTLS in seconds
|
||||||
NatsReconnectJitterTLS int
|
NatsReconnectJitterTLS int
|
||||||
|
// PublicKeysGetInterval in seconds
|
||||||
|
PublicKeysGetInterval int
|
||||||
// The number of the profiling port
|
// The number of the profiling port
|
||||||
ProfilingPort string
|
ProfilingPort string
|
||||||
// host and port for prometheus listener, e.g. localhost:2112
|
// host and port for prometheus listener, e.g. localhost:2112
|
||||||
|
@ -87,8 +89,13 @@ type Configuration struct {
|
||||||
|
|
||||||
// Make the current node send hello messages to central at given interval in seconds
|
// Make the current node send hello messages to central at given interval in seconds
|
||||||
StartPubREQHello int
|
StartPubREQHello int
|
||||||
|
// Publisher for asking central for public keys
|
||||||
|
StartPubREQPublicKeysGet bool
|
||||||
|
// Subscriber for receiving reqests to get public keys registered on central
|
||||||
|
StartSubREQPublicKeysGet bool
|
||||||
|
// Subscriber for receiving updates of public keys from central
|
||||||
|
StartSubREQPublicKeysPut bool
|
||||||
// Start the central error logger.
|
// Start the central error logger.
|
||||||
// Takes a comma separated string of nodes to receive from or "*" for all nodes.
|
|
||||||
StartSubREQErrorLog bool
|
StartSubREQErrorLog bool
|
||||||
// Subscriber for hello messages
|
// Subscriber for hello messages
|
||||||
StartSubREQHello bool
|
StartSubREQHello bool
|
||||||
|
@ -139,6 +146,7 @@ type ConfigurationFromFile struct {
|
||||||
NatsConnectRetryInterval *int
|
NatsConnectRetryInterval *int
|
||||||
NatsReconnectJitter *int
|
NatsReconnectJitter *int
|
||||||
NatsReconnectJitterTLS *int
|
NatsReconnectJitterTLS *int
|
||||||
|
PublicKeysGetInterval *int
|
||||||
ProfilingPort *string
|
ProfilingPort *string
|
||||||
PromHostAndPort *string
|
PromHostAndPort *string
|
||||||
DefaultMessageTimeout *int
|
DefaultMessageTimeout *int
|
||||||
|
@ -161,6 +169,9 @@ type ConfigurationFromFile struct {
|
||||||
EnableDebug *bool
|
EnableDebug *bool
|
||||||
|
|
||||||
StartPubREQHello *int
|
StartPubREQHello *int
|
||||||
|
StartPubREQPublicKeysGet *bool
|
||||||
|
StartSubREQPublicKeysGet *bool
|
||||||
|
StartSubREQPublicKeysPut *bool
|
||||||
StartSubREQErrorLog *bool
|
StartSubREQErrorLog *bool
|
||||||
StartSubREQHello *bool
|
StartSubREQHello *bool
|
||||||
StartSubREQToFileAppend *bool
|
StartSubREQToFileAppend *bool
|
||||||
|
@ -200,6 +211,7 @@ func newConfigurationDefaults() Configuration {
|
||||||
NatsConnectRetryInterval: 10,
|
NatsConnectRetryInterval: 10,
|
||||||
NatsReconnectJitter: 100,
|
NatsReconnectJitter: 100,
|
||||||
NatsReconnectJitterTLS: 1,
|
NatsReconnectJitterTLS: 1,
|
||||||
|
PublicKeysGetInterval: 60,
|
||||||
ProfilingPort: "",
|
ProfilingPort: "",
|
||||||
PromHostAndPort: "",
|
PromHostAndPort: "",
|
||||||
DefaultMessageTimeout: 10,
|
DefaultMessageTimeout: 10,
|
||||||
|
@ -222,6 +234,9 @@ func newConfigurationDefaults() Configuration {
|
||||||
EnableDebug: false,
|
EnableDebug: false,
|
||||||
|
|
||||||
StartPubREQHello: 30,
|
StartPubREQHello: 30,
|
||||||
|
StartPubREQPublicKeysGet: true,
|
||||||
|
StartSubREQPublicKeysGet: false,
|
||||||
|
StartSubREQPublicKeysPut: true,
|
||||||
StartSubREQErrorLog: false,
|
StartSubREQErrorLog: false,
|
||||||
StartSubREQHello: true,
|
StartSubREQHello: true,
|
||||||
StartSubREQToFileAppend: true,
|
StartSubREQToFileAppend: true,
|
||||||
|
@ -308,6 +323,11 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
|
||||||
} else {
|
} else {
|
||||||
conf.NatsReconnectJitterTLS = *cf.NatsReconnectJitterTLS
|
conf.NatsReconnectJitterTLS = *cf.NatsReconnectJitterTLS
|
||||||
}
|
}
|
||||||
|
if cf.PublicKeysGetInterval == nil {
|
||||||
|
conf.PublicKeysGetInterval = cd.PublicKeysGetInterval
|
||||||
|
} else {
|
||||||
|
conf.PublicKeysGetInterval = *cf.PublicKeysGetInterval
|
||||||
|
}
|
||||||
if cf.ProfilingPort == nil {
|
if cf.ProfilingPort == nil {
|
||||||
conf.ProfilingPort = cd.ProfilingPort
|
conf.ProfilingPort = cd.ProfilingPort
|
||||||
} else {
|
} else {
|
||||||
|
@ -416,6 +436,21 @@ func checkConfigValues(cf ConfigurationFromFile) Configuration {
|
||||||
} else {
|
} else {
|
||||||
conf.StartPubREQHello = *cf.StartPubREQHello
|
conf.StartPubREQHello = *cf.StartPubREQHello
|
||||||
}
|
}
|
||||||
|
if cf.StartPubREQPublicKeysGet == nil {
|
||||||
|
conf.StartPubREQPublicKeysGet = cd.StartPubREQPublicKeysGet
|
||||||
|
} else {
|
||||||
|
conf.StartPubREQPublicKeysGet = *cf.StartPubREQPublicKeysGet
|
||||||
|
}
|
||||||
|
if cf.StartSubREQPublicKeysGet == nil {
|
||||||
|
conf.StartSubREQPublicKeysGet = cd.StartSubREQPublicKeysGet
|
||||||
|
} else {
|
||||||
|
conf.StartSubREQPublicKeysGet = *cf.StartSubREQPublicKeysGet
|
||||||
|
}
|
||||||
|
if cf.StartSubREQPublicKeysPut == nil {
|
||||||
|
conf.StartSubREQPublicKeysPut = cd.StartSubREQPublicKeysPut
|
||||||
|
} else {
|
||||||
|
conf.StartSubREQPublicKeysPut = *cf.StartSubREQPublicKeysPut
|
||||||
|
}
|
||||||
if cf.StartSubREQErrorLog == nil {
|
if cf.StartSubREQErrorLog == nil {
|
||||||
conf.StartSubREQErrorLog = cd.StartSubREQErrorLog
|
conf.StartSubREQErrorLog = cd.StartSubREQErrorLog
|
||||||
} else {
|
} else {
|
||||||
|
@ -540,6 +575,7 @@ func (c *Configuration) CheckFlags() error {
|
||||||
flag.IntVar(&c.NatsConnectRetryInterval, "natsConnectRetryInterval", fc.NatsConnectRetryInterval, "default nats retry connect interval in seconds.")
|
flag.IntVar(&c.NatsConnectRetryInterval, "natsConnectRetryInterval", fc.NatsConnectRetryInterval, "default nats retry connect interval in seconds.")
|
||||||
flag.IntVar(&c.NatsReconnectJitter, "natsReconnectJitter", fc.NatsReconnectJitter, "default nats ReconnectJitter interval in milliseconds.")
|
flag.IntVar(&c.NatsReconnectJitter, "natsReconnectJitter", fc.NatsReconnectJitter, "default nats ReconnectJitter interval in milliseconds.")
|
||||||
flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", fc.NatsReconnectJitterTLS, "default nats ReconnectJitterTLS interval in seconds.")
|
flag.IntVar(&c.NatsReconnectJitterTLS, "natsReconnectJitterTLS", fc.NatsReconnectJitterTLS, "default nats ReconnectJitterTLS interval in seconds.")
|
||||||
|
flag.IntVar(&c.PublicKeysGetInterval, "publicKeysGetInterval", fc.PublicKeysGetInterval, "default interval in seconds for asking the central for public keys")
|
||||||
flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port")
|
flag.StringVar(&c.ProfilingPort, "profilingPort", fc.ProfilingPort, "The number of the profiling port")
|
||||||
flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112")
|
flag.StringVar(&c.PromHostAndPort, "promHostAndPort", fc.PromHostAndPort, "host and port for prometheus listener, e.g. localhost:2112")
|
||||||
flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level")
|
flag.IntVar(&c.DefaultMessageTimeout, "defaultMessageTimeout", fc.DefaultMessageTimeout, "default message timeout in seconds. This can be overridden on the message level")
|
||||||
|
@ -563,6 +599,9 @@ func (c *Configuration) CheckFlags() error {
|
||||||
|
|
||||||
flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds")
|
flag.IntVar(&c.StartPubREQHello, "startPubREQHello", fc.StartPubREQHello, "Make the current node send hello messages to central at given interval in seconds")
|
||||||
|
|
||||||
|
flag.BoolVar(&c.StartPubREQPublicKeysGet, "startPubREQPublicKeysGet", fc.StartPubREQPublicKeysGet, "true/false")
|
||||||
|
flag.BoolVar(&c.StartSubREQPublicKeysGet, "startSubREQPublicKeysGet", fc.StartSubREQPublicKeysGet, "true/false")
|
||||||
|
flag.BoolVar(&c.StartSubREQPublicKeysPut, "startSubREQPublicKeysPut", fc.StartSubREQPublicKeysPut, "true/false")
|
||||||
flag.BoolVar(&c.StartSubREQErrorLog, "startSubREQErrorLog", fc.StartSubREQErrorLog, "true/false")
|
flag.BoolVar(&c.StartSubREQErrorLog, "startSubREQErrorLog", fc.StartSubREQErrorLog, "true/false")
|
||||||
flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false")
|
flag.BoolVar(&c.StartSubREQHello, "startSubREQHello", fc.StartSubREQHello, "true/false")
|
||||||
flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false")
|
flag.BoolVar(&c.StartSubREQToFileAppend, "startSubREQToFileAppend", fc.StartSubREQToFileAppend, "true/false")
|
||||||
|
|
1
doc/concept/auth/REQPublicKeysGet.drawio
Normal file
1
doc/concept/auth/REQPublicKeysGet.drawio
Normal file
|
@ -0,0 +1 @@
|
||||||
|
<mxfile host="Electron" modified="2022-04-07T03:56:08.840Z" agent="5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/16.5.1 Chrome/96.0.4664.110 Electron/16.0.7 Safari/537.36" etag="-LK1csLiiPyU1Qw41Pjy" version="16.5.1" type="device"><diagram name="Page-1" id="edf60f1a-56cd-e834-aa8a-f176f3a09ee4">5Vjfc9soEP5rNHP34I5+26+Jk7SdXO/Spp0mj1hCElMkXIRjK3/9LQJZQih1k8ZNr/figV12ge9bdtdygmW5e83RunjHUkwd3013TnDm+P4icuFXCholiD1fCXJOUiXyesE1ucdaqO3yDUlxbSwUjFFB1qYwYVWFE2HIEOdsay7LGDV3XaMcW4LrBFFb+pmkotBSz3V7xRtM8kKMLrxCyZecs02l96tYhZWmRJ0bvbQuUMq2A1Fw7gRLzphQo3K3xFSi2iGm7C4e0O6PzHElvsfg682pn324fH9/e7P41NCw8P7KZl6g3NwhutFY6NOKpgOnvR6WXlwnON0WRODrNUqkdgvRALJClBRmHgxTVBftWjnJWCU02b601XthLvDuwVt4e2wg2jArseANLNl1kRUpEx1pHbrbnjZ/oWXFgLFwroVIh0q+d91jBgMN22Mg9P9jEO6j+iUgdOf1RXJB08XZ23/ug0v05iRezuLDCOIqPZEPHWYJRXVNEhM0E+EHIPFdnBqJwMZoAEI0gUEn45giQe7M9DGFi97hihE4yZ6CyDMpiIMRtDXb8ARrq+GjHjnywwOOBOI5Fpajlqb9tZ/O3Pz/xpwFePxE5qwQGDt6Puam05bF3NVmRQmkHi4J4CzBNZzkxPFjChCerkAc53L04fx9uzS5xM1rLOwFtoTjNW3eYVGwtHXpDn1cbSZ8SDTcGkPBT+VNKoE5HPUbidU7nFifI3eOWPOCieQZT4Tf0cqPXX3+hu7MAgquLEw0asHZF7xklHGQtI0LlBtC6UiEKMkr+XixJAEEEkACvdOJVpQkTeU2k/Cbr/sI1cx3Q4ORyCZkio9xnnw2PuyGagk34ROx+9tSMg/cX4qS0KLkerOqE05WPyvXfWQXhAKxOpW/ZAaL/VEGCycy2HyCnnF9ejZ6IoseC/UfainGEY53RNzolXJ8K9e9ivTsbKfN2knTTeAJNzfDycBKTnuzdtbZTfYuk/2N6hsOF2zVFhyK9pdqj8JRffTdJ7ZHYTwK0+jntkf2X5J6mDN+l/5lKjdPJuejvf6pfxAKzHqNKgOt+OtGfiw5TVQ1lLmV56s/JNKOD7u7g9Gfjq5mrnz+swyVhDbKBhyhct0qgyCUEGN6h2X9tDSmk7pNItIFQL4b6dQppbJivJRUD9VbDaXUh+qcrZJiAZExg6smpMon7WWbMNMVX6p10TfUBIKo0u6HR2uV0HxUdQZOO/fyE5VesGU8NXcfmvdft2YjzP0o2mM9GvfIp6ReU6RRJxUlg40zypAYHqgjd6ryDl4YxJiKi678HqU09OXgdlAoDpQG71il4WDKj7/55uEoXmi98iMWgXk0/kb3xCIQLMxsFXxnEYAIQM1g2VouqB9x4IXxKRUGyuPIunPPsqzGj6xCMO0//arl/Zf14Pxf</diagram></mxfile>
|
1
doc/concept/auth/~$REQPublicKeysGet.drawio.dtmp
Normal file
1
doc/concept/auth/~$REQPublicKeysGet.drawio.dtmp
Normal file
|
@ -0,0 +1 @@
|
||||||
|
<mxfile host="Electron" modified="2022-04-07T04:03:16.604Z" agent="5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) draw.io/16.5.1 Chrome/96.0.4664.110 Electron/16.0.7 Safari/537.36" etag="sL9VRzqSYFqQD3cE1OKV" version="16.5.1" type="device"><diagram name="Page-1" id="edf60f1a-56cd-e834-aa8a-f176f3a09ee4">5Vjfc9soEP5rNNN7cEe/7dfESdpO2ru06c0lj1hCElMkXIRjK3/9LQJZQih1ksaX3vTFA7vLAt+37K7lBMty946jdfGJpZg6vpvunODM8f1F5MKvFDRKEHu+EuScpErk9YJrco+1UK/LNyTFtWEoGKOCrE1hwqoKJ8KQIc7Z1jTLGDV3XaMcW4LrBFFb+g9JRaGlnuv2iveY5IUYXXiFkm85Z5tK71exCitNiTo32rQuUMq2A1Fw7gRLzphQo3K3xFSi2iGm1l08oN0fmeNKPGbB95tTP/ty+fn+9mbxd0PDwvuYzbxAublDdKOx0KcVTQdOez0svbhOcLotiMDXa5RI7RaiAWSFKCnMPBimqC5aWznJWCU02b5cq/fCXODdg7fw9thAtGFWYsEbMNl1kRWpJTrSOnS3PW3+QsuKAWPhXAuRDpV877rHDAYatqdA6P/PINxH9WtA6M7ri+SCpouzD3/dB5fo/Um8nMWHEcRVeiIfOswSiuqaJCZoJsIPQOK7ODUSgY3RAIRoAoNOxjFFgtyZ6WMKF73DFSNwkj0FkWdSEAcjaGu24QnWq4aPeuTIDw84EojnWFiOWpr2134+c/PfjTkL8PiZzFkhMHb0csxNpy2LuavNihJIPVwSwFmCazjJiePHFCA8XYE4zuXoy/nn1jS5xM07LGwDeRO3xlCsU3mKSmAO29h2toTjNW0+YVGwtN3aHe51tRE/SKze4cT6ErlzxJoXTCTPeCL8jlZ+7OrzJ3RnFlBwZWGiUQvOvuElo4yDpG1coNwQSkciREleyceLJZEgkAAS6J1OtKIkaSq3mYTffN1HqGa+GxqMRDYhU3yM8+SL8WE3VEu4CZfx/7tQMg/cX4qS0KLkerOqE05WP5/rHpfDvrILQoHY6vUzWOyPMlg4kcHmE/SM69OL0RNZ9Fio/1RLMY5wvCPiRlvK8a20exvp2dlOL2snTTeBJ9zcDCeDVXLaL2tn3brJ3mWyv1F9w+GCrdqCQ9H+Wu1ROKqPvvvM9iiMR2Ea/bftkf2XpB7mDKsveWwP9Iv1L1O5eTI5H+31T/2DUGDWa1QZaMXfN/JjyWmiqqHMrTxfvZFIOz7s7g5Gfzi6mrny+c8yVBLaqDXgCJXrVhkEoYQY0zss66elMZ3UbRKRLgDy3UinTimVFeOlpHqo3moopT5U52yVFAuIjBlcNSFVPrletgkzXfGlWhd9Q00giCrtfni0VgnNR1Vn4LRzLz9RaYMt46m5+3B5/3VrNsLcj6I91qNxj3xK6jVFGnVSUTLYOKMMieGBOnKnKu/ghUGMqbjoyu9RSkNfDm4HheJAafCOVRoOpvz4h28ejuKF1is/YhGYR+NvdM8sAsHCzFbBI4sARABqBmZraVA/4cAL41MqDJTH0erOPcuyGj+xCsG0//SrzPsv68H5vw==</diagram></mxfile>
|
|
@ -101,6 +101,8 @@ type process struct {
|
||||||
startup *startup
|
startup *startup
|
||||||
// Signatures
|
// Signatures
|
||||||
signatures *signatures
|
signatures *signatures
|
||||||
|
// centralAuth
|
||||||
|
centralAuth *centralAuth
|
||||||
// errorKernel
|
// errorKernel
|
||||||
errorKernel *errorKernel
|
errorKernel *errorKernel
|
||||||
// metrics
|
// metrics
|
||||||
|
@ -133,6 +135,7 @@ func newProcess(ctx context.Context, server *server, subject Subject, processKin
|
||||||
ctxCancel: cancel,
|
ctxCancel: cancel,
|
||||||
startup: newStartup(server),
|
startup: newStartup(server),
|
||||||
signatures: server.signatures,
|
signatures: server.signatures,
|
||||||
|
centralAuth: server.centralAuth,
|
||||||
errorKernel: server.errorKernel,
|
errorKernel: server.errorKernel,
|
||||||
metrics: server.metrics,
|
metrics: server.metrics,
|
||||||
}
|
}
|
||||||
|
|
89
processes.go
89
processes.go
|
@ -114,62 +114,50 @@ func (p *processes) Start(proc process) {
|
||||||
go proc.spawnWorker()
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for textLogging messages
|
|
||||||
if proc.configuration.StartSubREQToFileAppend {
|
if proc.configuration.StartSubREQToFileAppend {
|
||||||
proc.startup.subREQToFileAppend(proc)
|
proc.startup.subREQToFileAppend(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for text to file messages
|
|
||||||
if proc.configuration.StartSubREQToFile {
|
if proc.configuration.StartSubREQToFile {
|
||||||
proc.startup.subREQToFile(proc)
|
proc.startup.subREQToFile(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for text to file messages
|
|
||||||
if proc.configuration.StartSubREQToFileNACK {
|
if proc.configuration.StartSubREQToFileNACK {
|
||||||
proc.startup.subREQToFileNACK(proc)
|
proc.startup.subREQToFileNACK(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for reading file to copy
|
|
||||||
if proc.configuration.StartSubREQCopyFileFrom {
|
if proc.configuration.StartSubREQCopyFileFrom {
|
||||||
proc.startup.subREQCopyFileFrom(proc)
|
proc.startup.subREQCopyFileFrom(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for writing copied file to disk
|
|
||||||
if proc.configuration.StartSubREQCopyFileTo {
|
if proc.configuration.StartSubREQCopyFileTo {
|
||||||
proc.startup.subREQCopyFileTo(proc)
|
proc.startup.subREQCopyFileTo(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for Hello messages
|
|
||||||
if proc.configuration.StartSubREQHello {
|
if proc.configuration.StartSubREQHello {
|
||||||
proc.startup.subREQHello(proc)
|
proc.startup.subREQHello(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubREQErrorLog {
|
if proc.configuration.StartSubREQErrorLog {
|
||||||
// Start a subscriber for REQErrorLog messages
|
|
||||||
proc.startup.subREQErrorLog(proc)
|
proc.startup.subREQErrorLog(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for Ping Request messages
|
|
||||||
if proc.configuration.StartSubREQPing {
|
if proc.configuration.StartSubREQPing {
|
||||||
proc.startup.subREQPing(proc)
|
proc.startup.subREQPing(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for REQPong messages
|
|
||||||
if proc.configuration.StartSubREQPong {
|
if proc.configuration.StartSubREQPong {
|
||||||
proc.startup.subREQPong(proc)
|
proc.startup.subREQPong(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for REQCliCommand messages
|
|
||||||
if proc.configuration.StartSubREQCliCommand {
|
if proc.configuration.StartSubREQCliCommand {
|
||||||
proc.startup.subREQCliCommand(proc)
|
proc.startup.subREQCliCommand(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for CLICommandReply messages
|
|
||||||
if proc.configuration.StartSubREQToConsole {
|
if proc.configuration.StartSubREQToConsole {
|
||||||
proc.startup.subREQToConsole(proc)
|
proc.startup.subREQToConsole(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for CLICommandReply messages
|
|
||||||
if proc.configuration.EnableTUI {
|
if proc.configuration.EnableTUI {
|
||||||
proc.startup.subREQTuiToConsole(proc)
|
proc.startup.subREQTuiToConsole(proc)
|
||||||
}
|
}
|
||||||
|
@ -178,7 +166,18 @@ func (p *processes) Start(proc process) {
|
||||||
proc.startup.pubREQHello(proc)
|
proc.startup.pubREQHello(proc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a subscriber for Http Get Requests
|
if proc.configuration.StartPubREQPublicKeysGet {
|
||||||
|
proc.startup.pubREQPublicKeysGet(proc)
|
||||||
|
}
|
||||||
|
|
||||||
|
if proc.configuration.StartSubREQPublicKeysGet {
|
||||||
|
proc.startup.subREQPublicKeysGet(proc)
|
||||||
|
}
|
||||||
|
|
||||||
|
if proc.configuration.StartSubREQPublicKeysPut {
|
||||||
|
proc.startup.subREQPublicKeysPut(proc)
|
||||||
|
}
|
||||||
|
|
||||||
if proc.configuration.StartSubREQHttpGet {
|
if proc.configuration.StartSubREQHttpGet {
|
||||||
proc.startup.subREQHttpGet(proc)
|
proc.startup.subREQHttpGet(proc)
|
||||||
}
|
}
|
||||||
|
@ -300,6 +299,68 @@ func (s startup) pubREQHello(p process) {
|
||||||
go proc.spawnWorker()
|
go proc.spawnWorker()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pubREQPublicKeysGet defines the startup of a publisher that will send REQPublicKeysGet
|
||||||
|
// to central server and ask for publics keys, and to get them deliver back with a request
|
||||||
|
// of type pubREQPublicKeysPut.
|
||||||
|
func (s startup) pubREQPublicKeysGet(p process) {
|
||||||
|
log.Printf("Starting PublicKeysGet Publisher: %#v\n", p.node)
|
||||||
|
|
||||||
|
sub := newSubject(REQPublicKeysGet, p.configuration.CentralNodeName)
|
||||||
|
proc := newProcess(p.ctx, s.server, sub, processKindPublisher, nil)
|
||||||
|
|
||||||
|
// Define the procFunc to be used for the process.
|
||||||
|
proc.procFunc = func(ctx context.Context, procFuncCh chan Message) error {
|
||||||
|
// TODO: replace this with a separate timer for the request type.
|
||||||
|
ticker := time.NewTicker(time.Second * time.Duration(p.configuration.PublicKeysGetInterval))
|
||||||
|
for {
|
||||||
|
|
||||||
|
m := Message{
|
||||||
|
FileName: "publickeysget.log",
|
||||||
|
Directory: "publickeysget",
|
||||||
|
ToNode: Node(p.configuration.CentralNodeName),
|
||||||
|
FromNode: Node(p.node),
|
||||||
|
// Data: []byte(d),
|
||||||
|
Method: REQPublicKeysGet,
|
||||||
|
ReplyMethod: REQPublicKeysPut,
|
||||||
|
ACKTimeout: proc.configuration.DefaultMessageTimeout,
|
||||||
|
Retries: 1,
|
||||||
|
}
|
||||||
|
|
||||||
|
sam, err := newSubjectAndMessage(m)
|
||||||
|
if err != nil {
|
||||||
|
// In theory the system should drop the message before it reaches here.
|
||||||
|
p.errorKernel.errSend(p, m, err)
|
||||||
|
log.Printf("error: ProcessesStart: %v\n", err)
|
||||||
|
}
|
||||||
|
proc.toRingbufferCh <- []subjectAndMessage{sam}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ticker.C:
|
||||||
|
case <-ctx.Done():
|
||||||
|
er := fmt.Errorf("info: stopped handleFunc for: publisher %v", proc.subject.name())
|
||||||
|
// sendErrorLogMessage(proc.toRingbufferCh, proc.node, er)
|
||||||
|
log.Printf("%v\n", er)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go proc.spawnWorker()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s startup) subREQPublicKeysGet(p process) {
|
||||||
|
log.Printf("Starting Public keys get subscriber: %#v\n", p.node)
|
||||||
|
sub := newSubject(REQPublicKeysGet, string(p.node))
|
||||||
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
go proc.spawnWorker()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s startup) subREQPublicKeysPut(p process) {
|
||||||
|
log.Printf("Starting Public keys put subscriber: %#v\n", p.node)
|
||||||
|
sub := newSubject(REQPublicKeysPut, string(p.node))
|
||||||
|
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||||
|
go proc.spawnWorker()
|
||||||
|
}
|
||||||
|
|
||||||
func (s startup) subREQToConsole(p process) {
|
func (s startup) subREQToConsole(p process) {
|
||||||
log.Printf("Starting Text To Console subscriber: %#v\n", p.node)
|
log.Printf("Starting Text To Console subscriber: %#v\n", p.node)
|
||||||
sub := newSubject(REQToConsole, string(p.node))
|
sub := newSubject(REQToConsole, string(p.node))
|
||||||
|
@ -377,7 +438,7 @@ func (s startup) subREQHello(p process) {
|
||||||
s.centralAuth.addPublicKey(proc, m)
|
s.centralAuth.addPublicKey(proc, m)
|
||||||
|
|
||||||
// update the prometheus metrics
|
// update the prometheus metrics
|
||||||
s.metrics.promHelloNodesTotal.Set(float64(len(s.server.centralAuth.nodePublicKeys.keyMap)))
|
s.metrics.promHelloNodesTotal.Set(float64(len(s.server.centralAuth.nodePublicKeys.KeyMap)))
|
||||||
s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
|
s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
123
requests.go
123
requests.go
|
@ -36,6 +36,7 @@ import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -133,8 +134,12 @@ const (
|
||||||
REQRelayInitial Method = "REQRelayInitial"
|
REQRelayInitial Method = "REQRelayInitial"
|
||||||
// REQNone is used when there should be no reply.
|
// REQNone is used when there should be no reply.
|
||||||
REQNone Method = "REQNone"
|
REQNone Method = "REQNone"
|
||||||
// REQPublicKey will get the public ed25519 certificate from a node.
|
// REQPublicKey will get the public ed25519 key from a node.
|
||||||
REQPublicKey Method = "REQPublicKey"
|
REQPublicKey Method = "REQPublicKey"
|
||||||
|
// REQPublicKeysGet will get all the public keys from central.
|
||||||
|
REQPublicKeysGet Method = "REQPublicKeysGet"
|
||||||
|
// REQPublicKeysPut will put all the public received from central.
|
||||||
|
REQPublicKeysPut Method = "REQPublicKeysPut"
|
||||||
)
|
)
|
||||||
|
|
||||||
// The mapping of all the method constants specified, what type
|
// The mapping of all the method constants specified, what type
|
||||||
|
@ -219,6 +224,12 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
||||||
REQPublicKey: methodREQPublicKey{
|
REQPublicKey: methodREQPublicKey{
|
||||||
event: EventACK,
|
event: EventACK,
|
||||||
},
|
},
|
||||||
|
REQPublicKeysGet: methodREQPublicKeysGet{
|
||||||
|
event: EventNACK,
|
||||||
|
},
|
||||||
|
REQPublicKeysPut: methodREQPublicKeysPut{
|
||||||
|
event: EventNACK,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -332,6 +343,7 @@ func newReplyMessage(proc process, message Message, outData []byte) {
|
||||||
|
|
||||||
// Create a new message for the reply, and put it on the
|
// Create a new message for the reply, and put it on the
|
||||||
// ringbuffer to be published.
|
// ringbuffer to be published.
|
||||||
|
// TODO: Check that we still got all the fields present that are needed here.
|
||||||
newMsg := Message{
|
newMsg := Message{
|
||||||
ToNode: message.FromNode,
|
ToNode: message.FromNode,
|
||||||
FromNode: message.ToNode,
|
FromNode: message.ToNode,
|
||||||
|
@ -2025,6 +2037,115 @@ func (m methodREQPublicKey) handler(proc process, message Message, node string)
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
|
type methodREQPublicKeysGet struct {
|
||||||
|
event Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m methodREQPublicKeysGet) getKind() Event {
|
||||||
|
return m.event
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler to get all the public ed25519 keys from a central server.
|
||||||
|
func (m methodREQPublicKeysGet) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
// Get a context with the timeout specified in message.MethodTimeout.
|
||||||
|
|
||||||
|
// TODO:
|
||||||
|
// - Since this is implemented as a NACK message we could implement a
|
||||||
|
// metric thats shows the last time a node did a key request.
|
||||||
|
// - We could also implement a metrics on the receiver showing the last
|
||||||
|
// time a node had done an update.
|
||||||
|
|
||||||
|
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
||||||
|
|
||||||
|
proc.processes.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer proc.processes.wg.Done()
|
||||||
|
outCh := make(chan []byte)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// TODO: Should we receive a hash of he current keys from the node here
|
||||||
|
// to verify if we need to update or not ?
|
||||||
|
case outCh <- []byte{}:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// case out := <-outCh:
|
||||||
|
case <-outCh:
|
||||||
|
b, err := json.Marshal(proc.centralAuth.nodePublicKeys.KeyMap)
|
||||||
|
if err != nil {
|
||||||
|
er := fmt.Errorf("error: REQPublicKeysGet, failed to marshal keys map: %v", err)
|
||||||
|
proc.errorKernel.errSend(proc, message, er)
|
||||||
|
}
|
||||||
|
|
||||||
|
newReplyMessage(proc, message, b)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// NB: We're not sending an ACK message for this request type.
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
type methodREQPublicKeysPut struct {
|
||||||
|
event Event
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m methodREQPublicKeysPut) getKind() Event {
|
||||||
|
return m.event
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handler to put the public key replies received from a central server.
|
||||||
|
func (m methodREQPublicKeysPut) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
// Get a context with the timeout specified in message.MethodTimeout.
|
||||||
|
|
||||||
|
// TODO:
|
||||||
|
// - Since this is implemented as a NACK message we could implement a
|
||||||
|
// metric thats shows the last time keys were updated.
|
||||||
|
|
||||||
|
// TODO: Define a subscriber for this Request type in startups.
|
||||||
|
|
||||||
|
ctx, _ := getContextForMethodTimeout(proc.ctx, message)
|
||||||
|
|
||||||
|
proc.processes.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer proc.processes.wg.Done()
|
||||||
|
outCh := make(chan []byte)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
// TODO: Should we receive a hash of he current keys from the node her ?
|
||||||
|
case outCh <- []byte{}:
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
case <-outCh:
|
||||||
|
keys := make(map[Node]string)
|
||||||
|
json.Unmarshal(message.Data, &keys)
|
||||||
|
|
||||||
|
fmt.Printf(" *** RECEIVED KEYS: %v\n", keys)
|
||||||
|
|
||||||
|
// Prepare and queue for sending a new message with the output
|
||||||
|
// of the action executed.
|
||||||
|
// newReplyMessage(proc, message, out)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Send back an ACK message.
|
||||||
|
// ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
// ---- Template that can be used for creating request methods
|
// ---- Template that can be used for creating request methods
|
||||||
|
|
||||||
// func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
|
// func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {
|
||||||
|
|
Loading…
Add table
Reference in a new issue