mirror of https://github.com/postmannen/ctrl.git synced 2025-03-31 01:24:31 +00:00

fixing race condition

This commit is contained in:
postmannen 2022-05-16 07:15:38 +02:00
parent e720c48c44
commit 3bd54d9cfc
7 changed files with 254 additions and 79 deletions


@@ -13,9 +13,10 @@ import (
// centralAuth holds the logic related to handling public keys and auth maps.
type centralAuth struct {
// schema map[Node]map[argsString]signatureBase32
// acl and authorization level related data and methods.
authorization *authorization
pki *pki
// public key distribution related data and methods.
pki *pki
}
// newCentralAuth will return a new and prepared *centralAuth
@@ -28,8 +29,26 @@ func newCentralAuth(configuration *Configuration, errorKernel *errorKernel) *cen
return &c
}
// nodesAcked is the structure that holds all the keys that we have
// acknowledged, and that are allowed to be distributed within the
// system. It also contains a hash of all those keys.
type nodesAcked struct {
mu sync.Mutex
keysAndHash *keysAndHash
}
// newNodesAcked will return a prepared *nodesAcked structure.
func newNodesAcked() *nodesAcked {
n := nodesAcked{
keysAndHash: newKeysAndHash(),
}
return &n
}
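The nodesAcked value is shared between handlers, so every read and write of keysAndHash should happen while holding its mutex. A minimal sketch of that access pattern, assuming the types above (the helper names are illustrative, not part of this commit):

// addKey stores an acknowledged public key for a node, guarded by the mutex.
func (n *nodesAcked) addKey(node Node, key []byte) {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.keysAndHash.Keys[node] = key
}

// currentHash returns a copy of the current hash, guarded by the mutex.
func (n *nodesAcked) currentHash() [32]byte {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.keysAndHash.Hash
}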
// pki holds the data and methods relevant to key handling and distribution.
type pki struct {
nodeKeysAndHash *nodeKeysAndHash
nodesAcked *nodesAcked
nodeNotAckedPublicKeys *nodeNotAckedPublicKeys
configuration *Configuration
db *bolt.DB
@@ -41,7 +60,7 @@ type pki struct {
func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
p := pki{
// schema: make(map[Node]map[argsString]signatureBase32),
nodeKeysAndHash: newNodeKeysAndHash(configuration),
nodesAcked: newNodesAcked(),
nodeNotAckedPublicKeys: newNodeNotAckedPublicKeys(configuration),
configuration: configuration,
bucketNamePublicKeys: "publicKeys",
@@ -67,12 +86,24 @@ func newPKI(configuration *Configuration, errorKernel *errorKernel) *pki {
// Only assign from storage to the in-memory map if the storage contained any values.
if keys != nil {
p.nodeKeysAndHash.KeyMap = keys
p.nodesAcked.keysAndHash.Keys = keys
for k, v := range keys {
log.Printf("info: public keys db contains: %v, %v\n", k, []byte(v))
}
}
// Get the current hash from db if one exists.
hash, err := p.dbViewHash()
if err != nil {
log.Printf("debug: dbViewHash failed: %v\n", err)
}
if hash != nil {
var h [32]byte
copy(h[:], hash)
p.nodesAcked.keysAndHash.Hash = h
}
return &p
}
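When a node later asks for keys it sends along the hash it has stored (see pubREQPublicKeysGet further down), and central can compare that with the [32]byte hash loaded above. A small illustrative helper for that comparison, assumed here and not part of the commit, using the raw bytes as they arrive in a message's Data field:

// sameHash reports whether the received raw hash bytes match the stored hash.
func sameHash(stored [32]byte, received []byte) bool {
	return bytes.Equal(stored[:], received)
}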
@@ -90,12 +121,12 @@ func (p *pki) addPublicKey(proc process, msg Message) {
// key for a host.
// Check if a key for the current node already exists in the map.
p.nodeKeysAndHash.mu.Lock()
existingKey, ok := p.nodeKeysAndHash.KeyMap[msg.FromNode]
p.nodeKeysAndHash.mu.Unlock()
p.nodesAcked.mu.Lock()
existingKey, ok := p.nodesAcked.keysAndHash.Keys[msg.FromNode]
p.nodesAcked.mu.Unlock()
if ok && bytes.Equal(existingKey, msg.Data) {
fmt.Printf(" * \nkey value for REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode)
fmt.Printf(" \n * public key value for REGISTERED node %v is the same, doing nothing\n\n", msg.FromNode)
return
}
@@ -165,6 +196,54 @@ func (p *pki) dbUpdatePublicKey(node string, value []byte) error {
return err
}
// dbUpdateHash will update the hash of the acknowledged public keys in the db.
func (p *pki) dbUpdateHash(hash []byte) error {
err := p.db.Update(func(tx *bolt.Tx) error {
//Create a bucket
bu, err := tx.CreateBucketIfNotExists([]byte("hash"))
if err != nil {
return fmt.Errorf("error: CreateBuckerIfNotExists failed: %v", err)
}
//Put a value into the bucket.
if err := bu.Put([]byte("hash"), []byte(hash)); err != nil {
return err
}
// If all went ok we return nil so the transaction is committed. Any error
// returned will cause a rollback.
return nil
})
return err
}
// dbViewHash will look up and return the hash value stored in the hash bucket of the db, if it exists.
func (p *pki) dbViewHash() ([]byte, error) {
var value []byte
// View is a helper function to get values out of the database.
err := p.db.View(func(tx *bolt.Tx) error {
//Open a bucket to get key's and values from.
bu := tx.Bucket([]byte("hash"))
if bu == nil {
log.Printf("info: no db hash bucket exist\n")
return nil
}
v := bu.Get([]byte("hash"))
if len(v) == 0 {
log.Printf("info: view: hash key not found\n")
return nil
}
value = v
return nil
})
return value, err
}
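Together, dbUpdateHash and dbViewHash give the hash the same bolt-backed persistence as the public keys. A condensed sketch of the round trip (illustrative only; it assumes an initialized *pki with an open db, and that b holds the marshalled, sorted key list as produced in methodREQPublicKeysAllow):

// Persist a newly calculated hash, then read it back the way newPKI does at startup.
hash := sha256.Sum256(b)
if err := p.dbUpdateHash(hash[:]); err != nil {
	log.Printf("error: failed to store hash in db: %v\n", err)
}

stored, err := p.dbViewHash()
if err != nil {
	log.Printf("debug: dbViewHash failed: %v\n", err)
}
if stored != nil {
	var h [32]byte
	copy(h[:], stored)
	p.nodesAcked.keysAndHash.Hash = h
}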
// // deleteKeyFromBucket will delete the specified key from the specified
// // bucket if it exists.
// func (c *centralAuth) dbDeletePublicKey(key string) error {
@@ -209,27 +288,6 @@ func (p *pki) dbDumpPublicKey() (map[Node][]byte, error) {
return m, nil
}
// nodeKeysAndHash holds all the gathered public keys of nodes in the system.
// The keys will be written to a k/v store for persistence.
type nodeKeysAndHash struct {
mu sync.RWMutex
KeyMap map[Node][]byte
// TODO TOMORROW: implement sorting of KeyMap,
// Hash it and store the result into hash,
// marshal and send the whole nodePublicKeys to the end node.
// We should update the hash when a node is added with the allow key method.
Hash [32]byte
}
// newNodeKeysAndHash will return a prepared type of nodeKeysAndHash.
func newNodeKeysAndHash(configuration *Configuration) *nodeKeysAndHash {
n := nodeKeysAndHash{
KeyMap: make(map[Node][]byte),
}
return &n
}
// --- HERE
// nodeNotAckedPublicKeys holds all the gathered but not acknowledged public

go.mod

@@ -8,6 +8,7 @@ require (
github.com/gdamore/tcell/v2 v2.4.1-0.20210905002822-f057f0a857a1
github.com/go-playground/validator/v10 v10.10.1
github.com/hpcloud/tail v1.0.0
github.com/jinzhu/copier v0.3.5
github.com/klauspost/compress v1.14.2
github.com/nats-io/nats-server/v2 v2.6.2
github.com/nats-io/nats.go v1.14.0

go.sum

@@ -59,6 +59,8 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jinzhu/copier v0.3.5 h1:GlvfUwHk62RokgqVNvYsku0TATCF7bAHVwEXoBh3iJg=
github.com/jinzhu/copier v0.3.5/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=


@@ -79,18 +79,28 @@ func newAllowedSignatures() *allowedSignatures {
return &a
}
type keysAndHash struct {
Keys map[Node][]byte
Hash [32]byte
}
func newKeysAndHash() *keysAndHash {
kh := keysAndHash{
Keys: make(map[Node][]byte),
}
return &kh
}
type publicKeys struct {
// nodesKey is a map that holds all the public keys for nodes.
NodeKeys map[Node][]byte
Hash [32]byte
mu sync.Mutex
filePath string
keysAndHash *keysAndHash
mu sync.Mutex
filePath string
}
func newPublicKeys(c *Configuration) *publicKeys {
p := publicKeys{
NodeKeys: make(map[Node][]byte),
filePath: filepath.Join(c.DatabaseFolder, "publickeys.txt"),
keysAndHash: newKeysAndHash(),
filePath: filepath.Join(c.DatabaseFolder, "publickeys.txt"),
}
err := p.loadFromFile()
@@ -126,12 +136,12 @@ func (p *publicKeys) loadFromFile() error {
p.mu.Lock()
defer p.mu.Unlock()
err = json.Unmarshal(b, &p.NodeKeys)
err = json.Unmarshal(b, &p.keysAndHash)
if err != nil {
return err
}
fmt.Printf("\n ***** DEBUG: Loaded existing keys from file: %v\n\n", p.NodeKeys)
fmt.Printf("\n ***** DEBUG: Loaded existing keys from file: %v\n\n", p.keysAndHash.Hash)
return nil
}
@@ -147,7 +157,7 @@ func (p *publicKeys) saveToFile() error {
p.mu.Lock()
defer p.mu.Unlock()
b, err := json.Marshal(p.NodeKeys)
b, err := json.Marshal(p.keysAndHash)
if err != nil {
return err
}
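With both the keys and the hash wrapped in one keysAndHash value, saveToFile and loadFromFile only need a single json.Marshal/Unmarshal of that value. A minimal sketch of the round trip (illustrative; the node name and key bytes are made up):

kh := newKeysAndHash()
kh.Keys["node1"] = []byte("public key bytes")

b, err := json.Marshal(kh)
if err != nil {
	log.Printf("error: marshal of keysAndHash failed: %v\n", err)
}

loaded := newKeysAndHash()
if err := json.Unmarshal(b, loaded); err != nil {
	log.Printf("error: unmarshal of keysAndHash failed: %v\n", err)
}
// loaded.Keys and loaded.Hash now mirror what saveToFile writes to publickeys.txt.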


@@ -317,17 +317,21 @@ func (s startup) pubREQPublicKeysGet(p process) {
// so we would know on the subscriber at central if it should send
// and update with new keys back.
proc.nodeAuth.publicKeys.mu.Lock()
fmt.Printf("\n ----> REQPublicKeysGet: sending our current hash: %v\n\n", []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]))
m := Message{
FileName: "publickeysget.log",
Directory: "publickeysget",
ToNode: Node(p.configuration.CentralNodeName),
FromNode: Node(p.node),
// Data: []byte(d),
FileName: "publickeysget.log",
Directory: "publickeysget",
ToNode: Node(p.configuration.CentralNodeName),
FromNode: Node(p.node),
Data: []byte(proc.nodeAuth.publicKeys.keysAndHash.Hash[:]),
Method: REQPublicKeysGet,
ReplyMethod: REQPublicKeysToNode,
ACKTimeout: proc.configuration.DefaultMessageTimeout,
Retries: 1,
}
proc.nodeAuth.publicKeys.mu.Unlock()
sam, err := newSubjectAndMessage(m)
if err != nil {
@@ -449,9 +453,9 @@ func (s startup) subREQHello(p process) {
// update the prometheus metrics
s.server.centralAuth.pki.nodeKeysAndHash.mu.Lock()
mapLen := len(s.server.centralAuth.pki.nodeKeysAndHash.KeyMap)
s.server.centralAuth.pki.nodeKeysAndHash.mu.Unlock()
s.server.centralAuth.pki.nodesAcked.mu.Lock()
mapLen := len(s.server.centralAuth.pki.nodesAcked.keysAndHash.Keys)
s.server.centralAuth.pki.nodesAcked.mu.Unlock()
s.metrics.promHelloNodesTotal.Set(float64(mapLen))
s.metrics.promHelloNodesContactLast.With(prometheus.Labels{"nodeName": string(m.FromNode)}).SetToCurrentTime()


@@ -2064,28 +2064,26 @@ func (m methodREQPublicKeysGet) handler(proc process, message Message, node stri
case <-ctx.Done():
// case out := <-outCh:
case <-outCh:
proc.centralAuth.pki.nodeKeysAndHash.mu.Lock()
proc.centralAuth.pki.nodesAcked.mu.Lock()
// TODO: We should probably create a hash of the current map content,
// store it alongside the KeyMap, and send both the KeyMap and hash
// back. We can then later send that hash when asking for keys, compare
// it with the current one for the KeyMap, and know if we need to send
// an update back to the node that published the request here.
marsh := struct {
M map[Node][]byte
H [32]byte
}{
M: proc.centralAuth.pki.nodeKeysAndHash.KeyMap,
H: proc.centralAuth.pki.nodeKeysAndHash.Hash,
}
fmt.Printf(" <---- methodREQPublicKeysGet: received hash from NODE=%v, HASH=%v\n", message.FromNode, message.Data)
fmt.Printf(" * methodREQPublicKeysGet: marshalling new keys and hash to send: map=%v, hash=%v\n\n", proc.centralAuth.pki.nodesAcked.keysAndHash.Keys, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash)
b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
proc.centralAuth.pki.nodesAcked.mu.Unlock()
b, err := json.Marshal(marsh)
proc.centralAuth.pki.nodeKeysAndHash.mu.Unlock()
if err != nil {
er := fmt.Errorf("error: REQPublicKeysGet, failed to marshal keys map: %v", err)
proc.errorKernel.errSend(proc, message, er)
}
fmt.Printf("\n * SENDING KEYS TO NODE=%v\n", message.FromNode)
fmt.Printf("\n ----> methodREQPublicKeysGet: SENDING KEYS TO NODE=%v\n", message.FromNode)
newReplyMessage(proc, message, b)
}
}()
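The TODO above describes where this is heading: the node sends the hash it has stored in message.Data (see pubREQPublicKeysGet), and central only needs to reply with the full key map when that hash differs from its own. A hedged sketch of that comparison, using the fields introduced in this commit (illustrative; the commit itself still replies unconditionally):

proc.centralAuth.pki.nodesAcked.mu.Lock()
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()

if bytes.Equal(message.Data, proc.centralAuth.pki.nodesAcked.keysAndHash.Hash[:]) {
	// The requesting node already holds the current keys, nothing to send.
	return
}

b, err := json.Marshal(proc.centralAuth.pki.nodesAcked.keysAndHash)
if err != nil {
	er := fmt.Errorf("error: REQPublicKeysGet, failed to marshal keys map: %v", err)
	proc.errorKernel.errSend(proc, message, er)
	return
}
newReplyMessage(proc, message, b)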
@@ -2135,24 +2133,23 @@ func (m methodREQPublicKeysToNode) handler(proc process, message Message, node s
// case proc.toRingbufferCh <- []subjectAndMessage{sam}:
case <-ctx.Done():
case <-outCh:
// keys := make(map[Node]string)
marsh := struct {
M map[Node][]byte
H [32]byte
}{}
proc.nodeAuth.publicKeys.mu.Lock()
err := json.Unmarshal(message.Data, &marsh)
err := json.Unmarshal(message.Data, proc.nodeAuth.publicKeys.keysAndHash)
fmt.Printf("\n <---- REQPublicKeysToNode: after unmarshal, nodeAuth keysAndhash contains: %+v\n\n", proc.nodeAuth.publicKeys.keysAndHash)
proc.nodeAuth.publicKeys.mu.Unlock()
if err != nil {
er := fmt.Errorf("error: REQPublicKeysToNode : json unmarshal failed: %v, message: %v", err, message)
proc.errorKernel.errSend(proc, message, er)
}
proc.nodeAuth.publicKeys.NodeKeys = marsh.M
proc.nodeAuth.publicKeys.Hash = marsh.H
fmt.Printf(" *** RECEIVED KEYS: %+v\n", marsh)
proc.nodeAuth.publicKeys.mu.Unlock()
// TODO TOMORROW: The hash is not sent with the requests to get public keys, and
// the reason is that the hash is not stored on the nodes?
// Idea: We need to also persist the hash on the receiving nodes. We can then load
// that hash upon startup, and send it along when we do a public keys get.
err = proc.nodeAuth.publicKeys.saveToFile()
if err != nil {
@@ -2221,11 +2218,11 @@ func (m methodREQPublicKeysAllow) handler(proc process, message Message, node st
if ok {
func() {
proc.centralAuth.pki.nodeKeysAndHash.mu.Lock()
defer proc.centralAuth.pki.nodeKeysAndHash.mu.Unlock()
proc.centralAuth.pki.nodesAcked.mu.Lock()
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
// Store/update the node and public key on the allowed pubKey map.
proc.centralAuth.pki.nodeKeysAndHash.KeyMap[Node(n)] = key
proc.centralAuth.pki.nodesAcked.keysAndHash.Keys[Node(n)] = key
}()
// Add key to persistent storage.
@@ -2242,8 +2239,8 @@ func (m methodREQPublicKeysAllow) handler(proc process, message Message, node st
// All new elements are now added, and we can create a new hash
// representing the current keys in the allowed map.
func() {
proc.centralAuth.pki.nodeKeysAndHash.mu.Lock()
defer proc.centralAuth.pki.nodeKeysAndHash.mu.Unlock()
proc.centralAuth.pki.nodesAcked.mu.Lock()
defer proc.centralAuth.pki.nodesAcked.mu.Unlock()
type NodesAndKeys struct {
Node Node
@@ -2254,7 +2251,7 @@ func (m methodREQPublicKeysAllow) handler(proc process, message Message, node st
sortedNodesAndKeys := []NodesAndKeys{}
// Range the map, and add each k/v to the sorted slice, to be sorted later.
for k, v := range proc.centralAuth.pki.nodeKeysAndHash.KeyMap {
for k, v := range proc.centralAuth.pki.nodesAcked.keysAndHash.Keys {
nk := NodesAndKeys{
Node: k,
Key: v,
@@ -2275,12 +2272,24 @@ func (m methodREQPublicKeysAllow) handler(proc process, message Message, node st
if err != nil {
er := fmt.Errorf("error: methodREQPublicKeysAllow, failed to marshal slice, and will not update hash for public keys: %v", err)
proc.errorKernel.errSend(proc, message, er)
log.Printf(" * DEBUG: %v\n", err)
log.Printf(" * DEBUG: %v\n", er)
return
}
proc.centralAuth.pki.nodeKeysAndHash.Hash = sha256.Sum256(b)
// Store the new hash in the in-memory keys and hash struct.
hash := sha256.Sum256(b)
proc.centralAuth.pki.nodesAcked.keysAndHash.Hash = hash
// Store the hash in the db for persistence.
err = proc.centralAuth.pki.dbUpdateHash(hash[:])
if err != nil {
er := fmt.Errorf("error: methodREQPublicKeysAllow, failed to store the hash into the db: %v", err)
proc.errorKernel.errSend(proc, message, er)
log.Printf(" * DEBUG: %v\n", er)
return
}
}()
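The hash is only meaningful if both sides serialize the key map in the same order, which is why the handler above collects the node/key pairs into a slice, sorts it, and hashes the marshalled result. A condensed, illustrative helper capturing that sequence (not part of the commit, names simplified):

// hashOfKeys builds a deterministic hash over a map of node public keys.
func hashOfKeys(keys map[Node][]byte) ([32]byte, error) {
	type nodeAndKey struct {
		Node Node
		Key  []byte
	}

	sorted := []nodeAndKey{}
	for k, v := range keys {
		sorted = append(sorted, nodeAndKey{Node: k, Key: v})
	}
	sort.Slice(sorted, func(i, j int) bool { return sorted[i].Node < sorted[j].Node })

	b, err := json.Marshal(sorted)
	if err != nil {
		return [32]byte{}, fmt.Errorf("failed to marshal sorted keys: %v", err)
	}

	return sha256.Sum256(b), nil
}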


@@ -20,6 +20,7 @@ import (
"sync"
"time"
copier "github.com/jinzhu/copier"
bolt "go.etcd.io/bbolt"
)
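The copier import added here is what processBufferMessages below uses to take its own copy of the message before handing it to the perm store writer, so the handler and the writer no longer share the message's Data slice. A condensed sketch of that pattern (illustrative; error handling added for clarity):

msgForPermStore := Message{}
if err := copier.Copy(&msgForPermStore, v.Data.Message); err != nil {
	log.Printf("error: copying message for the perm store failed: %v\n", err)
}
// The payload is not needed in the perm store, and dropping it also removes
// the reference to the data the handler may still be working on.
msgForPermStore.Data = nil

js, err := json.Marshal(msgForPermStore)
if err != nil {
	log.Printf("error: json marshaling of perm store message failed: %v\n", err)
}
r.permStore <- time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n"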
@@ -251,6 +252,19 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
// messages to be processed while waiting for the done signal, or if an
// error with an individual message occurs.
go func(v samDBValue) {
// Create a copy of the message that we can use to write to the
// perm store without causing a race, since the REQ handler for the
// message might not yet be done when the message is written to the
// perm store.
// We also need a copy to be able to remove the data from the message
// when writing it to the store, so we don't mess up the actual data
// that might still be in use in the handler.
msgForPermStore := Message{}
copier.Copy(&msgForPermStore, v.Data.Message)
// Remove the content of the data field.
msgForPermStore.Data = nil
v.Data.Message.done = make(chan struct{})
delivredCh := make(chan struct{})
@@ -290,7 +304,84 @@ func (r *ringBuffer) processBufferMessages(ctx context.Context, outCh chan samDB
// it out of the K/V Store.
r.deleteKeyFromBucket(r.samValueBucket, strconv.Itoa(v.ID))
r.permStore <- fmt.Sprintf("%v : %+v\n", time.Now().Format("Mon Jan _2 15:04:05 2006"), v)
//m := v.Data.Message
//t := time.Now().Format("Mon Jan _2 15:04:05 2006")
//tmpout := os.Stdout
//_ = fmt.Sprintf("%v\n", t)
//_ = fmt.Sprintf("%v\n", m.ID)
//_ = fmt.Sprintf("%v\n", m.ToNode)
//_ = fmt.Sprintf("%v\n", m.ToNodes)
//_ = fmt.Sprintf("%v\n", m.Data)
//_ = fmt.Sprintf("%v\n", m.Method)
//_ = fmt.Sprintf("%v\n", m.MethodArgs)
//_ = fmt.Sprintf("%v\n", m.ArgSignature)
//_ = fmt.Sprintf("%v\n", m.ReplyMethod)
//_ = fmt.Sprintf("%v\n", m.ReplyMethodArgs)
//_ = fmt.Sprintf("%v\n", m.IsReply)
//_ = fmt.Sprintf("%v\n", m.FromNode)
//_ = fmt.Sprintf("%v\n", m.ACKTimeout)
//_ = fmt.Sprintf("%v\n", m.Retries)
//_ = fmt.Sprintf("%v\n", m.ReplyACKTimeout)
//_ = fmt.Sprintf("%v\n", m.ReplyRetries)
//_ = fmt.Sprintf("%v\n", m.MethodTimeout)
//_ = fmt.Sprintf("%v\n", m.ReplyMethodTimeout)
//_ = fmt.Sprintf("%v\n", m.Directory)
//_ = fmt.Sprintf("%v\n", m.FileName)
//_ = fmt.Sprintf("%v\n", m.PreviousMessage)
//_ = fmt.Sprintf("%v\n", m.RelayViaNode)
//_ = fmt.Sprintf("%v\n", m.RelayOriginalViaNode)
//_ = fmt.Sprintf("%v\n", m.RelayFromNode)
//_ = fmt.Sprintf("%v\n", m.RelayToNode)
//_ = fmt.Sprintf("%v\n", m.RelayOriginalMethod)
//_ = fmt.Sprintf("%v\n", m.RelayReplyMethod)
//_ = fmt.Sprintf("%v\n", m.done)
//str := fmt.Sprintln(
// t,
// m.ID,
// m.ToNode,
// m.ToNodes,
// m.Data,
// m.Method,
// m.MethodArgs,
// m.ArgSignature,
// m.ReplyMethod,
// m.ReplyMethodArgs,
// m.IsReply,
// m.FromNode,
// m.ACKTimeout,
// m.Retries,
// m.ReplyACKTimeout,
// m.ReplyRetries,
// m.MethodTimeout,
// m.ReplyMethodTimeout,
// m.Directory,
// m.FileName,
// m.PreviousMessage,
// m.RelayViaNode,
// m.RelayOriginalViaNode,
// m.RelayFromNode,
// m.RelayToNode,
// m.RelayOriginalMethod,
// m.RelayReplyMethod,
// m.done,
//)
//r.permStore <- fmt.Sprintf("%v\n", str)
// NB: Removed this one since it creates a data race with the storing of the hash value in
// the methodREQPublicKeysToNode. Worked around for now by splitting the single sprint below
// from the sprint above, but we should investigate further what the cause might be, since the
// message has no reference to the proc and should in theory not create a race.
//
js, err := json.Marshal(msgForPermStore)
if err != nil {
er := fmt.Errorf("error:fillBuffer: json marshaling: %v", err)
r.errorKernel.errSend(r.processInitial, Message{}, er)
}
r.permStore <- time.Now().Format("Mon Jan _2 15:04:05 2006") + ", " + string(js) + "\n"
}(v)
case <-ctx.Done():