From c32e9a673a91d1ed31860c9556df2428c5bfbc2b Mon Sep 17 00:00:00 2001 From: postmannen Date: Wed, 1 Jun 2022 07:29:25 +0200 Subject: [PATCH] added REQKeysDelete --- central_auth_key_handling.go | 107 ++++++++++++++++++++--- processes.go | 8 ++ requests.go | 5 ++ requests_keys.go | 160 +++++++++++++++++++++-------------- 4 files changed, 205 insertions(+), 75 deletions(-) diff --git a/central_auth_key_handling.go b/central_auth_key_handling.go index bf91b76..49e14e8 100644 --- a/central_auth_key_handling.go +++ b/central_auth_key_handling.go @@ -2,12 +2,15 @@ package steward import ( "bytes" + "crypto/sha256" "fmt" "log" "os" "path/filepath" + "sort" "sync" + "github.com/fxamacker/cbor/v2" bolt "go.etcd.io/bbolt" ) @@ -110,16 +113,6 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki { // addPublicKey to the db if the node do not exist, or if it is a new value. func (p *pki) addPublicKey(proc process, msg Message) { - // TODO: When receiviving a new or different keys for a node we should - // have a service with it's own storage for these keys, and an operator - // should have to acknowledge the new keys. - // For this we need: - // - A service that keeps the state of all the new keys detected in the - // bytes.equal check below. - // - A Log message should be thrown so we know that there is a new key. - // - A Request method that can be used by operator to acknowledge a new - // key for a host. - // Check if a key for the current node already exists in the map. p.nodesAcked.mu.Lock() existingKey, ok := p.nodesAcked.keysAndHash.Keys[msg.FromNode] @@ -149,6 +142,26 @@ func (p *pki) addPublicKey(proc process, msg Message) { p.errorKernel.infoSend(proc, msg, er) } +// deletePublicKeys to the db if the node do not exist, or if it is a new value. +func (p *pki) deletePublicKeys(proc process, msg Message, nodes []string) { + + // Check if a key for the current node already exists in the map. + func() { + p.nodesAcked.mu.Lock() + defer p.nodesAcked.mu.Unlock() + + for _, n := range nodes { + delete(p.nodesAcked.keysAndHash.Keys, Node(n)) + } + }() + + p.dbDeletePublicKeys(p.bucketNamePublicKeys, nodes) + + er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode) + fmt.Printf(" * %v\n", er) + p.errorKernel.infoSend(proc, msg, er) +} + // // dbGetPublicKey will look up and return a specific value if it exists for a key in a bucket in a DB. // func (c *centralAuth) dbGetPublicKey(node string) ([]byte, error) { // var value []byte @@ -196,6 +209,25 @@ func (p *pki) dbUpdatePublicKey(node string, value []byte) error { return err } +// dbDeletePublicKeys will delete the specified key from the specified +// bucket if it exists. +func (p *pki) dbDeletePublicKeys(bucket string, nodes []string) error { + err := p.db.Update(func(tx *bolt.Tx) error { + bu := tx.Bucket([]byte(bucket)) + + for _, n := range nodes { + err := bu.Delete([]byte(n)) + if err != nil { + log.Printf("error: delete key in bucket %v failed: %v\n", bucket, err) + } + } + + return nil + }) + + return err +} + //dbUpdateHash will update the public key for a node in the db. func (p *pki) dbUpdateHash(hash []byte) error { err := p.db.Update(func(tx *bolt.Tx) error { @@ -217,6 +249,61 @@ func (p *pki) dbUpdateHash(hash []byte) error { return err } +func (p *pki) updateHash(proc process, message Message) { + p.nodesAcked.mu.Lock() + defer p.nodesAcked.mu.Unlock() + + type NodesAndKeys struct { + Node Node + Key []byte + } + + // Create a slice of all the map keys, and its value. + sortedNodesAndKeys := []NodesAndKeys{} + + // Range the map, and add each k/v to the sorted slice, to be sorted later. + for k, v := range p.nodesAcked.keysAndHash.Keys { + nk := NodesAndKeys{ + Node: k, + Key: v, + } + + sortedNodesAndKeys = append(sortedNodesAndKeys, nk) + } + + // sort the slice based on the node name. + // Sort all the commands. + sort.SliceStable(sortedNodesAndKeys, func(i, j int) bool { + return sortedNodesAndKeys[i].Node < sortedNodesAndKeys[j].Node + }) + + // Then create a hash based on the sorted slice. + + b, err := cbor.Marshal(sortedNodesAndKeys) + if err != nil { + er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal slice, and will not update hash for public keys: %v", err) + p.errorKernel.errSend(proc, message, er) + log.Printf(" * DEBUG: %v\n", er) + + return + } + + // Store the key in the key value map. + hash := sha256.Sum256(b) + p.nodesAcked.keysAndHash.Hash = hash + + // Store the key to the db for persistence. + p.dbUpdateHash(hash[:]) + if err != nil { + er := fmt.Errorf("error: methodREQKeysAllow, failed to store the hash into the db: %v", err) + p.errorKernel.errSend(proc, message, er) + log.Printf(" * DEBUG: %v\n", er) + + return + } + +} + // dbViewHash will look up and return a specific value if it exists for a key in a bucket in a DB. func (p *pki) dbViewHash() ([]byte, error) { var value []byte diff --git a/processes.go b/processes.go index a959ad8..3df50ee 100644 --- a/processes.go +++ b/processes.go @@ -186,6 +186,7 @@ func (p *processes) Start(proc process) { if proc.configuration.IsCentralAuth { proc.startup.subREQKeysRequestUpdate(proc) proc.startup.subREQKeysAllow(proc) + proc.startup.subREQKeysDelete(proc) proc.startup.subREQAclRequestUpdate(proc) @@ -458,6 +459,13 @@ func (s startup) subREQKeysAllow(p process) { go proc.spawnWorker() } +func (s startup) subREQKeysDelete(p process) { + log.Printf("Starting Public keys delete subscriber: %#v\n", p.node) + sub := newSubject(REQKeysDelete, string(p.node)) + proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil) + go proc.spawnWorker() +} + func (s startup) subREQAclRequestUpdate(p process) { log.Printf("Starting Acl Request update subscriber: %#v\n", p.node) sub := newSubject(REQAclRequestUpdate, string(p.node)) diff --git a/requests.go b/requests.go index fa9cf49..393376e 100644 --- a/requests.go +++ b/requests.go @@ -130,6 +130,8 @@ const ( REQKeysDeliverUpdate Method = "REQKeysDeliverUpdate" // REQKeysAllow REQKeysAllow Method = "REQKeysAllow" + // REQKeysDelete + REQKeysDelete Method = "REQKeysDelete" // REQAclRequestUpdate will get all node acl's from central if an update is available. REQAclRequestUpdate Method = "REQAclRequestUpdate" @@ -253,6 +255,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable { REQKeysAllow: methodREQKeysAllow{ event: EventACK, }, + REQKeysDelete: methodREQKeysDelete{ + event: EventACK, + }, REQAclRequestUpdate: methodREQAclRequestUpdate{ event: EventNACK, diff --git a/requests_keys.go b/requests_keys.go index a5b9531..f0e95f1 100644 --- a/requests_keys.go +++ b/requests_keys.go @@ -2,13 +2,9 @@ package steward import ( "bytes" - "crypto/sha256" "encoding/json" "fmt" "log" - "sort" - - "github.com/fxamacker/cbor/v2" ) // --- @@ -107,11 +103,6 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node func() { proc.centralAuth.pki.nodesAcked.mu.Lock() defer proc.centralAuth.pki.nodesAcked.mu.Unlock() - // TODO: We should probably create a hash of the current map content, - // store it alongside the KeyMap, and send both the KeyMap and hash - // back. We can then later send that hash when asking for keys, compare - // it with the current one for the KeyMap, and know if we need to send - // and update back to the node who published the request to here. fmt.Printf(" <---- methodREQKeysRequestUpdate: received hash from NODE=%v, HASH=%v\n", message.FromNode, message.Data) @@ -185,8 +176,23 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node proc.nodeAuth.publicKeys.mu.Lock() - err := json.Unmarshal(message.Data, proc.nodeAuth.publicKeys.keysAndHash) - fmt.Printf("\n <---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n", proc.nodeAuth.publicKeys.keysAndHash) + var keysAndHash keysAndHash + + err := json.Unmarshal(message.Data, &keysAndHash) + if err != nil { + er := fmt.Errorf("error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v", err, message) + proc.errorKernel.errSend(proc, message, er) + } + + fmt.Printf("\n <---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n", keysAndHash) + + // If the received map was empty we also want to delete all the locally stored keys, + // else we copy the marshaled keysAndHash we received from central into our map. + if len(keysAndHash.Keys) < 1 { + proc.nodeAuth.publicKeys.keysAndHash = newKeysAndHash() + } else { + proc.nodeAuth.publicKeys.keysAndHash = &keysAndHash + } proc.nodeAuth.publicKeys.mu.Unlock() @@ -285,60 +291,7 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string) // All new elements are now added, and we can create a new hash // representing the current keys in the allowed map. - func() { - proc.centralAuth.pki.nodesAcked.mu.Lock() - defer proc.centralAuth.pki.nodesAcked.mu.Unlock() - - type NodesAndKeys struct { - Node Node - Key []byte - } - - // Create a slice of all the map keys, and its value. - sortedNodesAndKeys := []NodesAndKeys{} - - // Range the map, and add each k/v to the sorted slice, to be sorted later. - for k, v := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys { - nk := NodesAndKeys{ - Node: k, - Key: v, - } - - sortedNodesAndKeys = append(sortedNodesAndKeys, nk) - } - - // sort the slice based on the node name. - // Sort all the commands. - sort.SliceStable(sortedNodesAndKeys, func(i, j int) bool { - return sortedNodesAndKeys[i].Node < sortedNodesAndKeys[j].Node - }) - - // Then create a hash based on the sorted slice. - - b, err := cbor.Marshal(sortedNodesAndKeys) - if err != nil { - er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal slice, and will not update hash for public keys: %v", err) - proc.errorKernel.errSend(proc, message, er) - log.Printf(" * DEBUG: %v\n", er) - - return - } - - // Store the key in the key value map. - hash := sha256.Sum256(b) - proc.centralAuth.pki.nodesAcked.keysAndHash.Hash = hash - - // Store the key to the db for persistence. - proc.centralAuth.pki.dbUpdateHash(hash[:]) - if err != nil { - er := fmt.Errorf("error: methodREQKeysAllow, failed to store the hash into the db: %v", err) - proc.errorKernel.errSend(proc, message, er) - log.Printf(" * DEBUG: %v\n", er) - - return - } - - }() + proc.centralAuth.pki.updateHash(proc, message) // TODO: FAILS: The push keys updates when change fails with that the // subscriber gets stuck. Need to look more into this later. @@ -440,3 +393,80 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string) ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) return ackMsg, nil } + +type methodREQKeysDelete struct { + event Event +} + +func (m methodREQKeysDelete) getKind() Event { + return m.event +} + +func (m methodREQKeysDelete) handler(proc process, message Message, node string) ([]byte, error) { + inf := fmt.Errorf("<--- methodREQKeysDelete received from: %v, containing: %v", message.FromNode, message.MethodArgs) + proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration) + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + + // Get a context with the timeout specified in message.MethodTimeout. + ctx, cancel := getContextForMethodTimeout(proc.ctx, message) + outCh := make(chan []byte) + errCh := make(chan error) + + proc.processes.wg.Add(1) + go func() { + defer proc.processes.wg.Done() + + switch { + case len(message.MethodArgs) < 1: + errCh <- fmt.Errorf("error: methodREQAclGroupNodesDeleteNode: got <1 number methodArgs, want >0") + return + } + + // HERE: + // We want to be able to define more nodes keys to delete. + // Loop over the methodArgs, and call the keyDelete function for each node, + // or rewrite the deleteKey to deleteKeys and it takes a []node as input + // so all keys can be deleted in 1 go, and we create 1 new hash, instead + // of doing it for each node delete. + + proc.centralAuth.pki.deletePublicKeys(proc, message, message.MethodArgs) + log.Printf(" * DEBUG Deleted public keys: %v\n", message.MethodArgs) + + // All new elements are now added, and we can create a new hash + // representing the current keys in the allowed map. + proc.centralAuth.pki.updateHash(proc, message) + log.Printf(" * DEBUG updated hash for public keys\n") + + outString := fmt.Sprintf("deleted public keys for the nodes=%v\n", message.MethodArgs) + out := []byte(outString) + + select { + case outCh <- out: + case <-ctx.Done(): + return + } + }() + + select { + case err := <-errCh: + proc.errorKernel.errSend(proc, message, err) + + case <-ctx.Done(): + cancel() + er := fmt.Errorf("error: methodREQAclGroupNodesDeleteNode: method timed out: %v", message.MethodArgs) + proc.errorKernel.errSend(proc, message, er) + + case out := <-outCh: + // Prepare and queue for sending a new message with the output + // of the action executed. + newReplyMessage(proc, message, out) + } + + }() + + ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID)) + return ackMsg, nil +}