Mirror of https://github.com/postmannen/ctrl.git

Commit c48c7bf196 (parent ef4d921772): added initial public keys get

3 changed files with 163 additions and 30 deletions
@@ -18,6 +18,7 @@ type argsString string
type centralAuth struct {
    // schema map[Node]map[argsString]signatureBase32
    nodePublicKeys         *nodePublicKeys
    nodeNotAckedPublicKeys *nodeNotAckedPublicKeys
    configuration          *Configuration
    db                     *bolt.DB
    bucketNamePublicKeys   string
@@ -29,6 +30,7 @@ func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *cen
    c := centralAuth{
        // schema: make(map[Node]map[argsString]signatureBase32),
        nodePublicKeys:         newNodePublicKeys(configuration),
        nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(configuration),
        configuration:          configuration,
        bucketNamePublicKeys:   "publicKeys",
        errorKernel:            errorKernel,
@@ -64,7 +66,6 @@ func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *cen

// addPublicKey adds the key to the db if the node does not exist, or if it is a new value.
func (c *centralAuth) addPublicKey(proc process, msg Message) {
    c.nodePublicKeys.mu.Lock()

    // TODO: When receiving new or different keys for a node we should
    // have a service with its own storage for these keys, and an operator
@@ -77,31 +78,54 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) {
    // key for a host.

    // Check if a key for the current node already exists in the map.
    c.nodePublicKeys.mu.Lock()
    existingKey, ok := c.nodePublicKeys.KeyMap[msg.FromNode]
    c.nodePublicKeys.mu.Unlock()

    if ok && bytes.Equal(existingKey, msg.Data) {
        fmt.Printf(" * key value for node %v is the same, doing nothing\n", msg.FromNode)
        c.nodePublicKeys.mu.Unlock()
        fmt.Printf(" * \nkey value for REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode)
        return
    }

    // New key
    c.nodePublicKeys.KeyMap[msg.FromNode] = msg.Data
    c.nodePublicKeys.mu.Unlock()
    c.nodeNotAckedPublicKeys.mu.Lock()
    existingNotAckedKey, ok := c.nodeNotAckedPublicKeys.KeyMap[msg.FromNode]
    // We only want to send one notification to the error kernel about new key detection,
    // so we check if the value is the same as the one we already got before we continue
    // with registering and logging the new key.
    if ok && bytes.Equal(existingNotAckedKey, msg.Data) {
        fmt.Printf(" * \nkey value for NOT-REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode)
        c.nodeNotAckedPublicKeys.mu.Unlock()
        return
    }

    // Add key to persistent storage.
    c.dbUpdatePublicKey(string(msg.FromNode), msg.Data)
    c.nodeNotAckedPublicKeys.KeyMap[msg.FromNode] = msg.Data
    c.nodeNotAckedPublicKeys.mu.Unlock()

    if ok {
        er := fmt.Errorf("info: updated with new public key for node: %v", msg.FromNode)
        er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode)
        fmt.Printf(" * %v\n", er)
        c.errorKernel.infoSend(proc, msg, er)
    }
    if !ok {
        er := fmt.Errorf("info: added public key for new node: %v", msg.FromNode)
        fmt.Printf(" * %v\n", er)
        c.errorKernel.infoSend(proc, msg, er)
    }

    // TODO: The commented code below should be used within the REQ handler instead to
    // store the real keys into the allowed public keys map.
    // Here we should only add new keys to the NotAcked map.

    // // New key
    // c.nodePublicKeys.KeyMap[msg.FromNode] = msg.Data
    // c.nodePublicKeys.mu.Unlock()
    //
    // // Add key to persistent storage.
    // c.dbUpdatePublicKey(string(msg.FromNode), msg.Data)
    //
    // if ok {
    //     er := fmt.Errorf("info: updated with new public key for node: %v", msg.FromNode)
    //     fmt.Printf(" * %v\n", er)
    //     c.errorKernel.infoSend(proc, msg, er)
    // }
    // if !ok {
    //     er := fmt.Errorf("info: added public key for new node: %v", msg.FromNode)
    //     fmt.Printf(" * %v\n", er)
    //     c.errorKernel.infoSend(proc, msg, er)
    // }

    //c.dbDump(c.bucketPublicKeys)
}
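The dbUpdatePublicKey method called above is not part of this diff. As a rough sketch, assuming a bbolt-backed implementation that uses the bucketNamePublicKeys field set in newCentralAuth, it could look roughly like the following; the real method may differ (for instance, the call sites in this commit ignore any return value):

// Sketch only, not from this commit: persist a node's public key in the bolt bucket.
func (c *centralAuth) dbUpdatePublicKey(node string, value []byte) error {
    return c.db.Update(func(tx *bolt.Tx) error {
        bu, err := tx.CreateBucketIfNotExists([]byte(c.bucketNamePublicKeys))
        if err != nil {
            return fmt.Errorf("error: failed to create bucket for public keys: %v", err)
        }

        // Overwrite (or create) the key bytes stored under the node name.
        return bu.Put([]byte(node), value)
    })
}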
@@ -212,3 +236,21 @@ func newNodePublicKeys(configuration *Configuration) *nodePublicKeys {

    return &n
}

// --- HERE

// nodeNotAckedPublicKeys holds all the gathered but not acknowledged public
// keys of nodes in the system.
type nodeNotAckedPublicKeys struct {
    mu     sync.RWMutex
    KeyMap map[Node][]byte
}

// newNodeNotAckedPublicKeys will return a prepared type of nodeNotAckedPublicKeys.
func newNodeNotAckedPublicKeys(configuration *Configuration) *nodeNotAckedPublicKeys {
    n := nodeNotAckedPublicKeys{
        KeyMap: make(map[Node][]byte),
    }

    return &n
}

processes.go (14 lines changed)

@@ -172,6 +172,7 @@ func (p *processes) Start(proc process) {

    if proc.configuration.IsCentralAuth {
        proc.startup.subREQPublicKeysGet(proc)
        proc.startup.subREQPublicKeysAllow(proc)
    }

    if proc.configuration.StartSubREQPublicKeysToNode {
@@ -352,6 +353,13 @@ func (s startup) subREQPublicKeysGet(p process) {
    go proc.spawnWorker()
}

func (s startup) subREQPublicKeysAllow(p process) {
    log.Printf("Starting Public keys allow subscriber: %#v\n", p.node)
    sub := newSubject(REQPublicKeysAllow, string(p.node))
    proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
    go proc.spawnWorker()
}

func (s startup) subREQPublicKeysToNode(p process) {
    log.Printf("Starting Public keys to Node subscriber: %#v\n", p.node)
    sub := newSubject(REQPublicKeysToNode, string(p.node))
@@ -436,7 +444,11 @@ func (s startup) subREQHello(p process) {
        s.centralAuth.addPublicKey(proc, m)

        // update the prometheus metrics
        s.metrics.promHelloNodesTotal.Set(float64(len(s.server.centralAuth.nodePublicKeys.KeyMap)))

        s.server.centralAuth.nodePublicKeys.mu.Lock()
        mapLen := len(s.server.centralAuth.nodePublicKeys.KeyMap)
        s.server.centralAuth.nodePublicKeys.mu.Unlock()
        s.metrics.promHelloNodesTotal.Set(float64(mapLen))
        s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()

    }

requests.go (81 lines changed)

@@ -138,6 +138,8 @@ const (
    REQPublicKeysGet Method = "REQPublicKeysGet"
    // REQPublicKeysToNode will put all the public keys received from central.
    REQPublicKeysToNode Method = "REQPublicKeysToNode"
    // REQPublicKeysAllow
    REQPublicKeysAllow Method = "REQPublicKeysAllow"
)

// The mapping of all the method constants specified, what type
@@ -225,6 +227,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
            REQPublicKeysToNode: methodREQPublicKeysToNode{
                event: EventNACK,
            },
            REQPublicKeysAllow: methodREQPublicKeysAllow{
                event: EventACK,
            },
        },
    }
@@ -2035,6 +2040,9 @@ func (m methodREQPublicKeysGet) handler(proc process, message Message, node stri
        outCh := make(chan []byte)

        go func() {
            // Normally we would do some logic here, where the result is passed to outCh when done.
            // In this case this go func and the below select is not needed, but keeping it so the
            // structure is the same as the other handlers.
            select {
            case <-ctx.Done():
            // TODO: Should we receive a hash of the current keys from the node here
@@ -2047,7 +2055,9 @@ func (m methodREQPublicKeysGet) handler(proc process, message Message, node stri
        case <-ctx.Done():
        // case out := <-outCh:
        case <-outCh:
            proc.centralAuth.nodePublicKeys.mu.Lock()
            b, err := json.Marshal(proc.centralAuth.nodePublicKeys.KeyMap)
            proc.centralAuth.nodePublicKeys.mu.Unlock()
            if err != nil {
                er := fmt.Errorf("error: REQPublicKeysGet, failed to marshal keys map: %v", err)
                proc.errorKernel.errSend(proc, message, er)
@@ -2087,6 +2097,9 @@ func (m methodREQPublicKeysToNode) handler(proc process, message Message, node s
        outCh := make(chan []byte)

        go func() {
            // Normally we would do some logic here, where the result is passed to outCh when done.
            // In this case this go func and the below select is not needed, but keeping it so the
            // structure is the same as the other handlers.
            select {
            case <-ctx.Done():
            // TODO: Should we receive a hash of the current keys from the node here?
@@ -2099,7 +2112,11 @@ func (m methodREQPublicKeysToNode) handler(proc process, message Message, node s
        case <-ctx.Done():
        case <-outCh:
            keys := make(map[Node]string)
            json.Unmarshal(message.Data, &keys)
            err := json.Unmarshal(message.Data, &keys)
            if err != nil {
                er := fmt.Errorf("error: REQPublicKeysToNode : json unmarshal failed: %v, message: %v", err, message)
                proc.errorKernel.errSend(proc, message, er)
            }

            fmt.Printf(" *** RECEIVED KEYS: %v\n", keys)
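A note on the payload: on central the key map is a map[Node][]byte, and encoding/json encodes []byte values as base64 strings, which is why the handler above can unmarshal the received data into a map[Node]string holding the base64 text of each key. A small standalone example of that round trip (node name and key bytes are made up):

package main

import (
    "encoding/json"
    "fmt"
)

type Node string

func main() {
    // What central holds: raw public key bytes per node.
    central := map[Node][]byte{"node1": []byte("raw-ed25519-key-bytes")}

    // json.Marshal turns the []byte values into base64 strings.
    b, err := json.Marshal(central)
    if err != nil {
        panic(err)
    }

    // What a node receives: the same map with base64-encoded string values.
    keys := make(map[Node]string)
    if err := json.Unmarshal(b, &keys); err != nil {
        panic(err)
    }

    fmt.Printf(" *** RECEIVED KEYS: %v\n", keys)
}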
@@ -2123,6 +2140,68 @@ func (m methodREQPublicKeysToNode) handler(proc process, message Message, node s

// ----

type methodREQPublicKeysAllow struct {
    event Event
}

func (m methodREQPublicKeysAllow) getKind() Event {
    return m.event
}

// Handler on the central server that moves the public keys for the nodes given
// in MethodArgs from the not-acknowledged map into the allowed public keys map.
func (m methodREQPublicKeysAllow) handler(proc process, message Message, node string) ([]byte, error) {
    // Get a context with the timeout specified in message.MethodTimeout.
    ctx, _ := getContextForMethodTimeout(proc.ctx, message)

    proc.processes.wg.Add(1)
    go func() {
        defer proc.processes.wg.Done()
        outCh := make(chan []byte)

        go func() {
            // Normally we would do some logic here, where the result is passed to outCh when done.
            // In this case this go func and the below select is not needed, but keeping it so the
            // structure is the same as the other handlers.
            select {
            case <-ctx.Done():
            case outCh <- []byte{}:
            }
        }()

        select {
        case <-ctx.Done():
        case <-outCh:
            proc.centralAuth.nodeNotAckedPublicKeys.mu.Lock()
            defer proc.centralAuth.nodeNotAckedPublicKeys.mu.Unlock()

            for _, n := range message.MethodArgs {
                key, ok := proc.centralAuth.nodeNotAckedPublicKeys.KeyMap[Node(n)]
                if ok {
                    // Store/update the node and public key on the allowed pubKey map.
                    proc.centralAuth.nodePublicKeys.mu.Lock()
                    proc.centralAuth.nodePublicKeys.KeyMap[Node(n)] = key
                    proc.centralAuth.nodePublicKeys.mu.Unlock()

                    // Add key to persistent storage.
                    proc.centralAuth.dbUpdatePublicKey(string(n), key)

                    // Delete the key from the NotAcked map
                    delete(proc.centralAuth.nodeNotAckedPublicKeys.KeyMap, Node(n))

                    er := fmt.Errorf("info: REQPublicKeysAllow : allowed new/updated public key for %v to allowed public key map", n)
                    proc.errorKernel.infoSend(proc, message, er)
                }
            }

        }
    }()

    ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
    return ackMsg, nil
}
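With the handler in place, approving a node's staged key amounts to sending a message with method REQPublicKeysAllow to the central server, naming the nodes to approve in MethodArgs. The standalone sketch below only illustrates the likely shape of such a request; the field set, JSON tags, and the way messages are injected into ctrl are assumptions, since none of that is shown in this commit.

package main

import (
    "encoding/json"
    "fmt"
)

// allowRequest is a hypothetical, trimmed-down view of the message fields used
// by the new handler. The Go identifiers mirror those seen in this diff; the
// JSON tags and any remaining fields are assumptions.
type allowRequest struct {
    ToNode        string   `json:"toNode"`
    Method        string   `json:"method"`
    MethodArgs    []string `json:"methodArgs"`
    MethodTimeout int      `json:"methodTimeout"`
}

func main() {
    m := allowRequest{
        ToNode:        "central",
        Method:        "REQPublicKeysAllow",
        MethodArgs:    []string{"node1", "node2"}, // nodes whose staged keys should be approved
        MethodTimeout: 5,
    }

    b, err := json.MarshalIndent([]allowRequest{m}, "", "    ")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(b))
}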
// ----

// ---- Template that can be used for creating request methods

// func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {