mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-29 10:55:05 +00:00
527 optimised lock implementation
This commit is contained in:
parent
9656975b5a
commit
2c3931a671
3 changed files with 96 additions and 11 deletions
27
pkg/policystatus/keyToMutex.go
Normal file
27
pkg/policystatus/keyToMutex.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package policystatus
|
||||
|
||||
import "sync"
|
||||
|
||||
type keyToMutex struct {
|
||||
mu sync.RWMutex
|
||||
keyMu map[string]*sync.RWMutex
|
||||
}
|
||||
|
||||
func newKeyToMutex() *keyToMutex {
|
||||
return &keyToMutex{
|
||||
mu: sync.RWMutex{},
|
||||
keyMu: make(map[string]*sync.RWMutex),
|
||||
}
|
||||
}
|
||||
|
||||
func (k *keyToMutex) Get(key string) *sync.RWMutex {
|
||||
k.mu.Lock()
|
||||
defer k.mu.Unlock()
|
||||
mutex := k.keyMu[key]
|
||||
if mutex == nil {
|
||||
mutex = &sync.RWMutex{}
|
||||
k.keyMu[key] = mutex
|
||||
}
|
||||
|
||||
return mutex
|
||||
}
|
|
@ -36,15 +36,17 @@ type Sync struct {
|
|||
}
|
||||
|
||||
type cache struct {
|
||||
mutex sync.RWMutex
|
||||
data map[string]v1.PolicyStatus
|
||||
dataMu sync.RWMutex
|
||||
data map[string]v1.PolicyStatus
|
||||
keyToMutex *keyToMutex
|
||||
}
|
||||
|
||||
func NewSync(c *versioned.Clientset, p policyStore) *Sync {
|
||||
return &Sync{
|
||||
cache: &cache{
|
||||
mutex: sync.RWMutex{},
|
||||
data: make(map[string]v1.PolicyStatus),
|
||||
dataMu: sync.RWMutex{},
|
||||
data: make(map[string]v1.PolicyStatus),
|
||||
keyToMutex: newKeyToMutex(),
|
||||
},
|
||||
client: c,
|
||||
policyStore: p,
|
||||
|
@ -65,9 +67,11 @@ func (s *Sync) updateStatusCache(stopCh <-chan struct{}) {
|
|||
for {
|
||||
select {
|
||||
case statusUpdater := <-s.Listener:
|
||||
s.cache.mutex.Lock()
|
||||
s.cache.keyToMutex.Get(statusUpdater.PolicyName()).Lock()
|
||||
|
||||
s.cache.dataMu.RLock()
|
||||
status, exist := s.cache.data[statusUpdater.PolicyName()]
|
||||
s.cache.dataMu.RUnlock()
|
||||
if !exist {
|
||||
policy, _ := s.policyStore.Get(statusUpdater.PolicyName())
|
||||
if policy != nil {
|
||||
|
@ -75,9 +79,13 @@ func (s *Sync) updateStatusCache(stopCh <-chan struct{}) {
|
|||
}
|
||||
}
|
||||
|
||||
s.cache.data[statusUpdater.PolicyName()] = statusUpdater.UpdateStatus(status)
|
||||
updatedStatus := statusUpdater.UpdateStatus(status)
|
||||
|
||||
s.cache.mutex.Unlock()
|
||||
s.cache.dataMu.Lock()
|
||||
s.cache.data[statusUpdater.PolicyName()] = updatedStatus
|
||||
s.cache.dataMu.Unlock()
|
||||
|
||||
s.cache.keyToMutex.Get(statusUpdater.PolicyName()).Unlock()
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
|
@ -85,12 +93,12 @@ func (s *Sync) updateStatusCache(stopCh <-chan struct{}) {
|
|||
}
|
||||
|
||||
func (s *Sync) updatePolicyStatus() {
|
||||
s.cache.mutex.Lock()
|
||||
s.cache.dataMu.Lock()
|
||||
var nameToStatus = make(map[string]v1.PolicyStatus, len(s.cache.data))
|
||||
for k, v := range s.cache.data {
|
||||
nameToStatus[k] = v
|
||||
}
|
||||
s.cache.mutex.Unlock()
|
||||
s.cache.dataMu.Unlock()
|
||||
|
||||
for policyName, status := range nameToStatus {
|
||||
policy, err := s.policyStore.Get(policyName)
|
||||
|
@ -100,9 +108,9 @@ func (s *Sync) updatePolicyStatus() {
|
|||
policy.Status = status
|
||||
_, err = s.client.KyvernoV1().ClusterPolicies().UpdateStatus(policy)
|
||||
if err != nil {
|
||||
s.cache.mutex.Lock()
|
||||
s.cache.dataMu.Lock()
|
||||
delete(s.cache.data, policyName)
|
||||
s.cache.mutex.Unlock()
|
||||
s.cache.dataMu.Unlock()
|
||||
glog.V(4).Info(err)
|
||||
}
|
||||
}
|
||||
|
|
50
pkg/policystatus/status_test.go
Normal file
50
pkg/policystatus/status_test.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
package policystatus
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
|
||||
)
|
||||
|
||||
type dummyStore struct {
|
||||
}
|
||||
|
||||
func (d dummyStore) Get(policyName string) (*v1.ClusterPolicy, error) {
|
||||
return &v1.ClusterPolicy{}, nil
|
||||
}
|
||||
|
||||
type dummyStatusUpdater struct {
|
||||
}
|
||||
|
||||
func (d dummyStatusUpdater) UpdateStatus(status v1.PolicyStatus) v1.PolicyStatus {
|
||||
status.RulesAppliedCount++
|
||||
return status
|
||||
}
|
||||
|
||||
func (d dummyStatusUpdater) PolicyName() string {
|
||||
return "policy1"
|
||||
}
|
||||
|
||||
func TestKeyToMutex(t *testing.T) {
|
||||
expectedCache := `{"policy1":{"averageExecutionTime":"","rulesAppliedCount":100}}`
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
s := NewSync(nil, dummyStore{})
|
||||
for i := 0; i < 100; i++ {
|
||||
go s.updateStatusCache(stopCh)
|
||||
}
|
||||
|
||||
for i := 0; i < 100; i++ {
|
||||
go s.Listener.Send(dummyStatusUpdater{})
|
||||
}
|
||||
|
||||
<-time.After(time.Second * 3)
|
||||
stopCh <- struct{}{}
|
||||
|
||||
cacheRaw, _ := json.Marshal(s.cache.data)
|
||||
if string(cacheRaw) != expectedCache {
|
||||
t.Errorf("\nTestcase Failed\nGot:\n%v\nExpected:\n%v\n", string(cacheRaw), expectedCache)
|
||||
}
|
||||
}
|
Loading…
Add table
Reference in a new issue