mirror of
https://github.com/postmannen/ctrl.git
synced 2025-01-05 20:09:16 +00:00
added REQKeysDelete
This commit is contained in:
parent
7d133c35fb
commit
c32e9a673a
4 changed files with 205 additions and 75 deletions
|
@ -2,12 +2,15 @@ package steward
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
|
@ -110,16 +113,6 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
|
|||
// addPublicKey to the db if the node do not exist, or if it is a new value.
|
||||
func (p *pki) addPublicKey(proc process, msg Message) {
|
||||
|
||||
// TODO: When receiviving a new or different keys for a node we should
|
||||
// have a service with it's own storage for these keys, and an operator
|
||||
// should have to acknowledge the new keys.
|
||||
// For this we need:
|
||||
// - A service that keeps the state of all the new keys detected in the
|
||||
// bytes.equal check below.
|
||||
// - A Log message should be thrown so we know that there is a new key.
|
||||
// - A Request method that can be used by operator to acknowledge a new
|
||||
// key for a host.
|
||||
|
||||
// Check if a key for the current node already exists in the map.
|
||||
p.nodesAcked.mu.Lock()
|
||||
existingKey, ok := p.nodesAcked.keysAndHash.Keys[msg.FromNode]
|
||||
|
@ -149,6 +142,26 @@ func (p *pki) addPublicKey(proc process, msg Message) {
|
|||
p.errorKernel.infoSend(proc, msg, er)
|
||||
}
|
||||
|
||||
// deletePublicKeys to the db if the node do not exist, or if it is a new value.
|
||||
func (p *pki) deletePublicKeys(proc process, msg Message, nodes []string) {
|
||||
|
||||
// Check if a key for the current node already exists in the map.
|
||||
func() {
|
||||
p.nodesAcked.mu.Lock()
|
||||
defer p.nodesAcked.mu.Unlock()
|
||||
|
||||
for _, n := range nodes {
|
||||
delete(p.nodesAcked.keysAndHash.Keys, Node(n))
|
||||
}
|
||||
}()
|
||||
|
||||
p.dbDeletePublicKeys(p.bucketNamePublicKeys, nodes)
|
||||
|
||||
er := fmt.Errorf("info: detected new public key for node: %v. This key will need to be authorized by operator to be allowed into the system", msg.FromNode)
|
||||
fmt.Printf(" * %v\n", er)
|
||||
p.errorKernel.infoSend(proc, msg, er)
|
||||
}
|
||||
|
||||
// // dbGetPublicKey will look up and return a specific value if it exists for a key in a bucket in a DB.
|
||||
// func (c *centralAuth) dbGetPublicKey(node string) ([]byte, error) {
|
||||
// var value []byte
|
||||
|
@ -196,6 +209,25 @@ func (p *pki) dbUpdatePublicKey(node string, value []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// dbDeletePublicKeys will delete the specified key from the specified
|
||||
// bucket if it exists.
|
||||
func (p *pki) dbDeletePublicKeys(bucket string, nodes []string) error {
|
||||
err := p.db.Update(func(tx *bolt.Tx) error {
|
||||
bu := tx.Bucket([]byte(bucket))
|
||||
|
||||
for _, n := range nodes {
|
||||
err := bu.Delete([]byte(n))
|
||||
if err != nil {
|
||||
log.Printf("error: delete key in bucket %v failed: %v\n", bucket, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
//dbUpdateHash will update the public key for a node in the db.
|
||||
func (p *pki) dbUpdateHash(hash []byte) error {
|
||||
err := p.db.Update(func(tx *bolt.Tx) error {
|
||||
|
@ -217,6 +249,61 @@ func (p *pki) dbUpdateHash(hash []byte) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (p *pki) updateHash(proc process, message Message) {
|
||||
p.nodesAcked.mu.Lock()
|
||||
defer p.nodesAcked.mu.Unlock()
|
||||
|
||||
type NodesAndKeys struct {
|
||||
Node Node
|
||||
Key []byte
|
||||
}
|
||||
|
||||
// Create a slice of all the map keys, and its value.
|
||||
sortedNodesAndKeys := []NodesAndKeys{}
|
||||
|
||||
// Range the map, and add each k/v to the sorted slice, to be sorted later.
|
||||
for k, v := range p.nodesAcked.keysAndHash.Keys {
|
||||
nk := NodesAndKeys{
|
||||
Node: k,
|
||||
Key: v,
|
||||
}
|
||||
|
||||
sortedNodesAndKeys = append(sortedNodesAndKeys, nk)
|
||||
}
|
||||
|
||||
// sort the slice based on the node name.
|
||||
// Sort all the commands.
|
||||
sort.SliceStable(sortedNodesAndKeys, func(i, j int) bool {
|
||||
return sortedNodesAndKeys[i].Node < sortedNodesAndKeys[j].Node
|
||||
})
|
||||
|
||||
// Then create a hash based on the sorted slice.
|
||||
|
||||
b, err := cbor.Marshal(sortedNodesAndKeys)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal slice, and will not update hash for public keys: %v", err)
|
||||
p.errorKernel.errSend(proc, message, er)
|
||||
log.Printf(" * DEBUG: %v\n", er)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Store the key in the key value map.
|
||||
hash := sha256.Sum256(b)
|
||||
p.nodesAcked.keysAndHash.Hash = hash
|
||||
|
||||
// Store the key to the db for persistence.
|
||||
p.dbUpdateHash(hash[:])
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQKeysAllow, failed to store the hash into the db: %v", err)
|
||||
p.errorKernel.errSend(proc, message, er)
|
||||
log.Printf(" * DEBUG: %v\n", er)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// dbViewHash will look up and return a specific value if it exists for a key in a bucket in a DB.
|
||||
func (p *pki) dbViewHash() ([]byte, error) {
|
||||
var value []byte
|
||||
|
|
|
@ -186,6 +186,7 @@ func (p *processes) Start(proc process) {
|
|||
if proc.configuration.IsCentralAuth {
|
||||
proc.startup.subREQKeysRequestUpdate(proc)
|
||||
proc.startup.subREQKeysAllow(proc)
|
||||
proc.startup.subREQKeysDelete(proc)
|
||||
|
||||
proc.startup.subREQAclRequestUpdate(proc)
|
||||
|
||||
|
@ -458,6 +459,13 @@ func (s startup) subREQKeysAllow(p process) {
|
|||
go proc.spawnWorker()
|
||||
}
|
||||
|
||||
func (s startup) subREQKeysDelete(p process) {
|
||||
log.Printf("Starting Public keys delete subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQKeysDelete, string(p.node))
|
||||
proc := newProcess(p.ctx, s.server, sub, processKindSubscriber, nil)
|
||||
go proc.spawnWorker()
|
||||
}
|
||||
|
||||
func (s startup) subREQAclRequestUpdate(p process) {
|
||||
log.Printf("Starting Acl Request update subscriber: %#v\n", p.node)
|
||||
sub := newSubject(REQAclRequestUpdate, string(p.node))
|
||||
|
|
|
@ -130,6 +130,8 @@ const (
|
|||
REQKeysDeliverUpdate Method = "REQKeysDeliverUpdate"
|
||||
// REQKeysAllow
|
||||
REQKeysAllow Method = "REQKeysAllow"
|
||||
// REQKeysDelete
|
||||
REQKeysDelete Method = "REQKeysDelete"
|
||||
|
||||
// REQAclRequestUpdate will get all node acl's from central if an update is available.
|
||||
REQAclRequestUpdate Method = "REQAclRequestUpdate"
|
||||
|
@ -253,6 +255,9 @@ func (m Method) GetMethodsAvailable() MethodsAvailable {
|
|||
REQKeysAllow: methodREQKeysAllow{
|
||||
event: EventACK,
|
||||
},
|
||||
REQKeysDelete: methodREQKeysDelete{
|
||||
event: EventACK,
|
||||
},
|
||||
|
||||
REQAclRequestUpdate: methodREQAclRequestUpdate{
|
||||
event: EventNACK,
|
||||
|
|
160
requests_keys.go
160
requests_keys.go
|
@ -2,13 +2,9 @@ package steward
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/sha256"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"sort"
|
||||
|
||||
"github.com/fxamacker/cbor/v2"
|
||||
)
|
||||
|
||||
// ---
|
||||
|
@ -107,11 +103,6 @@ func (m methodREQKeysRequestUpdate) handler(proc process, message Message, node
|
|||
func() {
|
||||
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
||||
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
||||
// TODO: We should probably create a hash of the current map content,
|
||||
// store it alongside the KeyMap, and send both the KeyMap and hash
|
||||
// back. We can then later send that hash when asking for keys, compare
|
||||
// it with the current one for the KeyMap, and know if we need to send
|
||||
// and update back to the node who published the request to here.
|
||||
|
||||
fmt.Printf(" <---- methodREQKeysRequestUpdate: received hash from NODE=%v, HASH=%v\n", message.FromNode, message.Data)
|
||||
|
||||
|
@ -185,8 +176,23 @@ func (m methodREQKeysDeliverUpdate) handler(proc process, message Message, node
|
|||
|
||||
proc.nodeAuth.publicKeys.mu.Lock()
|
||||
|
||||
err := json.Unmarshal(message.Data, proc.nodeAuth.publicKeys.keysAndHash)
|
||||
fmt.Printf("\n <---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n", proc.nodeAuth.publicKeys.keysAndHash)
|
||||
var keysAndHash keysAndHash
|
||||
|
||||
err := json.Unmarshal(message.Data, &keysAndHash)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: REQKeysDeliverUpdate : json unmarshal failed: %v, message: %v", err, message)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
}
|
||||
|
||||
fmt.Printf("\n <---- REQKeysDeliverUpdate: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n", keysAndHash)
|
||||
|
||||
// If the received map was empty we also want to delete all the locally stored keys,
|
||||
// else we copy the marshaled keysAndHash we received from central into our map.
|
||||
if len(keysAndHash.Keys) < 1 {
|
||||
proc.nodeAuth.publicKeys.keysAndHash = newKeysAndHash()
|
||||
} else {
|
||||
proc.nodeAuth.publicKeys.keysAndHash = &keysAndHash
|
||||
}
|
||||
|
||||
proc.nodeAuth.publicKeys.mu.Unlock()
|
||||
|
||||
|
@ -285,60 +291,7 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string)
|
|||
|
||||
// All new elements are now added, and we can create a new hash
|
||||
// representing the current keys in the allowed map.
|
||||
func() {
|
||||
proc.centralAuth.pki.nodesAcked.mu.Lock()
|
||||
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
|
||||
|
||||
type NodesAndKeys struct {
|
||||
Node Node
|
||||
Key []byte
|
||||
}
|
||||
|
||||
// Create a slice of all the map keys, and its value.
|
||||
sortedNodesAndKeys := []NodesAndKeys{}
|
||||
|
||||
// Range the map, and add each k/v to the sorted slice, to be sorted later.
|
||||
for k, v := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys {
|
||||
nk := NodesAndKeys{
|
||||
Node: k,
|
||||
Key: v,
|
||||
}
|
||||
|
||||
sortedNodesAndKeys = append(sortedNodesAndKeys, nk)
|
||||
}
|
||||
|
||||
// sort the slice based on the node name.
|
||||
// Sort all the commands.
|
||||
sort.SliceStable(sortedNodesAndKeys, func(i, j int) bool {
|
||||
return sortedNodesAndKeys[i].Node < sortedNodesAndKeys[j].Node
|
||||
})
|
||||
|
||||
// Then create a hash based on the sorted slice.
|
||||
|
||||
b, err := cbor.Marshal(sortedNodesAndKeys)
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQKeysAllow, failed to marshal slice, and will not update hash for public keys: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
log.Printf(" * DEBUG: %v\n", er)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Store the key in the key value map.
|
||||
hash := sha256.Sum256(b)
|
||||
proc.centralAuth.pki.nodesAcked.keysAndHash.Hash = hash
|
||||
|
||||
// Store the key to the db for persistence.
|
||||
proc.centralAuth.pki.dbUpdateHash(hash[:])
|
||||
if err != nil {
|
||||
er := fmt.Errorf("error: methodREQKeysAllow, failed to store the hash into the db: %v", err)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
log.Printf(" * DEBUG: %v\n", er)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
}()
|
||||
proc.centralAuth.pki.updateHash(proc, message)
|
||||
|
||||
// TODO: FAILS: The push keys updates when change fails with that the
|
||||
// subscriber gets stuck. Need to look more into this later.
|
||||
|
@ -440,3 +393,80 @@ func (m methodREQKeysAllow) handler(proc process, message Message, node string)
|
|||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
||||
type methodREQKeysDelete struct {
|
||||
event Event
|
||||
}
|
||||
|
||||
func (m methodREQKeysDelete) getKind() Event {
|
||||
return m.event
|
||||
}
|
||||
|
||||
func (m methodREQKeysDelete) handler(proc process, message Message, node string) ([]byte, error) {
|
||||
inf := fmt.Errorf("<--- methodREQKeysDelete received from: %v, containing: %v", message.FromNode, message.MethodArgs)
|
||||
proc.errorKernel.logConsoleOnlyIfDebug(inf, proc.configuration)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
defer proc.processes.wg.Done()
|
||||
|
||||
// Get a context with the timeout specified in message.MethodTimeout.
|
||||
ctx, cancel := getContextForMethodTimeout(proc.ctx, message)
|
||||
outCh := make(chan []byte)
|
||||
errCh := make(chan error)
|
||||
|
||||
proc.processes.wg.Add(1)
|
||||
go func() {
|
||||
defer proc.processes.wg.Done()
|
||||
|
||||
switch {
|
||||
case len(message.MethodArgs) < 1:
|
||||
errCh <- fmt.Errorf("error: methodREQAclGroupNodesDeleteNode: got <1 number methodArgs, want >0")
|
||||
return
|
||||
}
|
||||
|
||||
// HERE:
|
||||
// We want to be able to define more nodes keys to delete.
|
||||
// Loop over the methodArgs, and call the keyDelete function for each node,
|
||||
// or rewrite the deleteKey to deleteKeys and it takes a []node as input
|
||||
// so all keys can be deleted in 1 go, and we create 1 new hash, instead
|
||||
// of doing it for each node delete.
|
||||
|
||||
proc.centralAuth.pki.deletePublicKeys(proc, message, message.MethodArgs)
|
||||
log.Printf(" * DEBUG Deleted public keys: %v\n", message.MethodArgs)
|
||||
|
||||
// All new elements are now added, and we can create a new hash
|
||||
// representing the current keys in the allowed map.
|
||||
proc.centralAuth.pki.updateHash(proc, message)
|
||||
log.Printf(" * DEBUG updated hash for public keys\n")
|
||||
|
||||
outString := fmt.Sprintf("deleted public keys for the nodes=%v\n", message.MethodArgs)
|
||||
out := []byte(outString)
|
||||
|
||||
select {
|
||||
case outCh <- out:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
proc.errorKernel.errSend(proc, message, err)
|
||||
|
||||
case <-ctx.Done():
|
||||
cancel()
|
||||
er := fmt.Errorf("error: methodREQAclGroupNodesDeleteNode: method timed out: %v", message.MethodArgs)
|
||||
proc.errorKernel.errSend(proc, message, er)
|
||||
|
||||
case out := <-outCh:
|
||||
// Prepare and queue for sending a new message with the output
|
||||
// of the action executed.
|
||||
newReplyMessage(proc, message, out)
|
||||
}
|
||||
|
||||
}()
|
||||
|
||||
ackMsg := []byte("confirmed from: " + node + ": " + fmt.Sprint(message.ID))
|
||||
return ackMsg, nil
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue