
Clean up RCRs if the count exceeds the threshold (#4148)

* Clean up RCRs if the count exceeds the limit

Signed-off-by: ShutingZhao <shuting@nirmata.com>

* Sets reports to inactive on resourceExhausted error

Signed-off-by: ShutingZhao <shuting@nirmata.com>

* fix linter

Signed-off-by: ShutingZhao <shuting@nirmata.com>

* Add a container flag changeRequestLimit

Signed-off-by: ShutingZhao <shuting@nirmata.com>

* Skip generating RCRs if resourceExhausted error occurs

Signed-off-by: ShutingZhao <shuting@nirmata.com>

* set default RCR limit to 1000

Signed-off-by: ShutingZhao <shuting@nirmata.com>

* Update log messages and CHANGELOG.md

Signed-off-by: ShutingZhao <shuting@nirmata.com>

* Address review comments

Signed-off-by: ShutingZhao <shuting@nirmata.com>

* Extract mapper to a separate file

Signed-off-by: ShutingZhao <shuting@nirmata.com>
shuting 2022-06-28 14:18:57 +08:00 committed by GitHub
parent cd2d89bf55
commit 77fb10a430
10 changed files with 291 additions and 56 deletions

View file

@ -1,3 +1,10 @@
## v1.7.2-rc1
### Note
- A new flag `maxReportChangeRequests` is added to the Kyverno main container. It sets the upper limit on reportchangerequests a namespace can hold, or on clusterreportchangerequests when the matching kinds are cluster-wide resources. The default limit is 1000; on large clusters, i.e. clusters where a policy report would contain more than 1,000 results, it is recommended to configure a smaller threshold.
## v1.7.0-rc1
### Note
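
To illustrate the behaviour behind the new `maxReportChangeRequests` limit, a minimal Go sketch (hypothetical names, not Kyverno's API): a per-namespace counter of pending change requests that, once it exceeds the limit, emits a cleanup signal instead of processing more requests.

package main

import "fmt"

// reconcileInfo mimics the signal sent to the report controller when a
// namespace exceeds the change-request limit (simplified, hypothetical).
type reconcileInfo struct {
	namespace string
	inactive  bool
}

// throttler keeps a pending-change-request count per namespace and drops
// work once the configured limit is exceeded.
type throttler struct {
	limit   int
	pending map[string]int
	cleanup chan reconcileInfo
}

func (t *throttler) add(ns string) bool {
	t.pending[ns]++
	if t.pending[ns] > t.limit {
		// Too many pending requests: ask the report controller to clean up
		// this namespace's change requests instead of processing more.
		t.cleanup <- reconcileInfo{namespace: ns, inactive: false}
		t.pending[ns] = 0
		return false
	}
	return true
}

func main() {
	t := &throttler{limit: 3, pending: map[string]int{}, cleanup: make(chan reconcileInfo, 1)}
	for i := 0; i < 5; i++ {
		if !t.add("default") {
			info := <-t.cleanup
			fmt.Printf("cleanup requested for namespace %q\n", info.namespace)
		}
	}
}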

View file

@ -69,6 +69,7 @@ var (
allowInsecureRegistry bool
clientRateLimitQPS float64
clientRateLimitBurst int
changeRequestLimit int
webhookRegistrationTimeout time.Duration
setupLog = log.Log.WithName("setup")
)
@ -92,6 +93,8 @@ func main() {
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 0, "Configure the maximum burst for throttle. Uses the client default if zero.")
flag.Func(toggle.AutogenInternalsFlagName, toggle.AutogenInternalsDescription, toggle.AutogenInternalsFlag)
flag.DurationVar(&webhookRegistrationTimeout, "webhookRegistrationTimeout", 120*time.Second, "Timeout for webhook registration, e.g., 30s, 1m, 5m.")
flag.IntVar(&changeRequestLimit, "maxReportChangeRequests", 1000, "maximum pending report change requests per namespace or for the cluster-wide policy report")
if err := flag.Set("v", "2"); err != nil {
setupLog.Error(err, "failed to set log level")
os.Exit(1)
@ -202,6 +205,7 @@ func main() {
kyvernoV1alpha2.ClusterReportChangeRequests(),
kyvernoV1.ClusterPolicies(),
kyvernoV1.Policies(),
changeRequestLimit,
log.Log.WithName("ReportChangeRequestGenerator"),
)
@ -213,6 +217,7 @@ func main() {
kyvernoV1alpha2.ReportChangeRequests(),
kyvernoV1alpha2.ClusterReportChangeRequests(),
kubeInformer.Core().V1().Namespaces(),
reportReqGen.CleanupChangeRequest,
log.Log.WithName("PolicyReportGenerator"),
)
if err != nil {
@ -447,7 +452,7 @@ func main() {
// start them once by the leader
run := func() {
go certManager.Run(stopCh)
go policyCtrl.Run(2, prgen.ReconcileCh, stopCh)
go policyCtrl.Run(2, prgen.ReconcileCh, reportReqGen.CleanupChangeRequest, stopCh)
go prgen.Run(1, stopCh)
go grcc.Run(1, stopCh)
}

View file

@ -407,7 +407,7 @@ func (pc *PolicyController) enqueuePolicy(policy kyvernov1.PolicyInterface) {
}
// Run begins watching and syncing.
func (pc *PolicyController) Run(workers int, reconcileCh <-chan bool, stopCh <-chan struct{}) {
func (pc *PolicyController) Run(workers int, reconcileCh <-chan bool, cleanupChangeRequest <-chan policyreport.ReconcileInfo, stopCh <-chan struct{}) {
logger := pc.log
defer utilruntime.HandleCrash()
@ -436,7 +436,7 @@ func (pc *PolicyController) Run(workers int, reconcileCh <-chan bool, stopCh <-c
go wait.Until(pc.worker, time.Second, stopCh)
}
go pc.forceReconciliation(reconcileCh, stopCh)
go pc.forceReconciliation(reconcileCh, cleanupChangeRequest, stopCh)
<-stopCh
}

View file

@ -40,56 +40,95 @@ func (pc *PolicyController) report(engineResponses []*response.EngineResponse, l
}
// forceReconciliation forces a background scan by adding all policies to the workqueue
func (pc *PolicyController) forceReconciliation(reconcileCh <-chan bool, stopCh <-chan struct{}) {
func (pc *PolicyController) forceReconciliation(reconcileCh <-chan bool, cleanupChangeRequest <-chan policyreport.ReconcileInfo, stopCh <-chan struct{}) {
logger := pc.log.WithName("forceReconciliation")
ticker := time.NewTicker(pc.reconcilePeriod)
changeRequestMapperNamespace := make(map[string]bool)
for {
select {
case <-ticker.C:
logger.Info("performing the background scan", "scan interval", pc.reconcilePeriod.String())
if err := pc.policyReportEraser.CleanupReportChangeRequests(cleanupReportChangeRequests); err != nil {
if err := pc.policyReportEraser.CleanupReportChangeRequests(cleanupReportChangeRequests, nil); err != nil {
logger.Error(err, "failed to cleanup report change requests")
}
if err := pc.policyReportEraser.EraseResultsEntries(eraseResultsEntries); err != nil {
if err := pc.policyReportEraser.EraseResultEntries(eraseResultEntries, nil); err != nil {
logger.Error(err, "continue reconciling policy reports")
}
pc.requeuePolicies()
pc.prGenerator.MapperInvalidate()
case erase := <-reconcileCh:
logger.Info("received the reconcile signal, reconciling policy report")
if err := pc.policyReportEraser.CleanupReportChangeRequests(cleanupReportChangeRequests); err != nil {
if err := pc.policyReportEraser.CleanupReportChangeRequests(cleanupReportChangeRequests, nil); err != nil {
logger.Error(err, "failed to cleanup report change requests")
}
if erase {
if err := pc.policyReportEraser.EraseResultsEntries(eraseResultsEntries); err != nil {
if err := pc.policyReportEraser.EraseResultEntries(eraseResultEntries, nil); err != nil {
logger.Error(err, "continue reconciling policy reports")
}
}
pc.requeuePolicies()
case info := <-cleanupChangeRequest:
if info.Namespace == nil {
continue
}
ns := *info.Namespace
if exist := changeRequestMapperNamespace[ns]; exist {
continue
}
changeRequestMapperNamespace[ns] = true
if err := pc.policyReportEraser.CleanupReportChangeRequests(cleanupReportChangeRequests,
map[string]string{policyreport.ResourceLabelNamespace: ns}); err != nil {
logger.Error(err, "failed to cleanup report change requests for the given namespace", "namespace", ns)
} else {
logger.V(3).Info("cleaned up report change requests for the given namespace", "namespace", ns)
}
changeRequestMapperNamespace[ns] = false
if err := pc.policyReportEraser.EraseResultEntries(eraseResultEntries, info.Namespace); err != nil {
logger.Error(err, "failed to erase result entries for the report", "report", policyreport.GeneratePolicyReportName(ns))
} else {
logger.V(3).Info("wiped out result entries for the report", "report", policyreport.GeneratePolicyReportName(ns))
}
if info.MapperInactive {
pc.prGenerator.MapperInactive(ns)
} else {
pc.prGenerator.MapperReset(ns)
}
pc.requeuePolicies()
case <-stopCh:
return
}
}
}
func cleanupReportChangeRequests(pclient kyvernoclient.Interface, rcrLister kyvernov1alpha2listers.ReportChangeRequestLister, crcrLister kyvernov1alpha2listers.ClusterReportChangeRequestLister) error {
func cleanupReportChangeRequests(pclient kyvernoclient.Interface, rcrLister kyvernov1alpha2listers.ReportChangeRequestLister, crcrLister kyvernov1alpha2listers.ClusterReportChangeRequestLister, labels map[string]string) error {
var errors []string
var gracePeriod int64 = 0
deleteOptions := metav1.DeleteOptions{GracePeriodSeconds: &gracePeriod}
err := pclient.KyvernoV1alpha2().ClusterReportChangeRequests().DeleteCollection(context.TODO(), deleteOptions, metav1.ListOptions{})
selector := &metav1.LabelSelector{
MatchLabels: labels,
}
err := pclient.KyvernoV1alpha2().ClusterReportChangeRequests().DeleteCollection(context.TODO(), deleteOptions, metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(selector)})
if err != nil {
errors = append(errors, err.Error())
}
err = pclient.KyvernoV1alpha2().ReportChangeRequests(config.KyvernoNamespace()).DeleteCollection(context.TODO(), deleteOptions, metav1.ListOptions{})
err = pclient.KyvernoV1alpha2().ReportChangeRequests(config.KyvernoNamespace()).DeleteCollection(context.TODO(), deleteOptions, metav1.ListOptions{LabelSelector: metav1.FormatLabelSelector(selector)})
if err != nil {
errors = append(errors, err.Error())
}
@ -101,13 +140,47 @@ func cleanupReportChangeRequests(pclient kyvernoclient.Interface, rcrLister kyve
return fmt.Errorf("%v", strings.Join(errors, ";"))
}
func eraseResultsEntries(pclient kyvernoclient.Interface, reportLister policyreportv1alpha2listers.PolicyReportLister, clusterReportLister policyreportv1alpha2listers.ClusterPolicyReportLister) error {
func eraseResultEntries(pclient kyvernoclient.Interface, reportLister policyreportv1alpha2listers.PolicyReportLister, clusterReportLister policyreportv1alpha2listers.ClusterPolicyReportLister, ns *string) error {
selector, err := metav1.LabelSelectorAsSelector(policyreport.LabelSelector)
if err != nil {
return fmt.Errorf("failed to erase results entries %v", err)
}
var errors []string
var polrName string
if ns != nil {
polrName = policyreport.GeneratePolicyReportName(*ns)
if polrName != "" {
polr, err := reportLister.PolicyReports(*ns).Get(polrName)
if err != nil {
return fmt.Errorf("failed to erase results entries for PolicyReport %s: %v", polrName, err)
}
polr.Results = []v1alpha2.PolicyReportResult{}
polr.Summary = v1alpha2.PolicyReportSummary{}
if _, err = pclient.Wgpolicyk8sV1alpha2().PolicyReports(polr.GetNamespace()).Update(context.TODO(), polr, metav1.UpdateOptions{}); err != nil {
errors = append(errors, fmt.Sprintf("%s/%s/%s: %v", polr.Kind, polr.Namespace, polr.Name, err))
}
} else {
cpolr, err := clusterReportLister.Get(polrName)
if err != nil {
errors = append(errors, err.Error())
}
cpolr.Results = []v1alpha2.PolicyReportResult{}
cpolr.Summary = v1alpha2.PolicyReportSummary{}
if _, err = pclient.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Update(context.TODO(), cpolr, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to erase results entries for ClusterPolicyReport %s: %v", polrName, err)
}
}
if len(errors) == 0 {
return nil
}
return fmt.Errorf("failed to erase results entries for report %s: %v", polrName, strings.Join(errors, ";"))
}
if polrs, err := reportLister.List(selector); err != nil {
errors = append(errors, err.Error())

View file

@ -27,7 +27,7 @@ const (
appVersion string = "app.kubernetes.io/version"
// the following labels are used to list rcr / crcr
resourceLabelNamespace string = "kyverno.io/resource.namespace"
ResourceLabelNamespace string = "kyverno.io/resource.namespace"
deletedLabelPolicy string = "kyverno.io/delete.policy"
deletedLabelRule string = "kyverno.io/delete.rule"
@ -36,6 +36,9 @@ const (
deletedAnnotationResourceName string = "kyverno.io/delete.resource.name"
deletedAnnotationResourceKind string = "kyverno.io/delete.resource.kind"
inactiveLabelKey string = "kyverno.io/report.status"
inactiveLabelVal string = "inactive"
// SourceValue is the static value for PolicyReportResult.Source
SourceValue = "Kyverno"
)
@ -205,7 +208,7 @@ func set(obj *unstructured.Unstructured, info Info) {
}
obj.SetLabels(map[string]string{
resourceLabelNamespace: info.Namespace,
ResourceLabelNamespace: info.Namespace,
appVersion: version.BuildVersion,
})
}
@ -219,7 +222,7 @@ func setRequestDeletionLabels(req *unstructured.Unstructured, info Info) bool {
})
labels := req.GetLabels()
labels[resourceLabelNamespace] = info.Results[0].Resource.Namespace
labels[ResourceLabelNamespace] = info.Results[0].Resource.Namespace
req.SetLabels(labels)
return true

View file

@ -177,7 +177,7 @@ func (c *changeRequestCreator) mergeRequests() (results []*unstructured.Unstruct
if unstr, ok := c.RCRCache.Get(uid); ok {
if rcr, ok := unstr.(*unstructured.Unstructured); ok {
resourceNS := rcr.GetLabels()[resourceLabelNamespace]
resourceNS := rcr.GetLabels()[ResourceLabelNamespace]
mergedNamespacedRCR, ok := mergedRCR[resourceNS]
if !ok {
mergedNamespacedRCR = &unstructured.Unstructured{}
@ -223,8 +223,8 @@ func (c *changeRequestCreator) mergeRequests() (results []*unstructured.Unstruct
// merge merges elements from a source object into a
// destination object if they share the same namespace label
func merge(dst, src *unstructured.Unstructured) bool {
dstNS := dst.GetLabels()[resourceLabelNamespace]
srcNS := src.GetLabels()[resourceLabelNamespace]
dstNS := dst.GetLabels()[ResourceLabelNamespace]
srcNS := src.GetLabels()[ResourceLabelNamespace]
if dstNS != srcNS {
return false
}

View file

@ -0,0 +1,24 @@
package policyreport
import cmap "github.com/orcaman/concurrent-map"
type concurrentMap struct{ cmap.ConcurrentMap }
func (m concurrentMap) increase(ns string) {
count, ok := m.Get(ns)
if ok && count != -1 {
m.Set(ns, count.(int)+1)
} else {
m.Set(ns, 1)
}
}
func (m concurrentMap) decrease(keyHash string) {
_, ns := parseKeyHash(keyHash)
count, ok := m.Get(ns)
if ok && count.(int) > 0 {
m.Set(ns, count.(int)-1)
} else {
m.Set(ns, 0)
}
}
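
The helper above wraps github.com/orcaman/concurrent-map. For readers unfamiliar with that library, a minimal standalone sketch of the calls it relies on (Get, Set, Items, and Remove store interface{} values keyed by namespace; the -1 sentinel for an inactive report is this commit's convention):

package main

import (
	"fmt"

	cmap "github.com/orcaman/concurrent-map"
)

func main() {
	m := cmap.New() // thread-safe map[string]interface{}

	// Count pending change requests per namespace.
	m.Set("default", 1)
	if v, ok := m.Get("default"); ok && v != -1 {
		m.Set("default", v.(int)+1) // same increment pattern as increase()
	}

	// Mark a namespace's report as inactive, mirroring MapperInactive.
	m.Set("kube-system", -1)

	// Iterate and clear all entries, mirroring MapperInvalidate.
	for ns, count := range m.Items() {
		fmt.Printf("%s: %v\n", ns, count)
		m.Remove(ns)
	}
}

Storing -1 rather than deleting the key lets Add distinguish "report marked inactive, skip new change requests" from "namespace not seen yet".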

View file

@ -16,21 +16,21 @@ import (
)
type PolicyReportEraser interface {
CleanupReportChangeRequests(cleanup CleanupReportChangeRequests) error
EraseResultsEntries(erase EraseResultsEntries) error
CleanupReportChangeRequests(cleanup CleanupReportChangeRequests, labels map[string]string) error
EraseResultEntries(erase EraseResultEntries, ns *string) error
}
type (
CleanupReportChangeRequests = func(pclient kyvernoclient.Interface, rcrLister kyvernov1alpha2listers.ReportChangeRequestLister, crcrLister kyvernov1alpha2listers.ClusterReportChangeRequestLister) error
EraseResultsEntries = func(pclient kyvernoclient.Interface, reportLister policyreportv1alpha2listers.PolicyReportLister, clusterReportLister policyreportv1alpha2listers.ClusterPolicyReportLister) error
CleanupReportChangeRequests = func(pclient kyvernoclient.Interface, rcrLister kyvernov1alpha2listers.ReportChangeRequestLister, crcrLister kyvernov1alpha2listers.ClusterReportChangeRequestLister, labels map[string]string) error
EraseResultEntries = func(pclient kyvernoclient.Interface, reportLister policyreportv1alpha2listers.PolicyReportLister, clusterReportLister policyreportv1alpha2listers.ClusterPolicyReportLister, ns *string) error
)
func (g *ReportGenerator) CleanupReportChangeRequests(cleanup CleanupReportChangeRequests) error {
return cleanup(g.pclient, g.reportChangeRequestLister, g.clusterReportChangeRequestLister)
func (g *ReportGenerator) CleanupReportChangeRequests(cleanup CleanupReportChangeRequests, labels map[string]string) error {
return cleanup(g.pclient, g.reportChangeRequestLister, g.clusterReportChangeRequestLister, labels)
}
func (g *ReportGenerator) EraseResultsEntries(erase EraseResultsEntries) error {
return erase(g.pclient, g.reportLister, g.clusterReportLister)
func (g *ReportGenerator) EraseResultEntries(erase EraseResultEntries, ns *string) error {
return erase(g.pclient, g.reportLister, g.clusterReportLister, ns)
}
type deletedResource struct {
@ -52,7 +52,7 @@ func buildLabelForDeletedResource(labels, annotations map[string]string) *delete
return &deletedResource{
kind: kind,
name: name,
ns: labels[resourceLabelNamespace],
ns: labels[ResourceLabelNamespace],
}
}

View file

@ -40,6 +40,10 @@ const (
LabelSelectorKey = "managed-by"
LabelSelectorValue = "kyverno"
deletedPolicyKey = "deletedpolicy"
resourceExhaustedErr = "ResourceExhausted"
)
var LabelSelector = &metav1.LabelSelector{
@ -72,6 +76,8 @@ type ReportGenerator struct {
// if send true, the reports' results will be erased, this is used to recover from the invalid records
ReconcileCh chan bool
cleanupChangeRequest chan<- ReconcileInfo
log logr.Logger
}
@ -84,6 +90,7 @@ func NewReportGenerator(
reportReqInformer kyvernov1alpha2informers.ReportChangeRequestInformer,
clusterReportReqInformer kyvernov1alpha2informers.ClusterReportChangeRequestInformer,
namespace corev1informers.NamespaceInformer,
cleanupChangeRequest chan<- ReconcileInfo,
log logr.Logger,
) (*ReportGenerator, error) {
gen := &ReportGenerator{
@ -95,6 +102,7 @@ func NewReportGenerator(
clusterReportReqInformer: clusterReportReqInformer,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), prWorkQueueName),
ReconcileCh: make(chan bool, 10),
cleanupChangeRequest: cleanupChangeRequest,
log: log,
}
@ -108,8 +116,6 @@ func NewReportGenerator(
return gen, nil
}
const deletedPolicyKey string = "deletedpolicy"
// the key of queue can be
// - <namespace name> for the resource
// - "" for cluster wide resource
@ -123,7 +129,7 @@ func generateCacheKey(changeRequest interface{}) string {
return strings.Join([]string{deletedPolicyKey, policy, rule}, "/")
}
ns := label[resourceLabelNamespace]
ns := label[ResourceLabelNamespace]
if ns == "" {
ns = "default"
}
@ -318,18 +324,26 @@ func (g *ReportGenerator) handleErr(err error, key interface{}, aggregatedReques
// otherwise it updates policyReport
func (g *ReportGenerator) syncHandler(key string) (aggregatedRequests interface{}, err error) {
g.log.V(4).Info("syncing policy report", "key", key)
namespace := key
if policy, rule, ok := isDeletedPolicyKey(key); ok {
g.log.V(4).Info("sync policy report on policy deletion")
return g.removePolicyEntryFromReport(policy, rule)
}
namespace := key
new, aggregatedRequests, err := g.aggregateReports(namespace)
if err != nil {
return aggregatedRequests, fmt.Errorf("failed to aggregate reportChangeRequest results %v", err)
}
report, err := g.reportLister.PolicyReports(namespace).Get(GeneratePolicyReportName(namespace))
if err == nil {
if val, ok := report.GetLabels()[inactiveLabelKey]; ok && val == inactiveLabelVal {
g.log.Info("got resourceExhausted error, please opt-in via \"splitPolicyReport\" to generate report per policy")
return aggregatedRequests, nil
}
}
var old interface{}
if old, err = g.createReportIfNotPresent(namespace, new, aggregatedRequests); err != nil {
return aggregatedRequests, err
@ -547,7 +561,7 @@ func (g *ReportGenerator) aggregateReports(namespace string) (
ns.SetDeletionTimestamp(&now)
}
selector := labels.SelectorFromSet(labels.Set(map[string]string{appVersion: version.BuildVersion, resourceLabelNamespace: namespace}))
selector := labels.SelectorFromSet(labels.Set(map[string]string{appVersion: version.BuildVersion, ResourceLabelNamespace: namespace}))
requests, err := g.reportChangeRequestLister.ReportChangeRequests(config.KyvernoNamespace()).List(selector)
if err != nil {
return nil, nil, fmt.Errorf("unable to list reportChangeRequests within namespace %s: %v", ns, err)
@ -701,6 +715,29 @@ func (g *ReportGenerator) updateReport(old interface{}, new *unstructured.Unstru
return fmt.Errorf("error converting to PolicyReport: %v", err)
}
if _, err := g.pclient.Wgpolicyk8sV1alpha2().PolicyReports(new.GetNamespace()).Update(context.TODO(), polr, metav1.UpdateOptions{}); err != nil {
if strings.Contains(err.Error(), resourceExhaustedErr) {
g.log.V(4).Info("got ResourceExhausted error, cleaning up change requests and erasing report results")
annotations := polr.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[inactiveLabelKey] = "Unable to update policy report due to resourceExhausted error, please enable the flag \"splitPolicyReport\" to generate a report per policy"
polr.SetAnnotations(annotations)
labels := polr.GetLabels()
labels[inactiveLabelKey] = inactiveLabelVal
polr.SetLabels(labels)
polr.Results = []policyreportv1alpha2.PolicyReportResult{}
polr.Summary = policyreportv1alpha2.PolicyReportSummary{}
if _, err := g.pclient.Wgpolicyk8sV1alpha2().PolicyReports(new.GetNamespace()).Update(context.TODO(), polr, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to erase policy report results: %v", err)
}
ns := new.GetNamespace()
g.cleanupChangeRequest <- ReconcileInfo{Namespace: &ns, MapperInactive: true}
return nil
}
return fmt.Errorf("failed to update PolicyReport: %v", err)
}
}
@ -711,6 +748,29 @@ func (g *ReportGenerator) updateReport(old interface{}, new *unstructured.Unstru
return fmt.Errorf("error converting to ClusterPolicyReport: %v", err)
}
if _, err := g.pclient.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Update(context.TODO(), cpolr, metav1.UpdateOptions{}); err != nil {
if strings.Contains(err.Error(), resourceExhaustedErr) {
g.log.V(4).Info("got ResourceExhausted error, cleaning up change requests and erasing report results")
annotations := cpolr.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[inactiveLabelKey] = "Unable to update cluster policy report due to resourceExhausted error, please enable the flag \"splitPolicyReport\" to generate report per policy"
cpolr.SetAnnotations(annotations)
labels := cpolr.GetLabels()
labels[inactiveLabelKey] = inactiveLabelVal
cpolr.SetLabels(labels)
cpolr.Results = []policyreportv1alpha2.PolicyReportResult{}
cpolr.Summary = policyreportv1alpha2.PolicyReportSummary{}
if _, err := g.pclient.Wgpolicyk8sV1alpha2().ClusterPolicyReports().Update(context.TODO(), cpolr, metav1.UpdateOptions{}); err != nil {
return fmt.Errorf("failed to erase cluster policy report results: %v", err)
}
ns := ""
g.cleanupChangeRequest <- ReconcileInfo{Namespace: &ns, MapperInactive: true}
return nil
}
return fmt.Errorf("failed to update ClusterPolicyReport: %v", err)
}
}

View file

@ -17,6 +17,7 @@ import (
kyvernov1alpha2listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1alpha2"
"github.com/kyverno/kyverno/pkg/dclient"
"github.com/kyverno/kyverno/pkg/engine/response"
cmap "github.com/orcaman/concurrent-map"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
@ -36,6 +37,9 @@ type Generator struct {
clusterReportChangeRequestLister kyvernov1alpha2listers.ClusterReportChangeRequestLister
// changeRequestMapper stores the change requests' count per namespace
changeRequestMapper concurrentMap
// cpolLister can list/get policy from the shared informer's store
cpolLister kyvernov1listers.ClusterPolicyLister
@ -49,6 +53,12 @@ type Generator struct {
requestCreator creator
// changeRequestLimit defines the max count for change requests (per namespace for RCR / cluster-wide for CRCR)
changeRequestLimit int
// CleanupChangeRequest signals the policy report controller to cleanup change requests
CleanupChangeRequest chan ReconcileInfo
log logr.Logger
}
@ -59,17 +69,21 @@ func NewReportChangeRequestGenerator(client kyvernoclient.Interface,
clusterReportReqInformer kyvernov1alpha2informers.ClusterReportChangeRequestInformer,
cpolInformer kyvernov1informers.ClusterPolicyInformer,
polInformer kyvernov1informers.PolicyInformer,
changeRequestLimit int,
log logr.Logger,
) *Generator {
gen := Generator{
dclient: dclient,
clusterReportChangeRequestLister: clusterReportReqInformer.Lister(),
reportChangeRequestLister: reportReqInformer.Lister(),
changeRequestMapper: newChangeRequestMapper(),
cpolLister: cpolInformer.Lister(),
polLister: polInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
dataStore: newDataStore(),
requestCreator: newChangeRequestCreator(client, 3*time.Second, log.WithName("requestCreator")),
changeRequestLimit: changeRequestLimit,
CleanupChangeRequest: make(chan ReconcileInfo, 10),
log: log,
}
@ -77,6 +91,11 @@ func NewReportChangeRequestGenerator(client kyvernoclient.Interface,
return &gen
}
type ReconcileInfo struct {
Namespace *string
MapperInactive bool
}
// NewDataStore returns an instance of data store
func newDataStore() *dataStore {
ds := dataStore{
@ -143,9 +162,17 @@ func (i Info) GetRuleLength() int {
return l
}
func parseKeyHash(keyHash string) (policyName, ns string) {
keys := strings.Split(keyHash, "/")
return keys[0], keys[1]
}
// GeneratorInterface provides API to create PVs
type GeneratorInterface interface {
Add(infos ...Info)
MapperReset(string)
MapperInactive(string)
MapperInvalidate()
}
func (gen *Generator) enqueue(info Info) {
@ -157,10 +184,35 @@ func (gen *Generator) enqueue(info Info) {
// Add queues a policy violation create request
func (gen *Generator) Add(infos ...Info) {
for _, info := range infos {
count, ok := gen.changeRequestMapper.ConcurrentMap.Get(info.Namespace)
if ok && count == -1 {
gen.log.V(6).Info("inactive policy report, skip creating report change request", "namespace", info.Namespace)
continue
}
gen.changeRequestMapper.increase(info.Namespace)
gen.enqueue(info)
}
}
// MapperReset resets the change request mapper for the given namespace
func (gen Generator) MapperReset(ns string) {
gen.changeRequestMapper.ConcurrentMap.Set(ns, 0)
}
// MapperInactive sets the change request mapper for the given namespace to -1
// which indicates the report is inactive
func (gen Generator) MapperInactive(ns string) {
gen.changeRequestMapper.ConcurrentMap.Set(ns, -1)
}
// MapperInvalidate reset map entries
func (gen Generator) MapperInvalidate() {
for ns := range gen.changeRequestMapper.ConcurrentMap.Items() {
gen.changeRequestMapper.ConcurrentMap.Remove(ns)
}
}
// Run starts the workers
func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
logger := gen.log
@ -196,6 +248,7 @@ func (gen *Generator) handleErr(err error, key interface{}) {
if err == nil {
gen.queue.Forget(key)
gen.dataStore.delete(keyHash)
gen.changeRequestMapper.decrease(keyHash)
return
}
@ -209,6 +262,7 @@ func (gen *Generator) handleErr(err error, key interface{}) {
logger.Error(err, "failed to process report request", "key", key)
gen.queue.Forget(key)
gen.dataStore.delete(keyHash)
gen.changeRequestMapper.decrease(keyHash)
}
func (gen *Generator) processNextWorkItem() bool {
@ -218,33 +272,38 @@ func (gen *Generator) processNextWorkItem() bool {
return false
}
err := func(obj interface{}) error {
defer gen.queue.Done(obj)
var keyHash string
var ok bool
defer gen.queue.Done(obj)
var keyHash string
var ok bool
if keyHash, ok = obj.(string); !ok {
gen.queue.Forget(obj)
logger.Info("incorrect type; expecting type 'string'", "obj", obj)
return nil
}
// lookup data store
info := gen.dataStore.lookup(keyHash)
if reflect.DeepEqual(info, Info{}) {
gen.queue.Forget(obj)
logger.V(4).Info("empty key")
return nil
}
err := gen.syncHandler(info)
gen.handleErr(err, obj)
return nil
}(obj)
if err != nil {
logger.Error(err, "failed to process item")
if keyHash, ok = obj.(string); !ok {
logger.Info("incorrect type; expecting type 'string'", "obj", obj)
gen.handleErr(nil, obj)
return true
}
// lookup data store
info := gen.dataStore.lookup(keyHash)
if reflect.DeepEqual(info, Info{}) {
logger.V(4).Info("empty key")
gen.handleErr(nil, obj)
return true
}
count, ok := gen.changeRequestMapper.Get(info.Namespace)
if ok {
if count.(int) > gen.changeRequestLimit {
logger.Info("throttling report change requests", "namespace", info.Namespace, "threshold", gen.changeRequestLimit, "count", count.(int))
gen.CleanupChangeRequest <- ReconcileInfo{Namespace: &(info.Namespace), MapperInactive: false}
gen.queue.Forget(obj)
gen.dataStore.delete(keyHash)
return true
}
}
err := gen.syncHandler(info)
gen.handleErr(err, obj)
return true
}
@ -279,3 +338,7 @@ func hasResultsChanged(old, new map[string]interface{}) bool {
return !reflect.DeepEqual(oldRes, newRes)
}
func newChangeRequestMapper() concurrentMap {
return concurrentMap{cmap.New()}
}