diff --git a/central_auth.go b/central_auth.go index e794d57..a8f15f1 100644 --- a/central_auth.go +++ b/central_auth.go @@ -13,9 +13,10 @@ import ( // centralAuth holds the logic related to handling public keys and auth maps. type centralAuth struct { - // schema map[Node]map[argsString]signatureBase32 + // acl and authorization level related data and methods. authorization *authorization - pki *pki + // public key distribution related data and methods. + pki *pki } // newCentralAuth will return a new and prepared *centralAuth @@ -28,8 +29,26 @@ func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *cen return &c } +// nodesAcked is the structure that holds all the keys that we have +// acknowledged, and that are allowed to be distributed within the +// system. It also contains a hash of all those keys. +type nodesAcked struct { + mu sync.Mutex + keysAndHash *keysAndHash +} + +// newNodesAcked will return a prepared *nodesAcked structure. +func newNodesAcked() *nodesAcked { + n := nodesAcked{ + keysAndHash: newKeysAndHash(), + } + + return &n +} + +// pki holds the data and method relevant to key handling and distribution. type pki struct { - nodeKeysAndHash *nodeKeysAndHash + nodesAcked *nodesAcked nodeNotAckedPublicKeys *nodeNotAckedPublicKeys configuration *Configuration db *bolt.DB @@ -41,7 +60,7 @@ type pki struct { func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki { p := pki{ // schema: make(map[Node]map[argsString]signatureBase32), - nodeKeysAndHash: newNodeKeysAndHash(configuration), + nodesAcked: newNodesAcked(), nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(configuration), configuration: configuration, bucketNamePublicKeys: "publicKeys", @@ -67,12 +86,24 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki { // Only assign from storage to in memory map if the storage contained any values. if keys != nil { - p.nodeKeysAndHash.KeyMap = keys + p.nodesAcked.keysAndHash.Keys = keys for k, v := range keys { log.Printf("info: public keys db contains: %v, %v\n", k, []byte(v)) } } + // Get the current hash from db if one exists. + hash, err := p.dbViewHash() + if err != nil { + log.Printf("debug: dbViewHash failed: %v\n", err) + } + + if hash != nil { + var h [32]byte + copy(h[:], hash) + p.nodesAcked.keysAndHash.Hash = h + } + return &p } @@ -90,12 +121,12 @@ func (p *pki) addPublicKey(proc process, msg Message) { // key for a host. // Check if a key for the current node already exists in the map. - p.nodeKeysAndHash.mu.Lock() - existingKey, ok := p.nodeKeysAndHash.KeyMap[msg.FromNode] - p.nodeKeysAndHash.mu.Unlock() + p.nodesAcked.mu.Lock() + existingKey, ok := p.nodesAcked.keysAndHash.Keys[msg.FromNode] + p.nodesAcked.mu.Unlock() if ok && bytes.Equal(existingKey, msg.Data) { - fmt.Printf(" * \nkey value for REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode) + fmt.Printf(" \n * public key value for REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode) return } @@ -165,6 +196,54 @@ func (p *pki) dbUpdatePublicKey(node string, value []byte) error { return err } +//dbUpdateHash will update the public key for a node in the db. +func (p *pki) dbUpdateHash(hash []byte) error { + err := p.db.Update(func(tx *bolt.Tx) error { + //Create a bucket + bu, err := tx.CreateBucketIfNotExists([]byte("hash")) + if err != nil { + return fmt.Errorf("error: CreateBuckerIfNotExists failed: %v", err) + } + + //Put a value into the bucket. + if err := bu.Put([]byte("hash"), []byte(hash)); err != nil { + return err + } + + //If all was ok, we should return a nil for a commit to happen. Any error + // returned will do a rollback. + return nil + }) + return err +} + +// dbViewHash will look up and return a specific value if it exists for a key in a bucket in a DB. +func (p *pki) dbViewHash() ([]byte, error) { + var value []byte + // View is a help function to get values out of the database. + err := p.db.View(func(tx *bolt.Tx) error { + //Open a bucket to get key's and values from. + bu := tx.Bucket([]byte("hash")) + if bu == nil { + log.Printf("info: no db hash bucket exist\n") + return nil + } + + v := bu.Get([]byte("hash")) + if len(v) == 0 { + log.Printf("info: view: hash key not found\n") + return nil + } + + value = v + + return nil + }) + + return value, err + +} + // // deleteKeyFromBucket will delete the specified key from the specified // // bucket if it exists. // func (c *centralAuth) dbDeletePublicKey(key string) error { @@ -209,27 +288,6 @@ func (p *pki) dbDumpPublicKey() (map[Node][]byte, error) { return m, nil } -// nodeKeysAndHash holds all the gathered public keys of nodes in the system. -// The keys will be written to a k/v store for persistence. -type nodeKeysAndHash struct { - mu sync.RWMutex - KeyMap map[Node][]byte - // TODO TOMOROW: implement sorting of KeyMap, - // Hash it and store the result into hash, - // marshal and send the whole nodePublicKeys to the end node. - // We should update the hash when a node is added with the allow key method. - Hash [32]byte -} - -// newNnodeKeysAndHash will return a prepared type of nodeKeysAndHash. -func newNodeKeysAndHash(configuration *Configuration) *nodeKeysAndHash { - n := nodeKeysAndHash{ - KeyMap: make(map[Node][]byte), - } - - return &n -} - // --- HERE // nodeNotAckedPublicKeys holds all the gathered but not acknowledged public diff --git a/go.mod b/go.mod index 3a07cd6..a9b5750 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/gdamore/tcell/v2 v2.4.1-0.20210905002822-f057f0a857a1 github.com/go-playground/validator/v10 v10.10.1 github.com/hpcloud/tail v1.0.0 + github.com/jinzhu/copier v0.3.5 github.com/klauspost/compress v1.14.2 github.com/nats-io/nats-server/v2 v2.6.2 github.com/nats-io/nats.go v1.14.0 diff --git a/go.sum b/go.sum index 646e8ac..dc4d63d 100644 --- a/go.sum +++ b/go.sum @@ -59,6 +59,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg= +github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= diff --git a/node_auth.go b/node_auth.go index 8f31fe6..1541953 100644 --- a/node_auth.go +++ b/node_auth.go @@ -79,18 +79,28 @@ func newAllowedSignatures() *allowedSignatures { return &a } +type keysAndHash struct { + Keys map[Node][]byte + Hash [32]byte +} + +func newKeysAndHash() *keysAndHash { + kh := keysAndHash{ + Keys: make(map[Node][]byte), + } + return &kh +} + type publicKeys struct { - // nodesKey is a map who holds all the public keys for nodes. - NodeKeys map[Node][]byte - Hash [32]byte - mu sync.Mutex - filePath string + keysAndHash *keysAndHash + mu sync.Mutex + filePath string } func newPublicKeys(c *Configuration) *publicKeys { p := publicKeys{ - NodeKeys: make(map[Node][]byte), - filePath: filepath.Join(c.DatabaseFolder, "publickeys.txt"), + keysAndHash: newKeysAndHash(), + filePath: filepath.Join(c.DatabaseFolder, "publickeys.txt"), } err := p.loadFromFile() @@ -126,12 +136,12 @@ func (p *publicKeys) loadFromFile() error { p.mu.Lock() defer p.mu.Unlock() - err = json.Unmarshal(b, &p.NodeKeys) + err = json.Unmarshal(b, &p.keysAndHash) if err != nil { return err } - fmt.Printf("\n ***** DEBUG: Loaded existing keys from file: %v\n\n", p.NodeKeys) + fmt.Printf("\n ***** DEBUG: Loaded existing keys from file: %v\n\n", p.keysAndHash.Hash) return nil } @@ -147,7 +157,7 @@ func (p *publicKeys) saveToFile() error { p.mu.Lock() defer p.mu.Unlock() - b, err := json.Marshal(p.NodeKeys) + b, err := json.Marshal(p.keysAndHash) if err != nil { return err } diff --git a/processes.go b/processes.go index d5b84c1..82c3acb 100644 --- a/processes.go +++ b/processes.go @@ -317,17 +317,21 @@ func (s startup) pubREQPublicKeysGet(p process) { // so we would know on the subscriber at central if it should send // and update with new keys back. + proc.nodeAuth.publicKeys.mu.Lock() + fmt.Printf("\n ----> REQPublicKeysGet: sending our current hash: %v\n\n", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:])) + m := Message{ - FileName: "publickeysget.log", - Directory: "publickeysget", - ToNode: Node(p.configuration.CentralNodeName), - FromNode: Node(p.node), - // Data: []byte(d), + FileName: "publickeysget.log", + Directory: "publickeysget", + ToNode: Node(p.configuration.CentralNodeName), + FromNode: Node(p.node), + Data: []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]), Method: REQPublicKeysGet, ReplyMethod: REQPublicKeysToNode, ACKTimeout: proc.configuration.DefaultMessageTimeout, Retries: 1, } + proc.nodeAuth.publicKeys.mu.Unlock() sam, err := newSubjectAndMessage(m) if err != nil { @@ -449,9 +453,9 @@ func (s startup) subREQHello(p process) { // update the prometheus metrics - s.server.centralAuth.pki.nodeKeysAndHash.mu.Lock() - mapLen := len(s.server.centralAuth.pki.nodeKeysAndHash.KeyMap) - s.server.centralAuth.pki.nodeKeysAndHash.mu.Unlock() + s.server.centralAuth.pki.nodesAcked.mu.Lock() + mapLen := len(s.server.centralAuth.pki.nodesAcked.keysAndHash.Keys) + s.server.centralAuth.pki.nodesAcked.mu.Unlock() s.metrics.promHelloNodesTotal.Set(float64(mapLen)) s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime() diff --git a/requests.go b/requests.go index 51e9738..511426f 100644 --- a/requests.go +++ b/requests.go @@ -2064,28 +2064,26 @@ func (m methodREQPublicKeysGet) handler(proc process, message Message, node stri case <-ctx.Done(): // case out := <-outCh: case <-outCh: - proc.centralAuth.pki.nodeKeysAndHash.mu.Lock() + proc.centralAuth.pki.nodesAcked.mu.Lock() // TODO: We should probably create a hash of the current map content, // store it alongside the KeyMap, and send both the KeyMap and hash // back. We can then later send that hash when asking for keys, compare // it with the current one for the KeyMap, and know if we need to send // and update back to the node who published the request to here. - marsh := struct { - M map[Node][]byte - H [32]byte - }{ - M: proc.centralAuth.pki.nodeKeysAndHash.KeyMap, - H: proc.centralAuth.pki.nodeKeysAndHash.Hash, - } + fmt.Printf(" <---- methodREQPublicKeysGet: received hash from NODE=%v, HASH=%v\n", message.FromNode, message.Data) + + fmt.Printf(" * methodREQPublicKeysGet: marshalling new keys and hash to send: map=%v, hash=%v\n\n", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash) + + b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash) + + proc.centralAuth.pki.nodesAcked.mu.Unlock() - b, err := json.Marshal(marsh) - proc.centralAuth.pki.nodeKeysAndHash.mu.Unlock() if err != nil { er := fmt.Errorf("error: REQPublicKeysGet, failed to marshal keys map: %v", err) proc.errorKernel.errSend(proc, message, er) } - fmt.Printf("\n * SENDING KEYS TO NODE=%v\n", message.FromNode) + fmt.Printf("\n ----> methodREQPublicKeysGet: SENDING KEYS TO NODE=%v\n", message.FromNode) newReplyMessage(proc, message, b) } }() @@ -2135,24 +2133,23 @@ func (m methodREQPublicKeysToNode) handler(proc process, message Message, node s // case proc.toRingbufferCh <- []subjectAndMessage{sam}: case <-ctx.Done(): case <-outCh: - // keys := make(map[Node]string) - marsh := struct { - M map[Node][]byte - H [32]byte - }{} proc.nodeAuth.publicKeys.mu.Lock() - err := json.Unmarshal(message.Data, &marsh) + + err := json.Unmarshal(message.Data, proc.nodeAuth.publicKeys.keysAndHash) + fmt.Printf("\n <---- REQPublicKeysToNode: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n", proc.nodeAuth.publicKeys.keysAndHash) + + proc.nodeAuth.publicKeys.mu.Unlock() + if err != nil { er := fmt.Errorf("error: REQPublicKeysToNode : json unmarshal failed: %v, message: %v", err, message) proc.errorKernel.errSend(proc, message, er) } - proc.nodeAuth.publicKeys.NodeKeys = marsh.M - proc.nodeAuth.publicKeys.Hash = marsh.H - - fmt.Printf(" *** RECEIVED KEYS: %+v\n", marsh) - proc.nodeAuth.publicKeys.mu.Unlock() + // TODO TOMORROW: The hash is not sent with the requests to get public keys, and + // the reason is that the hash is not stored on the nodes ? + // Idea: We need to also persist the hash on the receiving nodes. We can then load + // that key upon startup, and send it along when we do a public keys get. err = proc.nodeAuth.publicKeys.saveToFile() if err != nil { @@ -2221,11 +2218,11 @@ func (m methodREQPublicKeysAllow) handler(proc process, message Message, node st if ok { func() { - proc.centralAuth.pki.nodeKeysAndHash.mu.Lock() - defer proc.centralAuth.pki.nodeKeysAndHash.mu.Unlock() + proc.centralAuth.pki.nodesAcked.mu.Lock() + defer proc.centralAuth.pki.nodesAcked.mu.Unlock() // Store/update the node and public key on the allowed pubKey map. - proc.centralAuth.pki.nodeKeysAndHash.KeyMap[Node(n)] = key + proc.centralAuth.pki.nodesAcked.keysAndHash.Keys[Node(n)] = key }() // Add key to persistent storage. @@ -2242,8 +2239,8 @@ func (m methodREQPublicKeysAllow) handler(proc process, message Message, node st // All new elements are now added, and we can create a new hash // representing the current keys in the allowed map. func() { - proc.centralAuth.pki.nodeKeysAndHash.mu.Lock() - defer proc.centralAuth.pki.nodeKeysAndHash.mu.Unlock() + proc.centralAuth.pki.nodesAcked.mu.Lock() + defer proc.centralAuth.pki.nodesAcked.mu.Unlock() type NodesAndKeys struct { Node Node @@ -2254,7 +2251,7 @@ func (m methodREQPublicKeysAllow) handler(proc process, message Message, node st sortedNodesAndKeys := []NodesAndKeys{} // Range the map, and add each k/v to the sorted slice, to be sorted later. - for k, v := range proc.centralAuth.pki.nodeKeysAndHash.KeyMap { + for k, v := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys { nk := NodesAndKeys{ Node: k, Key: v, @@ -2275,12 +2272,24 @@ func (m methodREQPublicKeysAllow) handler(proc process, message Message, node st if err != nil { er := fmt.Errorf("error: methodREQPublicKeysAllow, failed to marshal slice, and will not update hash for public keys: %v", err) proc.errorKernel.errSend(proc, message, er) - log.Printf(" * DEBUG: %v\n", err) + log.Printf(" * DEBUG: %v\n", er) return } - proc.centralAuth.pki.nodeKeysAndHash.Hash = sha256.Sum256(b) + // Store the key in the key value map. + hash := sha256.Sum256(b) + proc.centralAuth.pki.nodesAcked.keysAndHash.Hash = hash + + // Store the key to the db for persistence. + proc.centralAuth.pki.dbUpdateHash(hash[:]) + if err != nil { + er := fmt.Errorf("error: methodREQPublicKeysAllow, failed to store the hash into the db: %v", err) + proc.errorKernel.errSend(proc, message, er) + log.Printf(" * DEBUG: %v\n", er) + + return + } }() diff --git a/ringbuffer.go b/ringbuffer.go index 3bd2f8b..d6a3c4d 100644 --- a/ringbuffer.go +++ b/ringbuffer.go @@ -20,6 +20,7 @@ import ( "sync" "time" + copier "github.com/jinzhu/copier" bolt "go.etcd.io/bbolt" ) @@ -251,6 +252,19 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB // messages to be processed while waiting for the done signal, or if an // error with an individual message occurs. go func(v samDBValue) { + // Create a copy of the message that we can use to write to the + // perm store without causing a race since the REQ handler for the + // message might not yet be done when message is written to the + // perm store. + // We also need a copy to be able to remove the data from the message + // when writing it to the store, so we don't mess up to actual data + // that might be in use in the handler. + + msgForPermStore := Message{} + copier.Copy(&msgForPermStore, v.Data.Message) + // Remove the content of the data field. + msgForPermStore.Data = nil + v.Data.Message.done = make(chan struct{}) delivredCh := make(chan struct{}) @@ -290,7 +304,84 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB // it out of the K/V Store. r.deleteKeyFromBucket(r.samValueBucket, strconv.Itoa(v.ID)) - r.permStore <- fmt.Sprintf("%v : %+v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v) + //m := v.Data.Message + //t := time.Now().Format("Mon Jan _2 15:04:05 2006") + + //tmpout := os.Stdout + + //_ = fmt.Sprintf("%v\n", t) + //_ = fmt.Sprintf("%v\n", m.ID) + //_ = fmt.Sprintf("%v\n", m.ToNode) + //_ = fmt.Sprintf("%v\n", m.ToNodes) + //_ = fmt.Sprintf("%v\n", m.Data) + //_ = fmt.Sprintf("%v\n", m.Method) + //_ = fmt.Sprintf("%v\n", m.MethodArgs) + //_ = fmt.Sprintf("%v\n", m.ArgSignature) + //_ = fmt.Sprintf("%v\n", m.ReplyMethod) + //_ = fmt.Sprintf("%v\n", m.ReplyMethodArgs) + //_ = fmt.Sprintf("%v\n", m.IsReply) + //_ = fmt.Sprintf("%v\n", m.FromNode) + //_ = fmt.Sprintf("%v\n", m.ACKTimeout) + //_ = fmt.Sprintf("%v\n", m.Retries) + //_ = fmt.Sprintf("%v\n", m.ReplyACKTimeout) + //_ = fmt.Sprintf("%v\n", m.ReplyRetries) + //_ = fmt.Sprintf("%v\n", m.MethodTimeout) + //_ = fmt.Sprintf("%v\n", m.ReplyMethodTimeout) + //_ = fmt.Sprintf("%v\n", m.Directory) + //_ = fmt.Sprintf("%v\n", m.FileName) + //_ = fmt.Sprintf("%v\n", m.PreviousMessage) + //_ = fmt.Sprintf("%v\n", m.RelayViaNode) + //_ = fmt.Sprintf("%v\n", m.RelayOriginalViaNode) + //_ = fmt.Sprintf("%v\n", m.RelayFromNode) + //_ = fmt.Sprintf("%v\n", m.RelayToNode) + //_ = fmt.Sprintf("%v\n", m.RelayOriginalMethod) + //_ = fmt.Sprintf("%v\n", m.RelayReplyMethod) + //_ = fmt.Sprintf("%v\n", m.done) + + //str := fmt.Sprintln( + // t, + // m.ID, + // m.ToNode, + // m.ToNodes, + // m.Data, + // m.Method, + // m.MethodArgs, + // m.ArgSignature, + // m.ReplyMethod, + // m.ReplyMethodArgs, + // m.IsReply, + // m.FromNode, + // m.ACKTimeout, + // m.Retries, + // m.ReplyACKTimeout, + // m.ReplyRetries, + // m.MethodTimeout, + // m.ReplyMethodTimeout, + // m.Directory, + // m.FileName, + // m.PreviousMessage, + // m.RelayViaNode, + // m.RelayOriginalViaNode, + // m.RelayFromNode, + // m.RelayToNode, + // m.RelayOriginalMethod, + // m.RelayReplyMethod, + // m.done, + //) + + //r.permStore <- fmt.Sprintf("%v\n", str) + + // NB: Removed this one since it creates a data race with the storing of the hash value in + // the methodREQPublicKeysToNode. Sorted by splitting up the sprint below with the sprint + // above for now, but should investigate further what might be the case here, since the + // message have no reference to the proc and should in theory not create a race. + // + js, err := json.Marshal(msgForPermStore) + if err != nil { + er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err) + r.errorKernel.errSend(r.processInitial, Message{}, er) + } + r.permStore <- time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n" }(v) case <-ctx.Done():