From 2c3931a671d20edf491a34926c1481ba1062a016 Mon Sep 17 00:00:00 2001 From: shravan Date: Sat, 7 Mar 2020 14:56:42 +0530 Subject: [PATCH] 527 optimised lock implementation --- pkg/policystatus/keyToMutex.go | 27 ++++++++++++++++++ pkg/policystatus/main.go | 30 ++++++++++++-------- pkg/policystatus/status_test.go | 50 +++++++++++++++++++++++++++++++++ 3 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 pkg/policystatus/keyToMutex.go create mode 100644 pkg/policystatus/status_test.go diff --git a/pkg/policystatus/keyToMutex.go b/pkg/policystatus/keyToMutex.go new file mode 100644 index 0000000000..a7b533a0e6 --- /dev/null +++ b/pkg/policystatus/keyToMutex.go @@ -0,0 +1,27 @@ +package policystatus + +import "sync" + +type keyToMutex struct { + mu sync.RWMutex + keyMu map[string]*sync.RWMutex +} + +func newKeyToMutex() *keyToMutex { + return &keyToMutex{ + mu: sync.RWMutex{}, + keyMu: make(map[string]*sync.RWMutex), + } +} + +func (k *keyToMutex) Get(key string) *sync.RWMutex { + k.mu.Lock() + defer k.mu.Unlock() + mutex := k.keyMu[key] + if mutex == nil { + mutex = &sync.RWMutex{} + k.keyMu[key] = mutex + } + + return mutex +} diff --git a/pkg/policystatus/main.go b/pkg/policystatus/main.go index ff3caaa7cc..cf99d45f71 100644 --- a/pkg/policystatus/main.go +++ b/pkg/policystatus/main.go @@ -36,15 +36,17 @@ type Sync struct { } type cache struct { - mutex sync.RWMutex - data map[string]v1.PolicyStatus + dataMu sync.RWMutex + data map[string]v1.PolicyStatus + keyToMutex *keyToMutex } func NewSync(c *versioned.Clientset, p policyStore) *Sync { return &Sync{ cache: &cache{ - mutex: sync.RWMutex{}, - data: make(map[string]v1.PolicyStatus), + dataMu: sync.RWMutex{}, + data: make(map[string]v1.PolicyStatus), + keyToMutex: newKeyToMutex(), }, client: c, policyStore: p, @@ -65,9 +67,11 @@ func (s *Sync) updateStatusCache(stopCh <-chan struct{}) { for { select { case statusUpdater := <-s.Listener: - s.cache.mutex.Lock() + s.cache.keyToMutex.Get(statusUpdater.PolicyName()).Lock() + s.cache.dataMu.RLock() status, exist := s.cache.data[statusUpdater.PolicyName()] + s.cache.dataMu.RUnlock() if !exist { policy, _ := s.policyStore.Get(statusUpdater.PolicyName()) if policy != nil { @@ -75,9 +79,13 @@ func (s *Sync) updateStatusCache(stopCh <-chan struct{}) { } } - s.cache.data[statusUpdater.PolicyName()] = statusUpdater.UpdateStatus(status) + updatedStatus := statusUpdater.UpdateStatus(status) - s.cache.mutex.Unlock() + s.cache.dataMu.Lock() + s.cache.data[statusUpdater.PolicyName()] = updatedStatus + s.cache.dataMu.Unlock() + + s.cache.keyToMutex.Get(statusUpdater.PolicyName()).Unlock() case <-stopCh: return } @@ -85,12 +93,12 @@ func (s *Sync) updateStatusCache(stopCh <-chan struct{}) { } func (s *Sync) updatePolicyStatus() { - s.cache.mutex.Lock() + s.cache.dataMu.Lock() var nameToStatus = make(map[string]v1.PolicyStatus, len(s.cache.data)) for k, v := range s.cache.data { nameToStatus[k] = v } - s.cache.mutex.Unlock() + s.cache.dataMu.Unlock() for policyName, status := range nameToStatus { policy, err := s.policyStore.Get(policyName) @@ -100,9 +108,9 @@ func (s *Sync) updatePolicyStatus() { policy.Status = status _, err = s.client.KyvernoV1().ClusterPolicies().UpdateStatus(policy) if err != nil { - s.cache.mutex.Lock() + s.cache.dataMu.Lock() delete(s.cache.data, policyName) - s.cache.mutex.Unlock() + s.cache.dataMu.Unlock() glog.V(4).Info(err) } } diff --git a/pkg/policystatus/status_test.go b/pkg/policystatus/status_test.go new file mode 100644 index 0000000000..310852cf2f --- /dev/null +++ b/pkg/policystatus/status_test.go @@ -0,0 +1,50 @@ +package policystatus + +import ( + "encoding/json" + "testing" + "time" + + v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1" +) + +type dummyStore struct { +} + +func (d dummyStore) Get(policyName string) (*v1.ClusterPolicy, error) { + return &v1.ClusterPolicy{}, nil +} + +type dummyStatusUpdater struct { +} + +func (d dummyStatusUpdater) UpdateStatus(status v1.PolicyStatus) v1.PolicyStatus { + status.RulesAppliedCount++ + return status +} + +func (d dummyStatusUpdater) PolicyName() string { + return "policy1" +} + +func TestKeyToMutex(t *testing.T) { + expectedCache := `{"policy1":{"averageExecutionTime":"","rulesAppliedCount":100}}` + + stopCh := make(chan struct{}) + s := NewSync(nil, dummyStore{}) + for i := 0; i < 100; i++ { + go s.updateStatusCache(stopCh) + } + + for i := 0; i < 100; i++ { + go s.Listener.Send(dummyStatusUpdater{}) + } + + <-time.After(time.Second * 3) + stopCh <- struct{}{} + + cacheRaw, _ := json.Marshal(s.cache.data) + if string(cacheRaw) != expectedCache { + t.Errorf("\nTestcase Failed\nGot:\n%v\nExpected:\n%v\n", string(cacheRaw), expectedCache) + } +}