From c48c7bf19620ba62467a9e7148004f2178c5e328 Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 20 Apr 2022 18:33:52 +0200 Subject: [PATCH] added initial public keys get --- central_auth.go | 98 +++++++++++++++++++++++++++++++++++-------------- processes.go | 14 ++++++- requests.go | 81 +++++++++++++++++++++++++++++++++++++++- 3 files changed, 163 insertions(+), 30 deletions(-) diff --git a/central_auth.go b/central_auth.go index 01e08f1..c338141 100644 --- a/central_auth.go +++ b/central_auth.go @@ -17,21 +17,23 @@ type argsString string // centralAuth holds the logic related to handling public keys and auth maps. type centralAuth struct { // schema map[Node]map[argsString]signatureBase32 - nodePublicKeys *nodePublicKeys - configuration *Configuration - db *bolt.DB - bucketNamePublicKeys string - errorKernel *errorKernel + nodePublicKeys *nodePublicKeys + nodeNotAckedPublicKeys *nodeNotAckedPublicKeys + configuration *Configuration + db *bolt.DB + bucketNamePublicKeys string + errorKernel *errorKernel } // newCentralAuth will return a prepared *centralAuth with input values set. func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *centralAuth { c := centralAuth{ // schema: make(map[Node]map[argsString]signatureBase32), - nodePublicKeys: newNodePublicKeys(configuration), - configuration: configuration, - bucketNamePublicKeys: "publicKeys", - errorKernel: errorKernel, + nodePublicKeys: newNodePublicKeys(configuration), + nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(configuration), + configuration: configuration, + bucketNamePublicKeys: "publicKeys", + errorKernel: errorKernel, } databaseFilepath := filepath.Join(configuration.DatabaseFolder, "auth.db") @@ -64,7 +66,6 @@ func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *cen // addPublicKey to the db if the node do not exist, or if it is a new value. func (c *centralAuth) addPublicKey(proc process, msg Message) { - c.nodePublicKeys.mu.Lock() // TODO: When receiviving a new or different keys for a node we should // have a service with it's own storage for these keys, and an operator @@ -77,32 +78,55 @@ func (c *centralAuth) addPublicKey(proc process, msg Message) { // key for a host. // Check if a key for the current node already exists in the map. + c.nodePublicKeys.mu.Lock() existingKey, ok := c.nodePublicKeys.KeyMap[msg.FromNode] + c.nodePublicKeys.mu.Unlock() if ok && bytes.Equal(existingKey, msg.Data) { - fmt.Printf(" * key value for node %v is the same, doing nothing\n", msg.FromNode) - c.nodePublicKeys.mu.Unlock() + fmt.Printf(" * \nkey value for REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode) return } - // New key - c.nodePublicKeys.KeyMap[msg.FromNode] = msg.Data - c.nodePublicKeys.mu.Unlock() - - // Add key to persistent storage. - c.dbUpdatePublicKey(string(msg.FromNode), msg.Data) - - if ok { - er := fmt.Errorf("info: updated with new public key for node: %v", msg.FromNode) - fmt.Printf(" * %v\n", er) - c.errorKernel.infoSend(proc, msg, er) - } - if !ok { - er := fmt.Errorf("info: added public key for new node: %v", msg.FromNode) - fmt.Printf(" * %v\n", er) - c.errorKernel.infoSend(proc, msg, er) + c.nodeNotAckedPublicKeys.mu.Lock() + existingNotAckedKey, ok := c.nodeNotAckedPublicKeys.KeyMap[msg.FromNode] + // We only want to send one notification to the error kernel about new key detection, + // so we check if the values are the same as the one we already got before we continue + // with registering and logging for the the new key. + if ok && bytes.Equal(existingNotAckedKey, msg.Data) { + fmt.Printf(" * \nkey value for NOT-REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode) + c.nodeNotAckedPublicKeys.mu.Unlock() + return } + c.nodeNotAckedPublicKeys.KeyMap[msg.FromNode] = msg.Data + c.nodeNotAckedPublicKeys.mu.Unlock() + + er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode) + fmt.Printf(" * %v\n", er) + c.errorKernel.infoSend(proc, msg, er) + + // TODO: The below commented code should put used within the REQ handler instead to + // store the real keys into the allowed public keys map. + // Here we should only add new keys to the NotAcked map. + + // // New key + // c.nodePublicKeys.KeyMap[msg.FromNode] = msg.Data + // c.nodePublicKeys.mu.Unlock() + // + // // Add key to persistent storage. + // c.dbUpdatePublicKey(string(msg.FromNode), msg.Data) + // + // if ok { + // er := fmt.Errorf("info: updated with new public key for node: %v", msg.FromNode) + // fmt.Printf(" * %v\n", er) + // c.errorKernel.infoSend(proc, msg, er) + // } + // if !ok { + // er := fmt.Errorf("info: added public key for new node: %v", msg.FromNode) + // fmt.Printf(" * %v\n", er) + // c.errorKernel.infoSend(proc, msg, er) + // } + //c.dbDump(c.bucketPublicKeys) } @@ -212,3 +236,21 @@ func newNodePublicKeys(configuration *Configuration) *nodePublicKeys { return &n } + +// --- HERE + +// nodeNotAckedPublicKeys holds all the gathered but not acknowledged public +// keys of nodes in the system. +type nodeNotAckedPublicKeys struct { + mu sync.RWMutex + KeyMap map[Node][]byte +} + +// newNodeNotAckedPublicKeys will return a prepared type of nodePublicKeys. +func newNodeNotAckedPublicKeys(configuration *Configuration) *nodeNotAckedPublicKeys { + n := nodeNotAckedPublicKeys{ + KeyMap: make(map[Node][]byte), + } + + return &n +} diff --git a/processes.go b/processes.go index 2b212c3..6d5ee87 100644 --- a/processes.go +++ b/processes.go @@ -172,6 +172,7 @@ func (p *processes) Start(proc process) { if proc.configuration.IsCentralAuth { proc.startup.subREQPublicKeysGet(proc) + proc.startup.subREQPublicKeysAllow(proc) } if proc.configuration.StartSubREQPublicKeysToNode { @@ -352,6 +353,13 @@ func (s startup) subREQPublicKeysGet(p process) { go proc.spawnWorker() } +func (s startup) subREQPublicKeysAllow(p process) { + log.Printf("Starting Public keys allow subscriber: %#v\n", p.node) + sub := newSubject(REQPublicKeysAllow, string(p.node)) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() +} + func (s startup) subREQPublicKeysToNode(p process) { log.Printf("Starting Public keys to Node subscriber: %#v\n", p.node) sub := newSubject(REQPublicKeysToNode, string(p.node)) @@ -436,7 +444,11 @@ func (s startup) subREQHello(p process) { s.centralAuth.addPublicKey(proc, m) // update the prometheus metrics - s.metrics.promHelloNodesTotal.Set(float64(len(s.server.centralAuth.nodePublicKeys.KeyMap))) + + s.server.centralAuth.nodePublicKeys.mu.Lock() + mapLen := len(s.server.centralAuth.nodePublicKeys.KeyMap) + s.server.centralAuth.nodePublicKeys.mu.Unlock() + s.metrics.promHelloNodesTotal.Set(float64(mapLen)) s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() } diff --git a/requests.go b/requests.go index c533231..4b92e12 100644 --- a/requests.go +++ b/requests.go @@ -138,6 +138,8 @@ const ( REQPublicKeysGet Method = "REQPublicKeysGet" // REQPublicKeysToNode will put all the public received from central. REQPublicKeysToNode Method = "REQPublicKeysToNode" + // REQAuthPublicKeysAllow + REQPublicKeysAllow Method = "REQPublicKeysAllow" ) // The mapping of all the method constants specified, what type @@ -225,6 +227,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQPublicKeysToNode: methodREQPublicKeysToNode{ event: EventNACK, }, + REQPublicKeysAllow: methodREQPublicKeysAllow{ + event: EventACK, + }, }, } @@ -2035,6 +2040,9 @@ func (m methodREQPublicKeysGet) handler(proc process, message Message, node stri outCh := make(chan []byte) go func() { + // Normally we would do some logic here, where the result is passed to outCh when done. + // In this case this go func and the below select is not needed, but keeping it so the + // structure is the same as the other handlers. select { case <-ctx.Done(): // TODO: Should we receive a hash of he current keys from the node here @@ -2047,7 +2055,9 @@ func (m methodREQPublicKeysGet) handler(proc process, message Message, node stri case <-ctx.Done(): // case out := <-outCh: case <-outCh: + proc.centralAuth.nodePublicKeys.mu.Lock() b, err := json.Marshal(proc.centralAuth.nodePublicKeys.KeyMap) + proc.centralAuth.nodePublicKeys.mu.Unlock() if err != nil { er := fmt.Errorf("error: REQPublicKeysGet, failed to marshal keys map: %v", err) proc.errorKernel.errSend(proc, message, er) @@ -2087,6 +2097,9 @@ func (m methodREQPublicKeysToNode) handler(proc process, message Message, node s outCh := make(chan []byte) go func() { + // Normally we would do some logic here, where the result is passed to outCh when done. + // In this case this go func and the below select is not needed, but keeping it so the + // structure is the same as the other handlers. select { case <-ctx.Done(): // TODO: Should we receive a hash of he current keys from the node here ? @@ -2099,7 +2112,11 @@ func (m methodREQPublicKeysToNode) handler(proc process, message Message, node s case <-ctx.Done(): case <-outCh: keys := make(map[Node]string) - json.Unmarshal(message.Data, &keys) + err := json.Unmarshal(message.Data, &keys) + if err != nil { + er := fmt.Errorf("error: REQPublicKeysToNode : json unmarshal failed: %v, message: %v", err, message) + proc.errorKernel.errSend(proc, message, er) + } fmt.Printf(" *** RECEIVED KEYS: %v\n", keys) @@ -2123,6 +2140,68 @@ func (m methodREQPublicKeysToNode) handler(proc process, message Message, node s // ---- +type methodREQPublicKeysAllow struct { + event Event +} + +func (m methodREQPublicKeysAllow) getKind() Event { + return m.event +} + +// Handler to get all the public ed25519 keys from a central server. +func (m methodREQPublicKeysAllow) handler(proc process, message Message, node string) ([]byte, error) { + // Get a context with the timeout specified in message.MethodTimeout. + ctx, _ := getContextForMethodTimeout(proc.ctx, message) + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + outCh := make(chan []byte) + + go func() { + // Normally we would do some logic here, where the result is passed to outCh when done. + // In this case this go func and the below select is not needed, but keeping it so the + // structure is the same as the other handlers. + select { + case <-ctx.Done(): + case outCh <- []byte{}: + } + }() + + select { + case <-ctx.Done(): + case <-outCh: + proc.centralAuth.nodeNotAckedPublicKeys.mu.Lock() + defer proc.centralAuth.nodeNotAckedPublicKeys.mu.Unlock() + + for _, n := range message.MethodArgs { + key, ok := proc.centralAuth.nodeNotAckedPublicKeys.KeyMap[Node(n)] + if ok { + // Store/update the node and public key on the allowed pubKey map. + proc.centralAuth.nodePublicKeys.mu.Lock() + proc.centralAuth.nodePublicKeys.KeyMap[Node(n)] = key + proc.centralAuth.nodePublicKeys.mu.Unlock() + + // Add key to persistent storage. + proc.centralAuth.dbUpdatePublicKey(string(n), key) + + // Delete the key from the NotAcked map + delete(proc.centralAuth.nodeNotAckedPublicKeys.KeyMap, Node(n)) + + er := fmt.Errorf("info: REQPublicKeysAllow : allowed new/updated public key for %v to allowed public key map", n) + proc.errorKernel.infoSend(proc, message, er) + } + } + + } + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +} + +// ---- + // ---- Template that can be used for creating request methods // func (m methodREQCopyFileTo) handler(proc process, message Message, node string) ([]byte, error) {