diff --git a/central_auth.go b/central_auth.go index 88e216c..1ed9b20 100644 --- a/central_auth.go +++ b/central_auth.go @@ -1,31 +1,193 @@ package steward -import "sync" +import ( + "fmt" + "log" + "os" + "path/filepath" + "sync" + + bolt "go.etcd.io/bbolt" +) type signatureBase32 string type argsString string type centralAuth struct { - schema map[Node]map[argsString]signatureBase32 - nodePublicKeys nodePublicKeys - configuration *Configuration + schema map[Node]map[argsString]signatureBase32 + nodePublicKeys *nodePublicKeys + configuration *Configuration + db *bolt.DB + bucketPublicKeys string + errorKernel *errorKernel } -func newCentralAuth(configuration *Configuration) *centralAuth { - a := centralAuth{ - schema: make(map[Node]map[argsString]signatureBase32), - nodePublicKeys: *newNodePublicKeys(), - configuration: configuration, +func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *centralAuth { + c := centralAuth{ + schema: make(map[Node]map[argsString]signatureBase32), + nodePublicKeys: newNodePublicKeys(configuration), + configuration: configuration, + bucketPublicKeys: "publicKeys", + errorKernel: errorKernel, } - return &a + databaseFilepath := filepath.Join(configuration.DatabaseFolder, "auth.db") + + // Open the database file for persistent storage of public keys. + + db, err := bolt.Open(databaseFilepath, 0600, nil) + if err != nil { + log.Printf("error: failed to open db: %v\n", err) + os.Exit(1) + } + + c.db = db + + keys, err := c.dbDumpPublicKey() + if err != nil { + fmt.Printf(" * DEBUG: dbPublicKeyDump failed: %v\n", err) + } + + c.nodePublicKeys.keyMap = keys + fmt.Printf(" * keyDump: %v\n ", keys) + + return &c } +// addPublicKey to the db if the node do not exist, or if it is a new value. +// We should return an error if the key have changed. +func (c *centralAuth) addPublicKey(proc process, msg Message) { + c.nodePublicKeys.mu.Lock() + + // Check if a key for the current node already exists in the map. + existingKey, ok := c.nodePublicKeys.keyMap[msg.FromNode] + + if ok && existingKey == string(msg.Data) { + fmt.Printf(" * key value for node %v is the same, doing nothing\n", msg.FromNode) + c.nodePublicKeys.mu.Unlock() + return + } + + // New key + c.nodePublicKeys.keyMap[msg.FromNode] = string(msg.Data) + c.nodePublicKeys.mu.Unlock() + + // Add key to persistent storage. + c.dbUpdatePublicKey(string(msg.FromNode), msg.Data) + + if ok { + er := fmt.Errorf("updated public key for node: %v", msg.FromNode) + fmt.Printf(" * %v\n", er) + c.errorKernel.infoSend(proc, msg, er) + } + if !ok { + er := fmt.Errorf("added new node with public key: %v", msg.FromNode) + fmt.Printf(" * %v\n", er) + c.errorKernel.infoSend(proc, msg, er) + } + + //c.dbDump(c.bucketPublicKeys) +} + +// dbView will look up and return a specific value if it exists for a key in a bucket in a DB. +func (c *centralAuth) dbGetPublicKey(node string) ([]byte, error) { + var value []byte + // View is a help function to get values out of the database. + err := c.db.View(func(tx *bolt.Tx) error { + //Open a bucket to get key's and values from. + bu := tx.Bucket([]byte(c.bucketPublicKeys)) + if bu == nil { + log.Printf("info: no db bucket exist: %v\n", c.bucketPublicKeys) + return nil + } + + v := bu.Get([]byte(node)) + if len(v) == 0 { + log.Printf("info: view: key not found\n") + return nil + } + + value = v + + return nil + }) + + return value, err +} + +//dbUpdatePublicKey will update the public key for a node in the db. +func (c *centralAuth) dbUpdatePublicKey(node string, value []byte) error { + err := c.db.Update(func(tx *bolt.Tx) error { + //Create a bucket + bu, err := tx.CreateBucketIfNotExists([]byte(c.bucketPublicKeys)) + if err != nil { + return fmt.Errorf("error: CreateBuckerIfNotExists failed: %v", err) + } + + //Put a value into the bucket. + if err := bu.Put([]byte(node), []byte(value)); err != nil { + return err + } + + //If all was ok, we should return a nil for a commit to happen. Any error + // returned will do a rollback. + return nil + }) + return err +} + +// deleteKeyFromBucket will delete the specified key from the specified +// bucket if it exists. +func (c *centralAuth) dbDeletePublicKey(key string) error { + err := c.db.Update(func(tx *bolt.Tx) error { + bu := tx.Bucket([]byte(c.bucketPublicKeys)) + + err := bu.Delete([]byte(key)) + if err != nil { + log.Printf("error: delete key in bucket %v failed: %v\n", c.bucketPublicKeys, err) + } + + return nil + }) + + return err +} + +// dumpBucket will dump out all they keys and values in the +// specified bucket, and return a sorted []samDBValue +func (c *centralAuth) dbDumpPublicKey() (map[Node]string, error) { + m := make(map[Node]string) + + err := c.db.View(func(tx *bolt.Tx) error { + bu := tx.Bucket([]byte(c.bucketPublicKeys)) + if bu == nil { + return fmt.Errorf("error: dumpBucket: tx.bucket returned nil") + } + + // For each element found in the DB, print it. + bu.ForEach(func(k, v []byte) error { + m[Node(k)] = string(v) + return nil + }) + + return nil + }) + + if err != nil { + return nil, err + } + + return m, nil +} + +// nodePublicKeys holds all the gathered public keys of nodes in the system. +// The keys will be written to a k/v store for persistence. type nodePublicKeys struct { mu sync.Mutex keyMap map[Node]string } -func newNodePublicKeys() *nodePublicKeys { +// newNodePublicKeys will return a prepared type of nodePublicKeys. +func newNodePublicKeys(configuration *Configuration) *nodePublicKeys { n := nodePublicKeys{ keyMap: make(map[Node]string), } diff --git a/processes.go b/processes.go index e09b848..3c57bb5 100644 --- a/processes.go +++ b/processes.go @@ -219,14 +219,16 @@ func (p *processes) Stop() { // Startup holds all the startup methods for subscribers. type startup struct { - server *server - metrics *metrics + server *server + centralAuth *centralAuth + metrics *metrics } func newStartup(server *server) *startup { s := startup{ - server: server, - metrics: server.metrics, + server: server, + centralAuth: server.centralAuth, + metrics: server.metrics, } return &s @@ -372,11 +374,7 @@ func (s startup) subREQHello(p process) { return nil } - // Add an entry for the node in the map - s.server.centralAuth.nodePublicKeys.mu.Lock() - s.server.centralAuth.nodePublicKeys.keyMap[m.FromNode] = string(m.Data) - fmt.Printf(" * MAP CONTENT:\n %v\n", s.server.centralAuth.nodePublicKeys.keyMap) - s.server.centralAuth.nodePublicKeys.mu.Unlock() + s.centralAuth.addPublicKey(proc, m) // update the prometheus metrics s.metrics.promHelloNodesTotal.Set(float64(len(s.server.centralAuth.nodePublicKeys.keyMap))) diff --git a/server.go b/server.go index c6628da..1e75c9a 100644 --- a/server.go +++ b/server.go @@ -162,7 +162,7 @@ func NewServer(configuration *Configuration, version string) (*server, error) { errorKernel: errorKernel, signatures: signatures, helloRegister: newHelloRegister(), - centralAuth: newCentralAuth(configuration), + centralAuth: newCentralAuth(configuration, errorKernel), } s.processes = newProcesses(ctx, &s)