1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-10 01:46:55 +00:00
kyverno/pkg/controllers/cleanup/controller.go
Charles-Edouard Brétéché 83b088ecb9
chore: improve cleanup controller (#5509)
* chore: improve cleanup controller

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* chore: improve cleanup controller

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
2022-11-30 16:23:12 +00:00

151 lines
4.7 KiB
Go

package cleanup
import (
"context"
"time"
"github.com/go-logr/logr"
kyvernov1alpha1 "github.com/kyverno/kyverno/api/kyverno/v1alpha1"
kyvernov1alpha1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha1"
kyvernov1alpha1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1alpha1"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/controllers"
controllerutils "github.com/kyverno/kyverno/pkg/utils/controller"
batchv1 "k8s.io/api/batch/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
batchv1informers "k8s.io/client-go/informers/batch/v1"
"k8s.io/client-go/kubernetes"
batchv1listers "k8s.io/client-go/listers/batch/v1"
"k8s.io/client-go/util/workqueue"
)
// controller holds the state used to reconcile cleanup policies against the
// CronJobs that this package creates and updates for them.
type controller struct {
// client is the Kubernetes clientset; reconcile uses its BatchV1 API to
// create and update CronJobs.
client kubernetes.Interface
// cpolLister and polLister are read by getPolicy to look up cluster-scoped
// and namespaced cleanup policies; cjLister is read by getCronjob.
cpolLister kyvernov1alpha1listers.ClusterCleanupPolicyLister
polLister kyvernov1alpha1listers.CleanupPolicyLister
cjLister batchv1listers.CronJobLister
// queue is the rate-limited work queue handed to controllerutils.Run.
queue workqueue.RateLimitingInterface
// cpolEnqueue and polEnqueue are the enqueue functions returned by
// controllerutils.AddDefaultEventHandlersT; enqueueCronJob calls them to
// requeue the policy that owns a CronJob.
cpolEnqueue controllerutils.EnqueueFuncT[*kyvernov1alpha1.ClusterCleanupPolicy]
polEnqueue controllerutils.EnqueueFuncT[*kyvernov1alpha1.CleanupPolicy]
}
const (
// maxRetries is passed to controllerutils.Run by Run.
// NOTE(review): presumably the per-key retry limit — confirm against controllerutils.Run.
maxRetries = 10
// Workers is an exported worker count; it is not referenced elsewhere in
// this file (Run receives its worker count as a parameter).
Workers = 3
// ControllerName names the work queue and is passed to controllerutils.Run.
ControllerName = "cleanup-controller"
)
// NewController builds the cleanup controller.
//
// It creates a named rate-limited work queue, captures the listers of the
// three informers, registers the default event handlers for both cleanup
// policy informers (keeping the enqueue functions they return), and registers
// add/update/delete handlers on the CronJob informer that route to
// enqueueCronJob.
func NewController(
	client kubernetes.Interface,
	cpolInformer kyvernov1alpha1informers.ClusterCleanupPolicyInformer,
	polInformer kyvernov1alpha1informers.CleanupPolicyInformer,
	cjInformer batchv1informers.CronJobInformer,
) controllers.Controller {
	workQueue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName)
	ctrl := &controller{
		client: client,
		queue:  workQueue,
	}
	ctrl.cpolLister = cpolInformer.Lister()
	ctrl.polLister = polInformer.Lister()
	ctrl.cjLister = cjInformer.Lister()
	ctrl.cpolEnqueue = controllerutils.AddDefaultEventHandlersT[*kyvernov1alpha1.ClusterCleanupPolicy](logger, cpolInformer.Informer(), workQueue)
	ctrl.polEnqueue = controllerutils.AddDefaultEventHandlersT[*kyvernov1alpha1.CleanupPolicy](logger, polInformer.Informer(), workQueue)

	onCronJob := ctrl.enqueueCronJob
	controllerutils.AddEventHandlersT(
		cjInformer.Informer(),
		onCronJob,
		// On update, the owner is resolved from the previous version of the CronJob.
		func(previous *batchv1.CronJob, _ *batchv1.CronJob) { onCronJob(previous) },
		onCronJob,
	)
	return ctrl
}
// Run hands the controller's queue and reconcile function to
// controllerutils.Run, together with a V(3) logger, the controller name,
// time.Second, the requested number of workers and maxRetries.
func (c *controller) Run(ctx context.Context, workers int) {
	runLogger := logger.V(3)
	controllerutils.Run(
		ctx,
		runLogger,
		ControllerName,
		time.Second,
		c.queue,
		workers,
		maxRetries,
		c.reconcile,
	)
}
// enqueueCronJob enqueues the cleanup policy that owns the given CronJob.
//
// Only CronJobs carrying exactly one owner reference are considered. When the
// owner's kind is "ClusterCleanupPolicy" or "CleanupPolicy", a stub policy
// holding just the owner's name (and, for the namespaced kind, the CronJob's
// namespace) is passed to the matching enqueue function. CronJobs with any
// other owner kind are ignored. Enqueue failures are logged, not returned.
func (c *controller) enqueueCronJob(n *batchv1.CronJob) {
	if len(n.OwnerReferences) != 1 {
		return
	}
	owner := n.OwnerReferences[0]
	switch owner.Kind {
	case "ClusterCleanupPolicy":
		cpol := &kyvernov1alpha1.ClusterCleanupPolicy{
			ObjectMeta: metav1.ObjectMeta{
				Name: owner.Name,
			},
		}
		if err := c.cpolEnqueue(cpol); err != nil {
			// logr takes alternating key/value pairs after the message, so the
			// policy is identified by an explicit key rather than a bare value.
			logger.Error(err, "failed to enqueue ClusterCleanupPolicy object", "name", owner.Name)
		}
	case "CleanupPolicy":
		pol := &kyvernov1alpha1.CleanupPolicy{
			ObjectMeta: metav1.ObjectMeta{
				Name:      owner.Name,
				Namespace: n.Namespace,
			},
		}
		if err := c.polEnqueue(pol); err != nil {
			logger.Error(err, "failed to enqueue CleanupPolicy object", "namespace", n.Namespace, "name", owner.Name)
		}
	}
}
// getPolicy looks up a cleanup policy through the listers.
//
// A non-empty namespace selects the namespaced CleanupPolicy lister; an empty
// namespace selects the ClusterCleanupPolicy lister. On lookup failure the
// lister's error is returned together with a literal nil interface, so callers
// never receive a non-nil interface wrapping a nil pointer.
func (c *controller) getPolicy(namespace, name string) (kyvernov1alpha1.CleanupPolicyInterface, error) {
	if namespace != "" {
		namespaced, err := c.polLister.CleanupPolicies(namespace).Get(name)
		if err != nil {
			return nil, err
		}
		return namespaced, nil
	}
	clusterWide, err := c.cpolLister.Get(name)
	if err != nil {
		return nil, err
	}
	return clusterWide, nil
}
// getCronjob fetches the CronJob with the given namespace and name from the
// CronJob lister. On failure it returns nil and the lister's error.
func (c *controller) getCronjob(namespace, name string) (*batchv1.CronJob, error) {
	inNamespace := c.cjLister.CronJobs(namespace)
	found, lookupErr := inNamespace.Get(name)
	if lookupErr != nil {
		return nil, lookupErr
	}
	return found, nil
}
// reconcile brings the CronJob of a single cleanup policy in line with it.
//
// The policy is resolved via getPolicy; a policy that no longer exists is not
// an error. The CronJob is looked up under a name equal to the policy's UID,
// in the policy's namespace or, for cluster-scoped policies (empty namespace),
// in config.KyvernoNamespace(). If no such CronJob is found, one is built with
// getCronJobForTriggerResource and created; otherwise its Spec.Schedule is set
// to the policy's schedule through controllerutils.Update. Any other lookup
// error is returned unchanged.
func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
	policy, err := c.getPolicy(namespace, name)
	if apierrors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		logger.Error(err, "unable to get the policy from policy informer")
		return err
	}

	cronjobNs := namespace
	if cronjobNs == "" {
		cronjobNs = config.KyvernoNamespace()
	}

	existing, err := c.getCronjob(cronjobNs, string(policy.GetUID()))
	switch {
	case err == nil:
		_, err = controllerutils.Update(ctx, existing, c.client.BatchV1().CronJobs(cronjobNs), func(cj *batchv1.CronJob) error {
			cj.Spec.Schedule = policy.GetSpec().Schedule
			return nil
		})
		return err
	case apierrors.IsNotFound(err):
		desired := getCronJobForTriggerResource(policy)
		_, err = c.client.BatchV1().CronJobs(cronjobNs).Create(ctx, desired, metav1.CreateOptions{})
		return err
	default:
		return err
	}
}