kyverno/pkg/controllers/ttl/manager.go
Ved Ratan 10dacd5292
fix: use controller utils package in ttl controller (#8169)
Co-authored-by: shuting <shuting@nirmata.com>
Co-authored-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
2023-09-08 09:12:34 +02:00

package ttl

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/go-logr/logr"
	"github.com/kyverno/kyverno/api/kyverno"
	"github.com/kyverno/kyverno/pkg/auth/checker"
	"github.com/kyverno/kyverno/pkg/controllers"
	"github.com/kyverno/kyverno/pkg/logging"
	"github.com/kyverno/kyverno/pkg/metrics"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/discovery"
	"k8s.io/client-go/metadata"
	"k8s.io/client-go/metadata/metadatainformer"
	"k8s.io/client-go/tools/cache"
)

// stopFunc stops a running per-resource controller and its informer.
type stopFunc = context.CancelFunc

const (
	// Workers is the number of workers each per-resource controller runs.
	Workers = 3
	// ControllerName identifies this controller in logs.
	ControllerName = "ttl-controller-manager"
)

// manager discovers cluster resources eligible for TTL-based cleanup and
// manages the lifecycle of one controller (plus a filtered metadata informer)
// per eligible GVR.
type manager struct {
	metadataClient  metadata.Interface
	discoveryClient discovery.DiscoveryInterface
	checker         checker.AuthChecker
	resController   map[schema.GroupVersionResource]stopFunc
	logger          logr.Logger
	interval        time.Duration
	lock            sync.Mutex
	infoMetric      metric.Int64ObservableGauge
}

func NewManager(
	metadataInterface metadata.Interface,
	discoveryInterface discovery.DiscoveryInterface,
	checker checker.AuthChecker,
	timeInterval time.Duration,
) controllers.Controller {
	logger := logging.WithName(ControllerName)
	meterProvider := otel.GetMeterProvider()
	meter := meterProvider.Meter(metrics.MeterName)
	infoMetric, err := meter.Int64ObservableGauge(
		"kyverno_ttl_controller_info",
		metric.WithDescription("can be used to track individual resource controllers running for ttl based cleanup"),
	)
	if err != nil {
		logger.Error(err, "failed to create instrument, kyverno_ttl_controller_info")
	}
	mgr := &manager{
		metadataClient:  metadataInterface,
		discoveryClient: discoveryInterface,
		checker:         checker,
		resController:   map[schema.GroupVersionResource]stopFunc{},
		logger:          logger,
		interval:        timeInterval,
		infoMetric:      infoMetric,
	}
	if infoMetric != nil {
		if _, err := meter.RegisterCallback(mgr.report, infoMetric); err != nil {
			logger.Error(err, "failed to register callback")
		}
	}
	return mgr
}

func (m *manager) Run(ctx context.Context, workers int) {
	defer func() {
		// Stop all informers and wait for them to finish
		for gvr := range m.resController {
			logger := m.logger.WithValues("gvr", gvr)
			if err := m.stop(ctx, gvr); err != nil {
				logger.Error(err, "failed to stop informer")
			}
		}
	}()
	ticker := time.NewTicker(m.interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := m.reconcile(ctx, workers); err != nil {
				m.logger.Error(err, "reconciliation failed")
				return
			}
		}
	}
}

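// A minimal sketch of driving Run, assuming the caller ties the manager's
// lifetime to process shutdown (signal.NotifyContext is from os/signal):
//
//	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
//	defer cancel()
//	mgr.Run(ctx, Workers)
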
func (m *manager) getDesiredState() (sets.Set[schema.GroupVersionResource], error) {
	// Get the list of resources currently present in the cluster
	newResources, err := discoverResources(m.logger, m.discoveryClient)
	if err != nil {
		return nil, err
	}
	validResources := m.filterPermissionsResource(newResources)
	return sets.New(validResources...), nil
}

func (m *manager) getObservedState() (sets.Set[schema.GroupVersionResource], error) {
	observedState := sets.New[schema.GroupVersionResource]()
	for resource := range m.resController {
		observedState.Insert(resource)
	}
	return observedState, nil
}

func (m *manager) stop(ctx context.Context, gvr schema.GroupVersionResource) error {
	logger := m.logger.WithValues("gvr", gvr)
	if stopFunc, ok := m.resController[gvr]; ok {
		delete(m.resController, gvr)
		func() {
			defer logger.Info("controller stopped")
			logger.Info("stopping controller...")
			stopFunc()
		}()
	}
	return nil
}

func (m *manager) start(ctx context.Context, gvr schema.GroupVersionResource, workers int) error {
	logger := m.logger.WithValues("gvr", gvr)
	indexers := cache.Indexers{
		cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
	}
	options := func(options *metav1.ListOptions) {
		options.LabelSelector = kyverno.LabelCleanupTtl
	}
	informer := metadatainformer.NewFilteredMetadataInformer(m.metadataClient,
		gvr,
		metav1.NamespaceAll,
		10*time.Minute,
		indexers,
		options,
	)
	cont, cancel := context.WithCancel(ctx)
	var informerWaitGroup wait.Group
	informerWaitGroup.StartWithContext(cont, func(ctx context.Context) {
		logger.V(3).Info("informer starting...")
		defer logger.V(3).Info("informer stopping...")
		informer.Informer().Run(ctx.Done())
	})
	stopInformer := func() {
		// Send stop signal to informer's goroutine
		cancel()
		// Wait for the group to terminate
		informerWaitGroup.Wait()
	}
	if !cache.WaitForCacheSync(ctx.Done(), informer.Informer().HasSynced) {
		stopInformer()
		return fmt.Errorf("failed to wait for cache sync: %s", gvr.Resource)
	}
	controller, err := newController(m.metadataClient.Resource(gvr), informer, logger, gvr)
	if err != nil {
		stopInformer()
		return err
	}
	var controllerWaitGroup wait.Group
	controllerWaitGroup.StartWithContext(cont, func(ctx context.Context) {
		logger.V(3).Info("controller starting...")
		defer logger.V(3).Info("controller stopping...")
		controller.Start(ctx, workers)
	})
	m.resController[gvr] = func() {
		stopInformer()
		controller.Stop()
		controllerWaitGroup.Wait()
	}
	return nil
}

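// The informer above lists and watches only objects selected by
// kyverno.LabelCleanupTtl, so untagged objects never reach the controller.
// A hedged example of an object this informer would track, assuming the label
// key resolves to cleanup.kyverno.io/ttl:
//
//	apiVersion: v1
//	kind: ConfigMap
//	metadata:
//	  name: temp-config
//	  labels:
//	    cleanup.kyverno.io/ttl: 1h
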
func (m *manager) filterPermissionsResource(resources []schema.GroupVersionResource) []schema.GroupVersionResource {
	validResources := []schema.GroupVersionResource{}
	for _, resource := range resources {
		// Check if the service account has the necessary permissions
		if HasResourcePermissions(m.logger, resource, m.checker) {
			validResources = append(validResources, resource)
		}
	}
	return validResources
}

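// HasResourcePermissions is defined elsewhere in this package. A minimal
// sketch of the kind of check it implies, assuming the checker must grant
// the verbs this controller depends on (list/watch for the informer, delete
// for cleanup); the allowed helper here is hypothetical:
//
//	for _, verb := range []string{"list", "watch", "delete"} {
//		if !allowed(verb, resource) {
//			return false
//		}
//	}
//	return true
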
func (m *manager) report(ctx context.Context, observer metric.Observer) error {
	m.lock.Lock()
	defer m.lock.Unlock()
	for gvr := range m.resController {
		observer.ObserveInt64(
			m.infoMetric,
			1,
			metric.WithAttributes(
				attribute.String("resource_group", gvr.Group),
				attribute.String("resource_version", gvr.Version),
				attribute.String("resource_resource", gvr.Resource),
			),
		)
	}
	return nil
}

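// Each running per-resource controller contributes one sample per callback
// cycle, so the gauge enumerates the active controllers. A hedged example of
// the resulting series in Prometheus exposition format, assuming the default
// OTel attribute-to-label mapping:
//
//	kyverno_ttl_controller_info{resource_group="batch",resource_version="v1",resource_resource="jobs"} 1
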
func (m *manager) reconcile(ctx context.Context, workers int) error {
	defer m.logger.V(3).Info("manager reconciliation done")
	m.logger.V(3).Info("beginning reconciliation", "interval", m.interval)
	desiredState, err := m.getDesiredState()
	if err != nil {
		return err
	}
	observedState, err := m.getObservedState()
	if err != nil {
		return err
	}
	m.lock.Lock()
	defer m.lock.Unlock()
	for gvr := range observedState.Difference(desiredState) {
		if err := m.stop(ctx, gvr); err != nil {
			return err
		}
	}
	for gvr := range desiredState.Difference(observedState) {
		if err := m.start(ctx, gvr, workers); err != nil {
			return err
		}
	}
	return nil
}
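
// A worked illustration of the diff above, with hypothetical GVRs: given
// observed = {v1/configmaps, batch/v1/jobs} and desired = {v1/configmaps,
// v1/secrets}, the first loop stops the jobs controller (observed minus
// desired), the second starts a secrets controller (desired minus observed),
// and the configmaps controller keeps running untouched.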