mirror of
https://github.com/postmannen/ctrl.git
synced 2024-12-14 12:37:31 +00:00
added persistent storage for received public keys
This commit is contained in:
parent
594a9f495c
commit
e0f0c4db18
3 changed files with 181 additions and 21 deletions
184
central_auth.go
184
central_auth.go
|
@ -1,31 +1,193 @@
|
|||
package steward
|
||||
|
||||
import "sync"
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
bolt "go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
type signatureBase32 string
|
||||
type argsString string
|
||||
type centralAuth struct {
|
||||
schema map[Node]map[argsString]signatureBase32
|
||||
nodePublicKeys nodePublicKeys
|
||||
configuration *Configuration
|
||||
schema map[Node]map[argsString]signatureBase32
|
||||
nodePublicKeys *nodePublicKeys
|
||||
configuration *Configuration
|
||||
db *bolt.DB
|
||||
bucketPublicKeys string
|
||||
errorKernel *errorKernel
|
||||
}
|
||||
|
||||
func newCentralAuth(configuration *Configuration) *centralAuth {
|
||||
a := centralAuth{
|
||||
schema: make(map[Node]map[argsString]signatureBase32),
|
||||
nodePublicKeys: *newNodePublicKeys(),
|
||||
configuration: configuration,
|
||||
func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *centralAuth {
|
||||
c := centralAuth{
|
||||
schema: make(map[Node]map[argsString]signatureBase32),
|
||||
nodePublicKeys: newNodePublicKeys(configuration),
|
||||
configuration: configuration,
|
||||
bucketPublicKeys: "publicKeys",
|
||||
errorKernel: errorKernel,
|
||||
}
|
||||
|
||||
return &a
|
||||
databaseFilepath := filepath.Join(configuration.DatabaseFolder, "auth.db")
|
||||
|
||||
// Open the database file for persistent storage of public keys.
|
||||
|
||||
db, err := bolt.Open(databaseFilepath, 0600, nil)
|
||||
if err != nil {
|
||||
log.Printf("error: failed to open db: %v\n", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
c.db = db
|
||||
|
||||
keys, err := c.dbDumpPublicKey()
|
||||
if err != nil {
|
||||
fmt.Printf(" * DEBUG: dbPublicKeyDump failed: %v\n", err)
|
||||
}
|
||||
|
||||
c.nodePublicKeys.keyMap = keys
|
||||
fmt.Printf(" * keyDump: %v\n ", keys)
|
||||
|
||||
return &c
|
||||
}
|
||||
|
||||
// addPublicKey to the db if the node do not exist, or if it is a new value.
|
||||
// We should return an error if the key have changed.
|
||||
func (c *centralAuth) addPublicKey(proc process, msg Message) {
|
||||
c.nodePublicKeys.mu.Lock()
|
||||
|
||||
// Check if a key for the current node already exists in the map.
|
||||
existingKey, ok := c.nodePublicKeys.keyMap[msg.FromNode]
|
||||
|
||||
if ok && existingKey == string(msg.Data) {
|
||||
fmt.Printf(" * key value for node %v is the same, doing nothing\n", msg.FromNode)
|
||||
c.nodePublicKeys.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
// New key
|
||||
c.nodePublicKeys.keyMap[msg.FromNode] = string(msg.Data)
|
||||
c.nodePublicKeys.mu.Unlock()
|
||||
|
||||
// Add key to persistent storage.
|
||||
c.dbUpdatePublicKey(string(msg.FromNode), msg.Data)
|
||||
|
||||
if ok {
|
||||
er := fmt.Errorf("updated public key for node: %v", msg.FromNode)
|
||||
fmt.Printf(" * %v\n", er)
|
||||
c.errorKernel.infoSend(proc, msg, er)
|
||||
}
|
||||
if !ok {
|
||||
er := fmt.Errorf("added new node with public key: %v", msg.FromNode)
|
||||
fmt.Printf(" * %v\n", er)
|
||||
c.errorKernel.infoSend(proc, msg, er)
|
||||
}
|
||||
|
||||
//c.dbDump(c.bucketPublicKeys)
|
||||
}
|
||||
|
||||
// dbView will look up and return a specific value if it exists for a key in a bucket in a DB.
|
||||
func (c *centralAuth) dbGetPublicKey(node string) ([]byte, error) {
|
||||
var value []byte
|
||||
// View is a help function to get values out of the database.
|
||||
err := c.db.View(func(tx *bolt.Tx) error {
|
||||
//Open a bucket to get key's and values from.
|
||||
bu := tx.Bucket([]byte(c.bucketPublicKeys))
|
||||
if bu == nil {
|
||||
log.Printf("info: no db bucket exist: %v\n", c.bucketPublicKeys)
|
||||
return nil
|
||||
}
|
||||
|
||||
v := bu.Get([]byte(node))
|
||||
if len(v) == 0 {
|
||||
log.Printf("info: view: key not found\n")
|
||||
return nil
|
||||
}
|
||||
|
||||
value = v
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return value, err
|
||||
}
|
||||
|
||||
//dbUpdatePublicKey will update the public key for a node in the db.
|
||||
func (c *centralAuth) dbUpdatePublicKey(node string, value []byte) error {
|
||||
err := c.db.Update(func(tx *bolt.Tx) error {
|
||||
//Create a bucket
|
||||
bu, err := tx.CreateBucketIfNotExists([]byte(c.bucketPublicKeys))
|
||||
if err != nil {
|
||||
return fmt.Errorf("error: CreateBuckerIfNotExists failed: %v", err)
|
||||
}
|
||||
|
||||
//Put a value into the bucket.
|
||||
if err := bu.Put([]byte(node), []byte(value)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//If all was ok, we should return a nil for a commit to happen. Any error
|
||||
// returned will do a rollback.
|
||||
return nil
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
// deleteKeyFromBucket will delete the specified key from the specified
|
||||
// bucket if it exists.
|
||||
func (c *centralAuth) dbDeletePublicKey(key string) error {
|
||||
err := c.db.Update(func(tx *bolt.Tx) error {
|
||||
bu := tx.Bucket([]byte(c.bucketPublicKeys))
|
||||
|
||||
err := bu.Delete([]byte(key))
|
||||
if err != nil {
|
||||
log.Printf("error: delete key in bucket %v failed: %v\n", c.bucketPublicKeys, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// dumpBucket will dump out all they keys and values in the
|
||||
// specified bucket, and return a sorted []samDBValue
|
||||
func (c *centralAuth) dbDumpPublicKey() (map[Node]string, error) {
|
||||
m := make(map[Node]string)
|
||||
|
||||
err := c.db.View(func(tx *bolt.Tx) error {
|
||||
bu := tx.Bucket([]byte(c.bucketPublicKeys))
|
||||
if bu == nil {
|
||||
return fmt.Errorf("error: dumpBucket: tx.bucket returned nil")
|
||||
}
|
||||
|
||||
// For each element found in the DB, print it.
|
||||
bu.ForEach(func(k, v []byte) error {
|
||||
m[Node(k)] = string(v)
|
||||
return nil
|
||||
})
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// nodePublicKeys holds all the gathered public keys of nodes in the system.
|
||||
// The keys will be written to a k/v store for persistence.
|
||||
type nodePublicKeys struct {
|
||||
mu sync.Mutex
|
||||
keyMap map[Node]string
|
||||
}
|
||||
|
||||
func newNodePublicKeys() *nodePublicKeys {
|
||||
// newNodePublicKeys will return a prepared type of nodePublicKeys.
|
||||
func newNodePublicKeys(configuration *Configuration) *nodePublicKeys {
|
||||
n := nodePublicKeys{
|
||||
keyMap: make(map[Node]string),
|
||||
}
|
||||
|
|
16
processes.go
16
processes.go
|
@ -219,14 +219,16 @@ func (p *processes) Stop() {
|
|||
|
||||
// Startup holds all the startup methods for subscribers.
|
||||
type startup struct {
|
||||
server *server
|
||||
metrics *metrics
|
||||
server *server
|
||||
centralAuth *centralAuth
|
||||
metrics *metrics
|
||||
}
|
||||
|
||||
func newStartup(server *server) *startup {
|
||||
s := startup{
|
||||
server: server,
|
||||
metrics: server.metrics,
|
||||
server: server,
|
||||
centralAuth: server.centralAuth,
|
||||
metrics: server.metrics,
|
||||
}
|
||||
|
||||
return &s
|
||||
|
@ -372,11 +374,7 @@ func (s startup) subREQHello(p process) {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Add an entry for the node in the map
|
||||
s.server.centralAuth.nodePublicKeys.mu.Lock()
|
||||
s.server.centralAuth.nodePublicKeys.keyMap[m.FromNode] = string(m.Data)
|
||||
fmt.Printf(" * MAP CONTENT:\n %v\n", s.server.centralAuth.nodePublicKeys.keyMap)
|
||||
s.server.centralAuth.nodePublicKeys.mu.Unlock()
|
||||
s.centralAuth.addPublicKey(proc, m)
|
||||
|
||||
// update the prometheus metrics
|
||||
s.metrics.promHelloNodesTotal.Set(float64(len(s.server.centralAuth.nodePublicKeys.keyMap)))
|
||||
|
|
|
@ -162,7 +162,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) {
|
|||
errorKernel: errorKernel,
|
||||
signatures: signatures,
|
||||
helloRegister: newHelloRegister(),
|
||||
centralAuth: newCentralAuth(configuration),
|
||||
centralAuth: newCentralAuth(configuration, errorKernel),
|
||||
}
|
||||
|
||||
s.processes = newProcesses(ctx, &s)
|
||||
|
|
Loading…
Reference in a new issue