From 35aa3149c85dc609289efa774db9f3451eef4823 Mon Sep 17 00:00:00 2001 From: shuting Date: Mon, 4 Jan 2021 23:17:17 -0800 Subject: [PATCH] Remove lock embedded in CRD controller, use concurrent map to store shcemas (#1441) --- cmd/initContainer/main.go | 1 + go.mod | 1 + go.sum | 1 + pkg/openapi/crdSync.go | 11 ++--- pkg/openapi/validation.go | 81 ++++++++++++++++--------------- pkg/policyreport/reportrequest.go | 2 +- 6 files changed, 51 insertions(+), 46 deletions(-) diff --git a/cmd/initContainer/main.go b/cmd/initContainer/main.go index 515df65d47..6da6cc9f62 100644 --- a/cmd/initContainer/main.go +++ b/cmd/initContainer/main.go @@ -285,6 +285,7 @@ func removePolicyReport(client *client.Client, kind string) error { reportNames := []string{ fmt.Sprintf("policyreport-ns-%s", ns.GetName()), fmt.Sprintf("pr-ns-%s", ns.GetName()), + fmt.Sprintf("polr-ns-%s", ns.GetName()), } var wg sync.WaitGroup diff --git a/go.mod b/go.mod index 5e8e4bb71f..69f68c708e 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/minio/minio v0.0.0-20200114012931-30922148fbb5 github.com/onsi/ginkgo v1.11.0 github.com/onsi/gomega v1.8.1 + github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6 github.com/ory/go-acc v0.2.6 // indirect github.com/pkg/errors v0.9.1 github.com/prometheus/common v0.4.1 diff --git a/go.sum b/go.sum index 307d39c413..aa34b52bd2 100644 --- a/go.sum +++ b/go.sum @@ -545,6 +545,7 @@ github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1Cpa github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/onsi/gomega v1.8.1 h1:C5Dqfs/LeauYDX0jJXIe2SWmwCbGzx9yF8C8xy3Lh34= github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA= +github.com/orcaman/concurrent-map v0.0.0-20190826125027-8c72a8bb44f6/go.mod h1:Lu3tH6HLW3feq74c2GC+jIMS/K2CFcDWnWD9XkenwhI= github.com/ory/go-acc v0.2.6 h1:YfI+L9dxI7QCtWn2RbawqO0vXhiThdXu/RgizJBbaq0= github.com/ory/go-acc v0.2.6/go.mod h1:4Kb/UnPcT8qRAk3IAxta+hvVapdxTLWtrr7bFLlEgpw= github.com/ory/viper v1.7.5 h1:+xVdq7SU3e1vNaCsk/ixsfxE4zylk1TJUiJrY647jUE= diff --git a/pkg/openapi/crdSync.go b/pkg/openapi/crdSync.go index 0b6fdc04c6..31ff868252 100644 --- a/pkg/openapi/crdSync.go +++ b/pkg/openapi/crdSync.go @@ -93,9 +93,6 @@ func (c *crdSync) sync() { return } - c.controller.mutex.Lock() - defer c.controller.mutex.Unlock() - c.controller.deleteCRDFromPreviousSync() for _, crd := range crds.Items { @@ -105,8 +102,8 @@ func (c *crdSync) sync() { func (o *Controller) deleteCRDFromPreviousSync() { for _, crd := range o.crdList { - delete(o.kindToDefinitionName, crd) - delete(o.definitions, crd) + o.kindToDefinitionName.Remove(crd) + o.definitions.Remove(crd) } o.crdList = make([]string, 0) @@ -163,8 +160,8 @@ func (o *Controller) ParseCRD(crd unstructured.Unstructured) { } o.crdList = append(o.crdList, crdName) - o.kindToDefinitionName[crdName] = crdName - o.definitions[crdName] = parsedSchema + o.kindToDefinitionName.Set(crdName, crdName) + o.definitions.Set(crdName, parsedSchema) } func isOpenV3Error(err error) bool { diff --git a/pkg/openapi/validation.go b/pkg/openapi/validation.go index 132bc87f0c..23aac8d27d 100644 --- a/pkg/openapi/validation.go +++ b/pkg/openapi/validation.go @@ -8,39 +8,60 @@ import ( "strings" "sync" - data "github.com/kyverno/kyverno/api" - "github.com/kyverno/kyverno/pkg/engine/utils" - - "github.com/kyverno/kyverno/pkg/engine" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - v1 "github.com/kyverno/kyverno/pkg/api/kyverno/v1" - openapi_v2 "github.com/googleapis/gnostic/OpenAPIv2" "github.com/googleapis/gnostic/compiler" + data "github.com/kyverno/kyverno/api" + v1 "github.com/kyverno/kyverno/pkg/api/kyverno/v1" + "github.com/kyverno/kyverno/pkg/engine" + "github.com/kyverno/kyverno/pkg/engine/utils" + cmap "github.com/orcaman/concurrent-map" + "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/kube-openapi/pkg/util/proto" "k8s.io/kube-openapi/pkg/util/proto/validation" log "sigs.k8s.io/controller-runtime/pkg/log" - - "gopkg.in/yaml.v2" ) +type concurrentMap struct{ cmap.ConcurrentMap } + // Controller represents OpenAPIController type Controller struct { - mutex sync.RWMutex - definitions map[string]*openapi_v2.Schema + // definitions holds the kind - *openapi_v2.Schema map + definitions concurrentMap // kindToDefinitionName holds the kind - definition map // i.e. - Namespace: io.k8s.api.core.v1.Namespace - kindToDefinitionName map[string]string + kindToDefinitionName concurrentMap crdList []string models proto.Models } +func newConcurrentMap() concurrentMap { + return concurrentMap{cmap.New()} +} + +func (m concurrentMap) GetKind(key string) string { + k, ok := m.Get(key) + if !ok { + return "" + } + + return k.(string) +} + +func (m concurrentMap) GetSchema(key string) *openapi_v2.Schema { + k, ok := m.Get(key) + if !ok { + return nil + } + + return k.(*openapi_v2.Schema) +} + // NewOpenAPIController initializes a new instance of OpenAPIController func NewOpenAPIController() (*Controller, error) { controller := &Controller{ - definitions: make(map[string]*openapi_v2.Schema), - kindToDefinitionName: make(map[string]string), + definitions: newConcurrentMap(), + kindToDefinitionName: newConcurrentMap(), } defaultDoc, err := getSchemaDocument() @@ -58,8 +79,6 @@ func NewOpenAPIController() (*Controller, error) { // ValidatePolicyFields ... func (o *Controller) ValidatePolicyFields(policyRaw []byte) error { - o.mutex.RLock() - defer o.mutex.RUnlock() var policy v1.ClusterPolicy err := json.Unmarshal(policyRaw, &policy) @@ -82,11 +101,9 @@ func (o *Controller) ValidatePolicyFields(policyRaw []byte) error { // ValidateResource ... func (o *Controller) ValidateResource(patchedResource unstructured.Unstructured, kind string) error { - o.mutex.RLock() - defer o.mutex.RUnlock() var err error - kind = o.kindToDefinitionName[kind] + kind = o.kindToDefinitionName.GetKind(kind) schema := o.models.LookupModel(kind) if schema == nil { // Check if kind is a CRD @@ -109,18 +126,8 @@ func (o *Controller) ValidateResource(patchedResource unstructured.Unstructured, return nil } -// GetDefinitionNameFromKind ... -func (o *Controller) GetDefinitionNameFromKind(kind string) string { - o.mutex.RLock() - defer o.mutex.RUnlock() - return o.kindToDefinitionName[kind] -} - // ValidatePolicyMutation ... func (o *Controller) ValidatePolicyMutation(policy v1.ClusterPolicy) error { - o.mutex.RLock() - defer o.mutex.RUnlock() - var kindToRules = make(map[string][]v1.Rule) for _, rule := range policy.Spec.Rules { if rule.HasMutate() { @@ -133,7 +140,8 @@ func (o *Controller) ValidatePolicyMutation(policy v1.ClusterPolicy) error { for kind, rules := range kindToRules { newPolicy := *policy.DeepCopy() newPolicy.Spec.Rules = rules - resource, _ := o.generateEmptyResource(o.definitions[o.kindToDefinitionName[kind]]).(map[string]interface{}) + k := o.kindToDefinitionName.GetKind(kind) + resource, _ := o.generateEmptyResource(o.definitions.GetSchema(k)).(map[string]interface{}) if resource == nil || len(resource) == 0 { log.Log.V(2).Info("unable to validate resource. OpenApi definition not found", "kind", kind) return nil @@ -157,13 +165,10 @@ func (o *Controller) ValidatePolicyMutation(policy v1.ClusterPolicy) error { } func (o *Controller) useOpenAPIDocument(doc *openapi_v2.Document) error { - o.mutex.Lock() - defer o.mutex.Unlock() - for _, definition := range doc.GetDefinitions().AdditionalProperties { - o.definitions[definition.GetName()] = definition.GetValue() + o.definitions.Set(definition.GetName(), definition.GetValue()) path := strings.Split(definition.GetName(), ".") - o.kindToDefinitionName[path[len(path)-1]] = definition.GetName() + o.kindToDefinitionName.Set(path[len(path)-1], definition.GetName()) } var err error @@ -192,7 +197,7 @@ func (o *Controller) getCRDSchema(kind string) (proto.Schema, error) { } path := proto.NewPath(kind) - definition := o.definitions[kind] + definition := o.definitions.GetSchema(kind) if definition == nil { return nil, errors.New("could not find definition") } @@ -211,7 +216,7 @@ func (o *Controller) generateEmptyResource(kindSchema *openapi_v2.Schema) interf types := kindSchema.GetType().GetValue() if kindSchema.GetXRef() != "" { - return o.generateEmptyResource(o.definitions[strings.TrimPrefix(kindSchema.GetXRef(), "#/definitions/")]) + return o.generateEmptyResource(o.definitions.GetSchema(strings.TrimPrefix(kindSchema.GetXRef(), "#/definitions/"))) } if len(types) != 1 { diff --git a/pkg/policyreport/reportrequest.go b/pkg/policyreport/reportrequest.go index 6a7a5435c3..5a7864b0f0 100755 --- a/pkg/policyreport/reportrequest.go +++ b/pkg/policyreport/reportrequest.go @@ -259,7 +259,7 @@ func (gen *Generator) processNextWorkItem() bool { } func (gen *Generator) syncHandler(info Info) error { - gen.log.V(3).Info("generating report change request") + gen.log.V(4).Info("reconcile report change request") builder := NewBuilder(gen.cpolLister, gen.polLister) rcrUnstructured, err := builder.build(info)