From b741187a7332cabf53c5de6b6df8b371a17ed118 Mon Sep 17 00:00:00 2001
From: Mariam Fahmy
Date: Tue, 18 Jul 2023 13:01:09 +0300
Subject: [PATCH] move events for cleanup policies to the events controller
(#7827)
Signed-off-by: Mariam Fahmy
---
.../v2alpha1/cleanup_policy_interface.go | 3 +-
api/kyverno/v2alpha1/cleanup_policy_types.go | 10 ++++
.../handlers/cleanup/handlers.go | 46 +++----------------
cmd/cleanup-controller/main.go | 21 +++++++--
docs/user/crd/index.html | 2 +-
pkg/event/controller.go | 43 +++++++++++++++++
pkg/event/events.go | 30 ++++++++++++
7 files changed, 111 insertions(+), 44 deletions(-)
diff --git a/api/kyverno/v2alpha1/cleanup_policy_interface.go b/api/kyverno/v2alpha1/cleanup_policy_interface.go
index 48f4b002ce..7aa5bbd277 100644
--- a/api/kyverno/v2alpha1/cleanup_policy_interface.go
+++ b/api/kyverno/v2alpha1/cleanup_policy_interface.go
@@ -6,10 +6,11 @@ import (
"k8s.io/apimachinery/pkg/util/validation/field"
)
-// CleanupPolicyInterface abstracts the concrete policy type (Policy vs ClusterPolicy)
+// CleanupPolicyInterface abstracts the concrete policy type (CleanupPolicy vs ClusterCleanupPolicy)
// +kubebuilder:object:generate=false
type CleanupPolicyInterface interface {
metav1.Object
+ IsNamespaced() bool
GetSpec() *CleanupPolicySpec
GetStatus() *CleanupPolicyStatus
Validate(sets.Set[string]) field.ErrorList
diff --git a/api/kyverno/v2alpha1/cleanup_policy_types.go b/api/kyverno/v2alpha1/cleanup_policy_types.go
index d33c2aada6..93150c8a8a 100644
--- a/api/kyverno/v2alpha1/cleanup_policy_types.go
+++ b/api/kyverno/v2alpha1/cleanup_policy_types.go
@@ -75,6 +75,11 @@ func (p *CleanupPolicy) GetAPIVersion() string {
return p.APIVersion
}
+// IsNamespaced indicates if the policy is namespace scoped
+func (p *CleanupPolicy) IsNamespaced() bool {
+ return true
+}
+
// +kubebuilder:object:root=true
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -128,6 +133,11 @@ func (p *ClusterCleanupPolicy) GetAPIVersion() string {
return p.APIVersion
}
+// IsNamespaced indicates if the policy is namespace scoped
+func (p *ClusterCleanupPolicy) IsNamespaced() bool {
+ return false
+}
+
// Validate implements programmatic validation
func (p *ClusterCleanupPolicy) Validate(clusterResources sets.Set[string]) (errs field.ErrorList) {
errs = append(errs, kyvernov1.ValidatePolicyName(field.NewPath("metadata").Child("name"), p.Name)...)
diff --git a/cmd/cleanup-controller/handlers/cleanup/handlers.go b/cmd/cleanup-controller/handlers/cleanup/handlers.go
index eb4f0d38b0..87d77ae18b 100644
--- a/cmd/cleanup-controller/handlers/cleanup/handlers.go
+++ b/cmd/cleanup-controller/handlers/cleanup/handlers.go
@@ -23,13 +23,9 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.uber.org/multierr"
- corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
- "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
- "k8s.io/client-go/tools/record"
)
type handlers struct {
@@ -38,7 +34,7 @@ type handlers struct {
polLister kyvernov2alpha1listers.CleanupPolicyLister
nsLister corev1listers.NamespaceLister
cmResolver engineapi.ConfigmapResolver
- recorder record.EventRecorder
+ eventGen event.Interface
jp jmespath.Interface
metrics cleanupMetrics
}
@@ -78,6 +74,7 @@ func New(
nsLister corev1listers.NamespaceLister,
cmResolver engineapi.ConfigmapResolver,
jp jmespath.Interface,
+ eventGen event.Interface,
) *handlers {
return &handlers{
client: client,
@@ -85,7 +82,7 @@ func New(
polLister: polLister,
nsLister: nsLister,
cmResolver: cmResolver,
- recorder: event.NewRecorder(event.CleanupController, client.GetEventsInterface()),
+ eventGen: eventGen,
metrics: newCleanupMetrics(logger),
jp: jp,
}
@@ -247,13 +244,15 @@ func (h *handlers) executePolicy(
}
debug.Error(err, "failed to delete resource")
errs = append(errs, err)
- h.createEvent(policy, resource, err)
+ e := event.NewCleanupPolicyEvent(policy, resource, err)
+ h.eventGen.Add(e)
} else {
if h.metrics.deletedObjectsTotal != nil {
h.metrics.deletedObjectsTotal.Add(ctx, 1, metric.WithAttributes(labels...))
}
debug.Info("deleted")
- h.createEvent(policy, resource, nil)
+ e := event.NewCleanupPolicyEvent(policy, resource, nil)
+ h.eventGen.Add(e)
}
}
}
@@ -261,34 +260,3 @@ func (h *handlers) executePolicy(
}
return multierr.Combine(errs...)
}
-
-func (h *handlers) createEvent(policy kyvernov2alpha1.CleanupPolicyInterface, resource unstructured.Unstructured, err error) {
- var cleanuppol runtime.Object
- if policy.GetNamespace() == "" {
- cleanuppol = policy.(*kyvernov2alpha1.ClusterCleanupPolicy)
- } else if policy.GetNamespace() != "" {
- cleanuppol = policy.(*kyvernov2alpha1.CleanupPolicy)
- }
- if err == nil {
- h.recorder.Eventf(
- cleanuppol,
- corev1.EventTypeNormal,
- string(event.PolicyApplied),
- "successfully cleaned up the target resource %v/%v/%v",
- resource.GetKind(),
- resource.GetNamespace(),
- resource.GetName(),
- )
- } else {
- h.recorder.Eventf(
- cleanuppol,
- corev1.EventTypeWarning,
- string(event.PolicyError),
- "failed to clean up the target resource %v/%v/%v: %v",
- resource.GetKind(),
- resource.GetNamespace(),
- resource.GetName(),
- err.Error(),
- )
- }
-}
diff --git a/cmd/cleanup-controller/main.go b/cmd/cleanup-controller/main.go
index 7f27f9db0d..8d214a2cca 100644
--- a/cmd/cleanup-controller/main.go
+++ b/cmd/cleanup-controller/main.go
@@ -17,8 +17,10 @@ import (
"github.com/kyverno/kyverno/pkg/controllers/cleanup"
genericloggingcontroller "github.com/kyverno/kyverno/pkg/controllers/generic/logging"
genericwebhookcontroller "github.com/kyverno/kyverno/pkg/controllers/generic/webhook"
+ "github.com/kyverno/kyverno/pkg/event"
"github.com/kyverno/kyverno/pkg/informers"
"github.com/kyverno/kyverno/pkg/leaderelection"
+ "github.com/kyverno/kyverno/pkg/logging"
"github.com/kyverno/kyverno/pkg/tls"
"github.com/kyverno/kyverno/pkg/webhooks"
admissionregistrationv1 "k8s.io/api/admissionregistration/v1"
@@ -49,14 +51,16 @@ func (probes) IsLive(context.Context) bool {
func main() {
var (
- dumpPayload bool
- serverIP string
- servicePort int
+ dumpPayload bool
+ serverIP string
+ servicePort int
+ maxQueuedEvents int
)
flagset := flag.NewFlagSet("cleanup-controller", flag.ExitOnError)
flagset.BoolVar(&dumpPayload, "dumpPayload", false, "Set this flag to activate/deactivate debug mode.")
flagset.StringVar(&serverIP, "serverIP", "", "IP address where Kyverno controller runs. Only required if out-of-cluster.")
flagset.IntVar(&servicePort, "servicePort", 443, "Port used by the Kyverno Service resource and for webhook configurations.")
+ flagset.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.")
// config
appConfig := internal.NewConfiguration(
internal.WithProfiling(),
@@ -193,10 +197,20 @@ func main() {
kyvernoInformer.Kyverno().V2alpha1().ClusterCleanupPolicies(),
genericloggingcontroller.CheckGeneration,
)
+ eventGenerator := event.NewEventCleanupGenerator(
+ setup.KyvernoDynamicClient,
+ kyvernoInformer.Kyverno().V2alpha1().ClusterCleanupPolicies(),
+ kyvernoInformer.Kyverno().V2alpha1().CleanupPolicies(),
+ maxQueuedEvents,
+ logging.WithName("EventGenerator"),
+ )
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, setup.Logger, kubeInformer, kyvernoInformer) {
os.Exit(1)
}
+ // start event generator
+ var wg sync.WaitGroup
+ go eventGenerator.Run(ctx, 3, &wg)
// create handlers
admissionHandlers := admissionhandlers.New(setup.KyvernoDynamicClient)
cmResolver := internal.NewConfigMapResolver(ctx, setup.Logger, setup.KubeClient, resyncPeriod)
@@ -208,6 +222,7 @@ func main() {
nsLister,
cmResolver,
setup.Jp,
+ eventGenerator,
)
// create server
server := NewServer(
diff --git a/docs/user/crd/index.html b/docs/user/crd/index.html
index 004ca18a4f..b65ac614da 100644
--- a/docs/user/crd/index.html
+++ b/docs/user/crd/index.html
@@ -5602,7 +5602,7 @@ MatchResources
CleanupPolicyInterface
-
CleanupPolicyInterface abstracts the concrete policy type (Policy vs ClusterPolicy)
+CleanupPolicyInterface abstracts the concrete policy type (CleanupPolicy vs ClusterCleanupPolicy)
CleanupPolicySpec
diff --git a/pkg/event/controller.go b/pkg/event/controller.go
index 1506a926bc..cca8930a59 100644
--- a/pkg/event/controller.go
+++ b/pkg/event/controller.go
@@ -7,7 +7,9 @@ import (
"github.com/go-logr/logr"
kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
+ kyvernov2alpha1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v2alpha1"
kyvernov1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
+ kyvernov2alpha1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v2alpha1"
"github.com/kyverno/kyverno/pkg/clients/dclient"
corev1 "k8s.io/api/core/v1"
errors "k8s.io/apimachinery/pkg/api/errors"
@@ -30,6 +32,10 @@ type generator struct {
cpLister kyvernov1listers.ClusterPolicyLister
// list/get policy
pLister kyvernov1listers.PolicyLister
+ // list/get cluster cleanup policy
+ clustercleanuppolLister kyvernov2alpha1listers.ClusterCleanupPolicyLister
+ // list/get cleanup policy
+ cleanuppolLister kyvernov2alpha1listers.CleanupPolicyLister
// queue to store event generation requests
queue workqueue.RateLimitingInterface
// events generated at policy controller
@@ -40,6 +46,8 @@ type generator struct {
genPolicyRecorder record.EventRecorder
// events generated at mutateExisting controller
mutateExistingRecorder record.EventRecorder
+ // events generated at cleanup controller
+ cleanupPolicyRecorder record.EventRecorder
maxQueuedEvents int
@@ -85,6 +93,27 @@ func NewEventGenerator(
return &gen
}
+// NewEventGenerator to generate a new event cleanup controller
+func NewEventCleanupGenerator(
+ // source Source,
+ client dclient.Interface,
+ clustercleanuppolInformer kyvernov2alpha1informers.ClusterCleanupPolicyInformer,
+ cleanuppolInformer kyvernov2alpha1informers.CleanupPolicyInformer,
+ maxQueuedEvents int,
+ log logr.Logger,
+) Controller {
+ gen := generator{
+ client: client,
+ clustercleanuppolLister: clustercleanuppolInformer.Lister(),
+ cleanuppolLister: cleanuppolInformer.Lister(),
+ queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultItemBasedRateLimiter(), eventWorkQueueName),
+ cleanupPolicyRecorder: NewRecorder(CleanupController, client.GetEventsInterface()),
+ maxQueuedEvents: maxQueuedEvents,
+ log: log,
+ }
+ return &gen
+}
+
// Add queues an event for generation
func (gen *generator) Add(infos ...Info) {
logger := gen.log
@@ -193,6 +222,18 @@ func (gen *generator) syncHandler(key Info) error {
logger.Error(err, "failed to get policy", "name", key.Name)
return err
}
+ case "ClusterCleanupPolicy":
+ robj, err = gen.clustercleanuppolLister.Get(key.Name)
+ if err != nil {
+ logger.Error(err, "failed to get cluster clean up policy", "name", key.Name)
+ return err
+ }
+ case "CleanupPolicy":
+ robj, err = gen.cleanuppolLister.CleanupPolicies(key.Namespace).Get(key.Name)
+ if err != nil {
+ logger.Error(err, "failed to get cleanup policy", "name", key.Name)
+ return err
+ }
default:
robj, err = gen.client.GetResource(context.TODO(), "", key.Kind, key.Namespace, key.Name)
if err != nil {
@@ -223,6 +264,8 @@ func (gen *generator) syncHandler(key Info) error {
gen.genPolicyRecorder.Event(robj, eventType, string(key.Reason), key.Message)
case MutateExistingController:
gen.mutateExistingRecorder.Event(robj, eventType, string(key.Reason), key.Message)
+ case CleanupController:
+ gen.cleanupPolicyRecorder.Event(robj, eventType, string(key.Reason), key.Message)
default:
logger.Info("info.source not defined for the request")
}
diff --git a/pkg/event/events.go b/pkg/event/events.go
index ae6426d232..0854afd1a5 100644
--- a/pkg/event/events.go
+++ b/pkg/event/events.go
@@ -5,6 +5,7 @@ import (
"strings"
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
+ kyvernov2alpha1 "github.com/kyverno/kyverno/api/kyverno/v2alpha1"
engineapi "github.com/kyverno/kyverno/pkg/engine/api"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
@@ -47,6 +48,13 @@ func getPolicyKind(policy kyvernov1.PolicyInterface) string {
return "ClusterPolicy"
}
+func getCleanupPolicyKind(policy kyvernov2alpha1.CleanupPolicyInterface) string {
+ if policy.IsNamespaced() {
+ return "CleanupPolicy"
+ }
+ return "ClusterCleanupPolicy"
+}
+
func NewPolicyAppliedEvent(source Source, engineResponse engineapi.EngineResponse) Info {
resource := engineResponse.Resource
var bldr strings.Builder
@@ -181,6 +189,28 @@ func NewPolicyExceptionEvents(engineResponse engineapi.EngineResponse, ruleResp
return []Info{policyEvent, exceptionEvent}
}
+func NewCleanupPolicyEvent(policy kyvernov2alpha1.CleanupPolicyInterface, resource unstructured.Unstructured, err error) Info {
+ if err == nil {
+ return Info{
+ Kind: getCleanupPolicyKind(policy),
+ Namespace: policy.GetNamespace(),
+ Name: policy.GetName(),
+ Source: CleanupController,
+ Reason: PolicyApplied,
+ Message: fmt.Sprintf("successfully cleaned up the target resource %v/%v/%v", resource.GetKind(), resource.GetNamespace(), resource.GetName()),
+ }
+ } else {
+ return Info{
+ Kind: getCleanupPolicyKind(policy),
+ Namespace: policy.GetNamespace(),
+ Name: policy.GetName(),
+ Source: CleanupController,
+ Reason: PolicyError,
+ Message: fmt.Sprintf("failed to clean up the target resource %v/%v/%v: %v", resource.GetKind(), resource.GetNamespace(), resource.GetName(), err.Error()),
+ }
+ }
+}
+
func NewFailedEvent(err error, policy, rule string, source Source, resource kyvernov1.ResourceSpec) Info {
return Info{
Kind: resource.GetKind(),