From e516fb868e897ea430eba12b96e76ed6a61cad2f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Edouard=20Br=C3=A9t=C3=A9ch=C3=A9?= Date: Tue, 13 Sep 2022 10:30:14 +0200 Subject: [PATCH] fix: lock in policy report mapper (#4601) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Charles-Edouard Brétéché Signed-off-by: Charles-Edouard Brétéché --- pkg/policyreport/mapper.go | 4 ++++ pkg/policyreport/reportrequest.go | 17 +++++++++++------ 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/pkg/policyreport/mapper.go b/pkg/policyreport/mapper.go index 4ac9ff4bbb..ef182a2c36 100644 --- a/pkg/policyreport/mapper.go +++ b/pkg/policyreport/mapper.go @@ -22,3 +22,7 @@ func (m concurrentMap) decrease(keyHash string) { m.Set(ns, 0) } } + +func newConcurrentMap() concurrentMap { + return concurrentMap{cmap.New()} +} diff --git a/pkg/policyreport/reportrequest.go b/pkg/policyreport/reportrequest.go index 6d79049f01..bf65a18f95 100644 --- a/pkg/policyreport/reportrequest.go +++ b/pkg/policyreport/reportrequest.go @@ -13,7 +13,6 @@ import ( kyvernov1alpha2informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha2" kyvernov1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1" kyvernov1alpha2listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1alpha2" - cmap "github.com/orcaman/concurrent-map" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" @@ -33,6 +32,7 @@ type Generator struct { // changeRequestMapper stores the change requests' count per namespace changeRequestMapper concurrentMap + mutex *sync.RWMutex // cpolLister can list/get policy from the shared informer's store cpolLister kyvernov1listers.ClusterPolicyLister @@ -68,7 +68,8 @@ func NewReportChangeRequestGenerator(client versioned.Interface, gen := Generator{ clusterReportChangeRequestLister: clusterReportReqInformer.Lister(), reportChangeRequestLister: reportReqInformer.Lister(), - changeRequestMapper: newChangeRequestMapper(), + changeRequestMapper: newConcurrentMap(), + mutex: &sync.RWMutex{}, cpolLister: cpolInformer.Lister(), polLister: polInformer.Lister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName), @@ -141,6 +142,8 @@ func (gen *Generator) enqueue(info Info) { // Add queues a policy violation create request func (gen *Generator) Add(infos ...Info) { + gen.mutex.Lock() + defer gen.mutex.Unlock() for _, info := range infos { count, ok := gen.changeRequestMapper.ConcurrentMap.Get(info.Namespace) if ok && count == -1 { @@ -155,17 +158,23 @@ func (gen *Generator) Add(infos ...Info) { // MapperReset resets the change request mapper for the given namespace func (gen Generator) MapperReset(ns string) { + gen.mutex.Lock() + defer gen.mutex.Unlock() gen.changeRequestMapper.ConcurrentMap.Set(ns, 0) } // MapperInactive sets the change request mapper for the given namespace to -1 // which indicates the report is inactive func (gen Generator) MapperInactive(ns string) { + gen.mutex.Lock() + defer gen.mutex.Unlock() gen.changeRequestMapper.ConcurrentMap.Set(ns, -1) } // MapperInvalidate reset map entries func (gen Generator) MapperInvalidate() { + gen.mutex.Lock() + defer gen.mutex.Unlock() for ns := range gen.changeRequestMapper.ConcurrentMap.Items() { gen.changeRequestMapper.ConcurrentMap.Remove(ns) } @@ -296,7 +305,3 @@ func hasResultsChanged(old, new map[string]interface{}) bool { return !reflect.DeepEqual(oldRes, newRes) } - -func newChangeRequestMapper() concurrentMap { - return concurrentMap{cmap.New()} -}