Add reconciliation logic to create CronJobs whenever a new cleanup policy is created (#5385)

* add reconcile logic to create CronJobs

Signed-off-by: Nikhil Sharma <nikhilsharma230303@gmail.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* more fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix lint issues

Signed-off-by: Nikhil Sharma <nikhilsharma230303@gmail.com>

* watch cronjobs in reconciliation

Signed-off-by: Nikhil Sharma <nikhilsharma230303@gmail.com>

* fix

Signed-off-by: Nikhil Sharma <nikhilsharma230303@gmail.com>

Signed-off-by: Nikhil Sharma <nikhilsharma230303@gmail.com>
Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
Co-authored-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Co-authored-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
Authored by Nikhil Sharma on 2022-11-25 15:29:45 +05:30, committed by GitHub
parent fa88f4a2ff
commit 8547c8ff8c
14 changed files with 405 additions and 23 deletions
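For context, this change reconciles cleanup policies into CronJobs: when a policy like the minimal, purely illustrative example below is created, the controller creates a CronJob named after the policy's UID in the policy's namespace (or, for a cluster-wide ClusterCleanupPolicy, in the Kyverno namespace). The policy name, namespace, and schedule here are placeholders, and only spec.schedule is shown because it is the only spec field this change reads; any other fields a real policy would carry are omitted.

# Illustrative manifest only; name, namespace, and schedule are placeholders.
apiVersion: kyverno.io/v1alpha1
kind: CleanupPolicy
metadata:
  name: remove-stale-resources   # hypothetical policy name
  namespace: default
spec:
  schedule: "*/5 * * * *"        # copied into the generated CronJob's spec.schedule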

@@ -14,5 +14,5 @@ type CleanupPolicyInterface interface {
GetStatus() *CleanupPolicyStatus
Validate(sets.String) field.ErrorList
GetKind() string
GetSchedule() string
GetAPIVersion() string
}

@@ -31,6 +31,8 @@ import (
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true
// +kubebuilder:storageversion
// +kubebuilder:resource:shortName=cleanpol,categories=kyverno;all
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Schedule",type=string,JSONPath=".spec.schedule"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
@@ -58,15 +60,6 @@ func (p *CleanupPolicy) GetStatus() *CleanupPolicyStatus {
return &p.Status
}
// GetSchedule returns the schedule from the policy spec
func (p *CleanupPolicy) GetSchedule() string {
return p.Spec.Schedule
}
func (p *CleanupPolicy) GetKind() string {
return p.Kind
}
// Validate implements programmatic validation
func (p *CleanupPolicy) Validate(clusterResources sets.String) (errs field.ErrorList) {
errs = append(errs, kyvernov1.ValidatePolicyName(field.NewPath("metadata").Child("name"), p.Name)...)
@@ -74,6 +67,16 @@ func (p *CleanupPolicy) Validate(clusterResources sets.String) (errs field.Error
return errs
}
// GetKind returns the resource kind
func (p *CleanupPolicy) GetKind() string {
return p.Kind
}
// GetAPIVersion returns the resource APIVersion
func (p *CleanupPolicy) GetAPIVersion() string {
return p.APIVersion
}
// +kubebuilder:object:root=true
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
@@ -88,6 +91,8 @@ type CleanupPolicyList struct {
// +genclient:nonNamespaced
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:object:root=true
// +kubebuilder:storageversion
// +kubebuilder:resource:scope=Cluster,shortName=ccleanpol,categories=kyverno;all
// +kubebuilder:subresource:status
// +kubebuilder:printcolumn:name="Schedule",type=string,JSONPath=".spec.schedule"
// +kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp"
@@ -115,15 +120,16 @@ func (p *ClusterCleanupPolicy) GetStatus() *CleanupPolicyStatus {
return &p.Status
}
// GetSchedule returns the schedule from the policy spec
func (p *ClusterCleanupPolicy) GetSchedule() string {
return p.Spec.Schedule
}
// GetKind returns the resource kind
func (p *ClusterCleanupPolicy) GetKind() string {
return p.Kind
}
// GetAPIVersion returns the resource APIVersion
func (p *ClusterCleanupPolicy) GetAPIVersion() string {
return p.APIVersion
}
// Validate implements programmatic validation
func (p *ClusterCleanupPolicy) Validate(clusterResources sets.String) (errs field.ErrorList) {
errs = append(errs, kyvernov1.ValidatePolicyName(field.NewPath("metadata").Child("name"), p.Name)...)

@@ -22,4 +22,15 @@ rules:
- update
- watch
- deletecollection
- apiGroups:
- batch
resources:
- cronjobs
verbs:
- create
- delete
- get
- list
- update
- watch
{{- end }}

@@ -516,9 +516,14 @@ metadata:
spec:
group: kyverno.io
names:
categories:
- kyverno
- all
kind: CleanupPolicy
listKind: CleanupPolicyList
plural: cleanuppolicies
shortNames:
- cleanpol
singular: cleanuppolicy
scope: Namespaced
versions:
@@ -1961,11 +1966,16 @@ metadata:
spec:
group: kyverno.io
names:
categories:
- kyverno
- all
kind: ClusterCleanupPolicy
listKind: ClusterCleanupPolicyList
plural: clustercleanuppolicies
shortNames:
- ccleanpol
singular: clustercleanuppolicy
scope: Namespaced
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .spec.schedule

@@ -0,0 +1,33 @@
package main
import (
"context"
"sync"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/controllers/cleanup"
)
type controller struct {
name string
controller cleanup.Controller
workers int
}
func newController(name string, c cleanup.Controller, w int) controller {
return controller{
name: name,
controller: c,
workers: w,
}
}
func (c controller) run(ctx context.Context, logger logr.Logger, wg *sync.WaitGroup) {
wg.Add(1)
go func(logger logr.Logger) {
logger.Info("starting controller", "workers", c.workers)
defer logger.Info("controller stopped")
defer wg.Done()
c.controller.Run(ctx, c.workers)
}(logger.WithValues("name", c.name))
}

@@ -1,5 +1,7 @@
package logger
import "github.com/kyverno/kyverno/pkg/logging"
import (
"github.com/kyverno/kyverno/pkg/logging"
)
var Logger = logging.WithName("cleanuppolicywebhooks")

@@ -5,14 +5,20 @@ import (
"flag"
"net/http"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/cmd/internal"
kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions"
"github.com/kyverno/kyverno/pkg/clients/dclient"
dynamicclient "github.com/kyverno/kyverno/pkg/clients/dynamic"
kubeclient "github.com/kyverno/kyverno/pkg/clients/kube"
kyvernoclient "github.com/kyverno/kyverno/pkg/clients/kyverno"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/controllers/cleanup"
"github.com/kyverno/kyverno/pkg/logging"
"github.com/kyverno/kyverno/pkg/metrics"
corev1 "k8s.io/api/core/v1"
@@ -71,6 +77,10 @@ func setupMetrics(logger logr.Logger, kubeClient kubernetes.Interface) (*metrics
return metricsConfig, cancel, nil
}
func setupSignals() (context.Context, context.CancelFunc) {
return signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
}
func main() {
// application flags
flagset := flag.NewFlagSet("application", flag.ExitOnError)
@@ -97,9 +107,6 @@ func main() {
defer sdown()
// create raw client
rawClient := internal.CreateKubernetesClient(logger)
// setup signals
signalCtx, signalCancel := internal.SetupSignals(logger)
defer signalCancel()
// setup metrics
metricsConfig, metricsShutdown, err := setupMetrics(logger, rawClient)
if err != nil {
@@ -109,6 +116,9 @@ func main() {
if metricsShutdown != nil {
defer metricsShutdown()
}
// setup signals
signalCtx, signalCancel := setupSignals()
defer signalCancel()
// create instrumented clients
kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
dynamicClient := internal.CreateDynamicClient(logger, dynamicclient.WithMetrics(metricsConfig, metrics.KyvernoClient), dynamicclient.WithTracing())
@@ -117,7 +127,26 @@ func main() {
logger.Error(err, "failed to create dynamic client")
os.Exit(1)
}
clientConfig := internal.CreateClientConfig(logger)
kyvernoClient, err := kyvernoclient.NewForConfig(
clientConfig,
kyvernoclient.WithMetrics(metricsConfig, metrics.KubeClient),
kyvernoclient.WithTracing(),
)
if err != nil {
logger.Error(err, "failed to create kyverno client")
os.Exit(1)
}
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod)
kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
cleanupController := cleanup.NewController(
kubeClient,
kyvernoInformer.Kyverno().V1alpha1().ClusterCleanupPolicies(),
kyvernoInformer.Kyverno().V1alpha1().CleanupPolicies(),
kubeInformer.Batch().V1().CronJobs(),
)
controller := newController(cleanup.ControllerName, *cleanupController, cleanup.Workers)
policyHandlers := NewHandlers(
dClient,
)
@@ -127,6 +156,8 @@ func main() {
if !internal.StartInformersAndWaitForCacheSync(ctx, kubeKyvernoInformer) {
os.Exit(1)
}
var wg sync.WaitGroup
controller.run(signalCtx, logger.WithName("cleanup-controller"), &wg)
server := NewServer(
policyHandlers,
func() ([]byte, []byte, error) {
@@ -141,4 +172,5 @@ func main() {
server.Run(ctx.Done())
// wait for termination signal
<-ctx.Done()
wg.Wait()
}

@@ -9,9 +9,14 @@ metadata:
spec:
group: kyverno.io
names:
categories:
- kyverno
- all
kind: CleanupPolicy
listKind: CleanupPolicyList
plural: cleanuppolicies
shortNames:
- cleanpol
singular: cleanuppolicy
scope: Namespaced
versions:

@@ -9,11 +9,16 @@ metadata:
spec:
group: kyverno.io
names:
categories:
- kyverno
- all
kind: ClusterCleanupPolicy
listKind: ClusterCleanupPolicyList
plural: clustercleanuppolicies
shortNames:
- ccleanpol
singular: clustercleanuppolicy
scope: Namespaced
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .spec.schedule

@@ -682,9 +682,14 @@ metadata:
spec:
group: kyverno.io
names:
categories:
- kyverno
- all
kind: CleanupPolicy
listKind: CleanupPolicyList
plural: cleanuppolicies
shortNames:
- cleanpol
singular: cleanuppolicy
scope: Namespaced
versions:
@@ -2775,11 +2780,16 @@ metadata:
spec:
group: kyverno.io
names:
categories:
- kyverno
- all
kind: ClusterCleanupPolicy
listKind: ClusterCleanupPolicyList
plural: clustercleanuppolicies
shortNames:
- ccleanpol
singular: clustercleanuppolicy
scope: Namespaced
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .spec.schedule

@@ -678,9 +678,14 @@ metadata:
spec:
group: kyverno.io
names:
categories:
- kyverno
- all
kind: CleanupPolicy
listKind: CleanupPolicyList
plural: cleanuppolicies
shortNames:
- cleanpol
singular: cleanuppolicy
scope: Namespaced
versions:
@@ -2768,11 +2773,16 @@ metadata:
spec:
group: kyverno.io
names:
categories:
- kyverno
- all
kind: ClusterCleanupPolicy
listKind: ClusterCleanupPolicyList
plural: clustercleanuppolicies
shortNames:
- ccleanpol
singular: clustercleanuppolicy
scope: Namespaced
scope: Cluster
versions:
- additionalPrinterColumns:
- jsonPath: .spec.schedule

@@ -0,0 +1,199 @@
package cleanup
import (
"context"
"time"
"github.com/go-logr/logr"
kyvernov1alpha1 "github.com/kyverno/kyverno/api/kyverno/v1alpha1"
kyvernov1alpha1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha1"
kyvernov1alpha1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1alpha1"
"github.com/kyverno/kyverno/pkg/config"
controllerutils "github.com/kyverno/kyverno/pkg/utils/controller"
batchv1 "k8s.io/api/batch/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batchv1informers "k8s.io/client-go/informers/batch/v1"
"k8s.io/client-go/kubernetes"
batchv1listers "k8s.io/client-go/listers/batch/v1"
"k8s.io/client-go/util/workqueue"
)
type Controller struct {
// clients
client kubernetes.Interface
// listers
cpolLister kyvernov1alpha1listers.ClusterCleanupPolicyLister
polLister kyvernov1alpha1listers.CleanupPolicyLister
cjLister batchv1listers.CronJobLister
// queue
queue workqueue.RateLimitingInterface
}
const (
maxRetries = 10
Workers = 3
ControllerName = "cleanup-controller"
)
func NewController(
client kubernetes.Interface,
cpolInformer kyvernov1alpha1informers.ClusterCleanupPolicyInformer,
polInformer kyvernov1alpha1informers.CleanupPolicyInformer,
cjInformer batchv1informers.CronJobInformer,
) *Controller {
c := &Controller{
client: client,
cpolLister: cpolInformer.Lister(),
polLister: polInformer.Lister(),
cjLister: cjInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
}
controllerutils.AddDefaultEventHandlers(logger, cpolInformer.Informer(), c.queue)
controllerutils.AddDefaultEventHandlers(logger, polInformer.Informer(), c.queue)
cpolEnqueue := controllerutils.AddDefaultEventHandlers(logger, cpolInformer.Informer(), c.queue)
polEnqueue := controllerutils.AddDefaultEventHandlers(logger, polInformer.Informer(), c.queue)
controllerutils.AddEventHandlersT(
cjInformer.Informer(),
func(n *batchv1.CronJob) {
if len(n.OwnerReferences) == 1 {
if n.OwnerReferences[0].Kind == "ClusterCleanupPolicy" {
cpol := kyvernov1alpha1.ClusterCleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
},
}
err := cpolEnqueue(&cpol)
if err != nil {
logger.Error(err, "failed to enqueue ClusterCleanupPolicy object", cpol)
}
} else if n.OwnerReferences[0].Kind == "CleanupPolicy" {
pol := kyvernov1alpha1.CleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
Namespace: n.Namespace,
},
}
err := polEnqueue(&pol)
if err != nil {
logger.Error(err, "failed to enqueue CleanupPolicy object", pol)
}
}
}
},
func(o *batchv1.CronJob, n *batchv1.CronJob) {
if o.GetResourceVersion() != n.GetResourceVersion() {
for _, owner := range n.OwnerReferences {
if owner.Kind == "ClusterCleanupPolicy" {
cpol := kyvernov1alpha1.ClusterCleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: owner.Name,
},
}
err := cpolEnqueue(&cpol)
if err != nil {
logger.Error(err, "failed to enqueue ClusterCleanupPolicy object", cpol)
}
} else if owner.Kind == "CleanupPolicy" {
pol := kyvernov1alpha1.CleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: owner.Name,
Namespace: n.Namespace,
},
}
err := polEnqueue(&pol)
if err != nil {
logger.Error(err, "failed to enqueue CleanupPolicy object", pol)
}
}
}
}
},
func(n *batchv1.CronJob) {
if len(n.OwnerReferences) == 1 {
if n.OwnerReferences[0].Kind == "ClusterCleanupPolicy" {
cpol := kyvernov1alpha1.ClusterCleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
},
}
err := cpolEnqueue(&cpol)
if err != nil {
logger.Error(err, "failed to enqueue ClusterCleanupPolicy object", cpol)
}
} else if n.OwnerReferences[0].Kind == "CleanupPolicy" {
pol := kyvernov1alpha1.CleanupPolicy{
ObjectMeta: metav1.ObjectMeta{
Name: n.OwnerReferences[0].Name,
Namespace: n.Namespace,
},
}
err := polEnqueue(&pol)
if err != nil {
logger.Error(err, "failed to enqueue CleanupPolicy object", pol)
}
}
}
},
)
return c
}
func (c *Controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, logger.V(3), ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile)
}
func (c *Controller) getPolicy(namespace, name string) (kyvernov1alpha1.CleanupPolicyInterface, error) {
if namespace == "" {
cpolicy, err := c.cpolLister.Get(name)
if err != nil {
return nil, err
}
return cpolicy, nil
} else {
policy, err := c.polLister.CleanupPolicies(namespace).Get(name)
if err != nil {
return nil, err
}
return policy, nil
}
}
func (c *Controller) getCronjob(namespace, name string) (*batchv1.CronJob, error) {
cj, err := c.cjLister.CronJobs(namespace).Get(name)
if err != nil {
return nil, err
}
return cj, nil
}
func (c *Controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
policy, err := c.getPolicy(namespace, name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
logger.Error(err, "unable to get the policy from policy informer")
return err
}
cronjobNs := namespace
if namespace == "" {
cronjobNs = config.KyvernoNamespace()
}
if cronjob, err := c.getCronjob(cronjobNs, string(policy.GetUID())); err != nil {
if !apierrors.IsNotFound(err) {
return err
}
cronjob := getCronJobForTriggerResource(policy)
_, err = c.client.BatchV1().CronJobs(cronjobNs).Create(ctx, cronjob, metav1.CreateOptions{})
return err
} else {
_, err = controllerutils.Update(ctx, cronjob, c.client.BatchV1().CronJobs(cronjobNs), func(cronjob *batchv1.CronJob) error {
cronjob.Spec.Schedule = policy.GetSpec().Schedule
return nil
})
return err
}
}

@@ -0,0 +1,5 @@
package cleanup
import "github.com/kyverno/kyverno/pkg/logging"
var logger = logging.WithName(ControllerName)

@@ -0,0 +1,54 @@
package cleanup
import (
kyvernov1alpha1 "github.com/kyverno/kyverno/api/kyverno/v1alpha1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func getCronJobForTriggerResource(pol kyvernov1alpha1.CleanupPolicyInterface) *batchv1.CronJob {
// TODO: find a better way to do that, it looks like resources returned by WATCH don't have the GVK
apiVersion := "kyverno.io/v1alpha1"
kind := "CleanupPolicy"
if pol.GetNamespace() == "" {
kind = "ClusterCleanupPolicy"
}
cronjob := &batchv1.CronJob{
ObjectMeta: metav1.ObjectMeta{
Name: string(pol.GetUID()),
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: apiVersion,
Kind: kind,
Name: pol.GetName(),
UID: pol.GetUID(),
},
},
},
Spec: batchv1.CronJobSpec{
Schedule: pol.GetSpec().Schedule,
JobTemplate: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Template: corev1.PodTemplateSpec{
Spec: corev1.PodSpec{
RestartPolicy: corev1.RestartPolicyOnFailure,
Containers: []corev1.Container{
{
Name: "cleanup",
Image: "bitnami/kubectl:latest",
Args: []string{
"/bin/sh",
"-c",
`echo "Hello World"`,
},
},
},
},
},
},
},
},
}
return cronjob
}
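Putting the reconciler and this helper together, the CronJob that lands in the cluster for a policy like the illustrative example near the top of this diff would look roughly as follows. This is a sketch, not output captured from a cluster: the UID, name, namespace, and schedule are placeholders, while the owner reference shape, the placeholder container, and the restart policy mirror the Go code above.

apiVersion: batch/v1
kind: CronJob
metadata:
  name: <policy-uid>              # the CronJob is named after the policy's UID
  namespace: default              # the policy's namespace; the Kyverno namespace for cluster policies
  ownerReferences:
    - apiVersion: kyverno.io/v1alpha1
      kind: CleanupPolicy         # ClusterCleanupPolicy when the policy is not namespaced
      name: remove-stale-resources
      uid: <policy-uid>
spec:
  schedule: "*/5 * * * *"         # taken from the policy's spec.schedule
  jobTemplate:
    spec:
      template:
        spec:
          restartPolicy: OnFailure
          containers:
            - name: cleanup
              image: bitnami/kubectl:latest
              args: ["/bin/sh", "-c", 'echo "Hello World"']   # placeholder command for now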