From 018d45cb290744308cd0d05eca437f5d6aac34c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Charles-Edouard=20Br=C3=A9t=C3=A9ch=C3=A9?= Date: Tue, 25 Jun 2024 05:16:30 +0200 Subject: [PATCH] feat: add reports circuit breaker (#10499) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add reports circuit breaker Signed-off-by: Charles-Edouard Brétéché * improve metrics and granularity Signed-off-by: Charles-Edouard Brétéché --------- Signed-off-by: Charles-Edouard Brétéché --- cmd/kyverno/breaker.go | 146 ++++++++++++++++++ cmd/kyverno/main.go | 13 +- pkg/d4f/breaker.go | 66 ++++++++ pkg/d4f/breaker_test.go | 77 +++++++++ pkg/webhooks/resource/handlers.go | 29 +++- .../resource/imageverification/handler.go | 11 +- .../resource/validation/validation.go | 9 +- 7 files changed, 345 insertions(+), 6 deletions(-) create mode 100644 cmd/kyverno/breaker.go create mode 100644 pkg/d4f/breaker.go create mode 100644 pkg/d4f/breaker_test.go diff --git a/cmd/kyverno/breaker.go b/cmd/kyverno/breaker.go new file mode 100644 index 0000000000..83ee6e5ec4 --- /dev/null +++ b/cmd/kyverno/breaker.go @@ -0,0 +1,146 @@ +package main + +import ( + "context" + "errors" + + reportsv1 "github.com/kyverno/kyverno/api/reports/v1" + "github.com/kyverno/kyverno/pkg/client/informers/externalversions/internalinterfaces" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" + metadataclient "k8s.io/client-go/metadata" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" +) + +type Counter interface { + Count() int +} + +type resourcesCount struct { + store cache.Store +} + +func (c *resourcesCount) Count() int { + return len(c.store.List()) +} + +func StartAdmissionReportsWatcher(ctx context.Context, client metadataclient.Interface) (*resourcesCount, error) { + gvr := reportsv1.SchemeGroupVersion.WithResource("ephemeralreports") 
+ todo := context.TODO() + tweakListOptions := func(lo *metav1.ListOptions) { + lo.LabelSelector = "audit.kyverno.io/source==admission" + } + informer := cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + tweakListOptions(&options) + return client.Resource(gvr).Namespace(metav1.NamespaceAll).List(todo, options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + tweakListOptions(&options) + return client.Resource(gvr).Namespace(metav1.NamespaceAll).Watch(todo, options) + }, + }, + &metav1.PartialObjectMetadata{}, + resyncPeriod, + cache.Indexers{}, + ) + err := informer.SetTransform(func(in any) (any, error) { + { + in := in.(*metav1.PartialObjectMetadata) + return &metav1.PartialObjectMetadata{ + TypeMeta: in.TypeMeta, + ObjectMeta: metav1.ObjectMeta{ + Name: in.Name, + GenerateName: in.GenerateName, + Namespace: in.Namespace, + }, + }, nil + } + }) + if err != nil { + return nil, err + } + go func() { + informer.Run(ctx.Done()) + }() + if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { + return nil, errors.New("failed to sync cache") + } + return &resourcesCount{ + store: informer.GetStore(), + }, nil +} + +type counter struct { + count int +} + +func (c *counter) Count() int { + return c.count +} + +func StartResourceCounter(ctx context.Context, client metadataclient.Interface, gvr schema.GroupVersionResource, tweakListOptions internalinterfaces.TweakListOptionsFunc) (*counter, error) { + objs, err := client.Resource(gvr).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + watcher := &cache.ListWatch{ + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.Resource(gvr).Watch(ctx, options) + }, + } + watchInterface, err := watchtools.NewRetryWatcher(objs.GetResourceVersion(), watcher) + if err != nil { + return nil, err + } + w := 
&counter{ + count: len(objs.Items), + } + go func() { + for event := range watchInterface.ResultChan() { + switch event.Type { + case watch.Added: + w.count = w.count + 1 + case watch.Deleted: + w.count = w.count - 1 + } + } + }() + return w, nil +} + +func StartAdmissionReportsCounter(ctx context.Context, client metadataclient.Interface) (Counter, error) { + tweakListOptions := func(lo *metav1.ListOptions) { + lo.LabelSelector = "audit.kyverno.io/source==admission" + } + ephrs, err := StartResourceCounter(ctx, client, reportsv1.SchemeGroupVersion.WithResource("ephemeralreports"), tweakListOptions) + if err != nil { + return nil, err + } + cephrs, err := StartResourceCounter(ctx, client, reportsv1.SchemeGroupVersion.WithResource("clusterephemeralreports"), tweakListOptions) + if err != nil { + return nil, err + } + return composite{ + inner: []Counter{ephrs, cephrs}, + }, nil +} + +type composite struct { + inner []Counter +} + +func (c composite) Count() int { + sum := 0 + for _, counter := range c.inner { + sum += counter.Count() + } + return sum +} diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 1c4831b6f9..e593cd03ba 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -25,6 +25,7 @@ import ( policycachecontroller "github.com/kyverno/kyverno/pkg/controllers/policycache" vapcontroller "github.com/kyverno/kyverno/pkg/controllers/validatingadmissionpolicy-generate" webhookcontroller "github.com/kyverno/kyverno/pkg/controllers/webhook" + "github.com/kyverno/kyverno/pkg/d4f" "github.com/kyverno/kyverno/pkg/engine/apicall" "github.com/kyverno/kyverno/pkg/event" "github.com/kyverno/kyverno/pkg/globalcontext/store" @@ -122,7 +123,6 @@ func createrLeaderControllers( eventGenerator event.Interface, ) ([]internal.Controller, func(context.Context) error, error) { var leaderControllers []internal.Controller - certManager := certmanager.NewController( caInformer, tlsInformer, @@ -251,6 +251,7 @@ func main() { renewBefore time.Duration maxAuditWorkers 
int maxAuditCapacity int + maxAdmissionReports int ) flagset := flag.NewFlagSet("kyverno", flag.ExitOnError) flagset.BoolVar(&dumpPayload, "dumpPayload", false, "Set this flag to activate/deactivate debug mode.") @@ -273,6 +274,7 @@ func main() { flagset.DurationVar(&renewBefore, "renewBefore", 15*24*time.Hour, "The certificate renewal time before expiration") flagset.IntVar(&maxAuditWorkers, "maxAuditWorkers", 8, "Maximum number of workers for audit policy processing") flagset.IntVar(&maxAuditCapacity, "maxAuditCapacity", 1000, "Maximum capacity of the audit policy task queue") + flagset.IntVar(&maxAdmissionReports, "maxAdmissionReports", 10000, "Maximum number of admission reports before we stop creating new ones") // config appConfig := internal.NewConfiguration( internal.WithProfiling(), @@ -515,6 +517,14 @@ func main() { setup.KyvernoClient, backgroundServiceAccountName, ) + ephrs, err := StartAdmissionReportsCounter(signalCtx, setup.MetadataClient) + if err != nil { + setup.Logger.Error(errors.New("failed to start admission reports watcher"), "failed to start admission reports watcher") + os.Exit(1) + } + reportsBreaker := d4f.NewBreaker("admission reports", func(context.Context) bool { + return ephrs.Count() > maxAdmissionReports + }) resourceHandlers := webhooksresource.NewHandlers( engine, setup.KyvernoDynamicClient, @@ -533,6 +543,7 @@ func main() { setup.Jp, maxAuditWorkers, maxAuditCapacity, + reportsBreaker, ) exceptionHandlers := webhooksexception.NewHandlers(exception.ValidationOptions{ Enabled: internal.PolicyExceptionEnabled(), diff --git a/pkg/d4f/breaker.go b/pkg/d4f/breaker.go new file mode 100644 index 0000000000..7866307d3f --- /dev/null +++ b/pkg/d4f/breaker.go @@ -0,0 +1,66 @@ +package d4f + +import ( + "context" + + "github.com/kyverno/kyverno/pkg/logging" + "github.com/kyverno/kyverno/pkg/metrics" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/metric" +) + +type Breaker interface { 
+ Do(context.Context, func(context.Context) error) error +} + +type breaker struct { + name string + drops sdkmetric.Int64Counter + total sdkmetric.Int64Counter + open func(context.Context) bool +} + +func NewBreaker(name string, open func(context.Context) bool) *breaker { + logger := logging.WithName("circuit-breaker") + meter := otel.GetMeterProvider().Meter(metrics.MeterName) + drops, err := meter.Int64Counter( + "kyverno_breaker_drops", + sdkmetric.WithDescription("track the number of times the breaker failed open and dropped"), + ) + if err != nil { + logger.Error(err, "Failed to create instrument, kyverno_breaker_drops") + } + total, err := meter.Int64Counter( + "kyverno_breaker_total", + sdkmetric.WithDescription("track number of times the breaker was invoked"), + ) + if err != nil { + logger.Error(err, "Failed to create instrument, kyverno_breaker_total") + } + return &breaker{ + name: name, + drops: drops, + total: total, + open: open, + } +} + +func (b *breaker) Do(ctx context.Context, inner func(context.Context) error) error { + attributes := sdkmetric.WithAttributes( + attribute.String("circuit_name", b.name), + ) + if b.total != nil { + b.total.Add(ctx, 1, attributes) + } + if b.open != nil && b.open(ctx) { + if b.drops != nil { + b.drops.Add(ctx, 1, attributes) + } + return nil + } + if inner == nil { + return nil + } + return inner(ctx) +} diff --git a/pkg/d4f/breaker_test.go b/pkg/d4f/breaker_test.go new file mode 100644 index 0000000000..b1b626d6b0 --- /dev/null +++ b/pkg/d4f/breaker_test.go @@ -0,0 +1,77 @@ +package d4f + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_breaker_Do(t *testing.T) { + type args struct { + inner func(context.Context) error + } + tests := []struct { + name string + subject *breaker + args args + wantErr bool + }{{ + name: "empty", + subject: NewBreaker("", nil), + wantErr: false, + }, { + name: "no error", + subject: NewBreaker("", nil), + args: args{ + inner: 
func(context.Context) error { + return nil + }, + }, + wantErr: false, + }, { + name: "with error", + subject: NewBreaker("", nil), + args: args{ + inner: func(context.Context) error { + return errors.New("foo") + }, + }, + wantErr: true, + }, { + name: "with break", + subject: NewBreaker("", func(context.Context) bool { + return true + }), + args: args{ + inner: func(context.Context) error { + return errors.New("foo") + }, + }, + wantErr: false, + }, { + name: "with metrics", + subject: &breaker{ + open: func(context.Context) bool { + return true + }, + }, + args: args{ + inner: func(context.Context) error { + return errors.New("foo") + }, + }, + wantErr: false, + }} + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.subject.Do(context.TODO(), tt.args.inner) + if tt.wantErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} diff --git a/pkg/webhooks/resource/handlers.go b/pkg/webhooks/resource/handlers.go index 30ddf16649..34c3b8a670 100644 --- a/pkg/webhooks/resource/handlers.go +++ b/pkg/webhooks/resource/handlers.go @@ -17,6 +17,7 @@ import ( kyvernov2listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v2" "github.com/kyverno/kyverno/pkg/clients/dclient" "github.com/kyverno/kyverno/pkg/config" + "github.com/kyverno/kyverno/pkg/d4f" engineapi "github.com/kyverno/kyverno/pkg/engine/api" "github.com/kyverno/kyverno/pkg/engine/jmespath" "github.com/kyverno/kyverno/pkg/engine/policycontext" @@ -63,6 +64,7 @@ type resourceHandlers struct { admissionReports bool backgroundServiceAccountName string auditPool *pond.WorkerPool + reportsBreaker d4f.Breaker } func NewHandlers( @@ -83,6 +85,7 @@ func NewHandlers( jp jmespath.Interface, maxAuditWorkers int, maxAuditCapacity int, + reportsBreaker d4f.Breaker, ) webhooks.ResourceHandlers { return &resourceHandlers{ engine: engine, @@ -101,6 +104,7 @@ func NewHandlers( admissionReports: admissionReports, backgroundServiceAccountName: backgroundServiceAccountName, 
auditPool: pond.New(maxAuditWorkers, maxAuditCapacity, pond.Strategy(pond.Lazy())), + reportsBreaker: reportsBreaker, } } @@ -120,7 +124,19 @@ func (h *resourceHandlers) Validate(ctx context.Context, logger logr.Logger, req logger.V(4).Info("processing policies for validate admission request", "validate", len(policies), "mutate", len(mutatePolicies), "generate", len(generatePolicies)) - vh := validation.NewValidationHandler(logger, h.kyvernoClient, h.engine, h.pCache, h.pcBuilder, h.eventGen, h.admissionReports, h.metricsConfig, h.configuration, h.nsLister) + vh := validation.NewValidationHandler( + logger, + h.kyvernoClient, + h.engine, + h.pCache, + h.pcBuilder, + h.eventGen, + h.admissionReports, + h.metricsConfig, + h.configuration, + h.nsLister, + h.reportsBreaker, + ) var wg sync.WaitGroup var ok bool var msg string @@ -182,7 +198,16 @@ func (h *resourceHandlers) Mutate(ctx context.Context, logger logr.Logger, reque logger.Error(err, "failed to build policy context") return admissionutils.Response(request.UID, err) } - ivh := imageverification.NewImageVerificationHandler(logger, h.kyvernoClient, h.engine, h.eventGen, h.admissionReports, h.configuration, h.nsLister) + ivh := imageverification.NewImageVerificationHandler( + logger, + h.kyvernoClient, + h.engine, + h.eventGen, + h.admissionReports, + h.configuration, + h.nsLister, + h.reportsBreaker, + ) imagePatches, imageVerifyWarnings, err := ivh.Handle(ctx, newRequest, verifyImagesPolicies, policyContext) if err != nil { logger.Error(err, "image verification failed") diff --git a/pkg/webhooks/resource/imageverification/handler.go b/pkg/webhooks/resource/imageverification/handler.go index 6c2d241dfa..1e726159a6 100644 --- a/pkg/webhooks/resource/imageverification/handler.go +++ b/pkg/webhooks/resource/imageverification/handler.go @@ -9,6 +9,7 @@ import ( kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1" "github.com/kyverno/kyverno/pkg/client/clientset/versioned" "github.com/kyverno/kyverno/pkg/config" + 
"github.com/kyverno/kyverno/pkg/d4f" "github.com/kyverno/kyverno/pkg/engine" engineapi "github.com/kyverno/kyverno/pkg/engine/api" "github.com/kyverno/kyverno/pkg/engine/mutate/patch" @@ -39,6 +40,7 @@ type imageVerificationHandler struct { admissionReports bool cfg config.Configuration nsLister corev1listers.NamespaceLister + reportsBreaker d4f.Breaker } func NewImageVerificationHandler( @@ -49,6 +51,7 @@ func NewImageVerificationHandler( admissionReports bool, cfg config.Configuration, nsLister corev1listers.NamespaceLister, + reportsBreaker d4f.Breaker, ) ImageVerificationHandler { return &imageVerificationHandler{ kyvernoClient: kyvernoClient, @@ -58,6 +61,7 @@ func NewImageVerificationHandler( admissionReports: admissionReports, cfg: cfg, nsLister: nsLister, + reportsBreaker: reportsBreaker, } } @@ -152,7 +156,7 @@ func (v *imageVerificationHandler) handleAudit( ctx context.Context, resource unstructured.Unstructured, request admissionv1.AdmissionRequest, - namespaceLabels map[string]string, + _ map[string]string, engineResponses ...engineapi.EngineResponse, ) { createReport := v.admissionReports @@ -175,7 +179,10 @@ func (v *imageVerificationHandler) handleAudit( if createReport { report := reportutils.BuildAdmissionReport(resource, request, engineResponses...) 
if len(report.GetResults()) > 0 { - _, err := reportutils.CreateReport(context.Background(), report, v.kyvernoClient) + err := v.reportsBreaker.Do(ctx, func(ctx context.Context) error { + _, err := reportutils.CreateReport(context.Background(), report, v.kyvernoClient) + return err + }) if err != nil { v.log.Error(err, "failed to create report") } diff --git a/pkg/webhooks/resource/validation/validation.go b/pkg/webhooks/resource/validation/validation.go index c0912a434d..b98658aa6a 100644 --- a/pkg/webhooks/resource/validation/validation.go +++ b/pkg/webhooks/resource/validation/validation.go @@ -9,6 +9,7 @@ import ( kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1" "github.com/kyverno/kyverno/pkg/client/clientset/versioned" "github.com/kyverno/kyverno/pkg/config" + "github.com/kyverno/kyverno/pkg/d4f" engineapi "github.com/kyverno/kyverno/pkg/engine/api" "github.com/kyverno/kyverno/pkg/engine/policycontext" "github.com/kyverno/kyverno/pkg/event" @@ -45,6 +46,7 @@ func NewValidationHandler( metrics metrics.MetricsConfigManager, cfg config.Configuration, nsLister corev1listers.NamespaceLister, + reportsBreaker d4f.Breaker, ) ValidationHandler { return &validationHandler{ log: log, @@ -57,6 +59,7 @@ func NewValidationHandler( metrics: metrics, cfg: cfg, nsLister: nsLister, + reportsBreaker: reportsBreaker, } } @@ -71,6 +74,7 @@ type validationHandler struct { metrics metrics.MetricsConfigManager cfg config.Configuration nsLister corev1listers.NamespaceLister + reportsBreaker d4f.Breaker } func (v *validationHandler) HandleValidationEnforce( @@ -225,7 +229,10 @@ func (v *validationHandler) createReports( ) error { report := reportutils.BuildAdmissionReport(resource, request.AdmissionRequest, engineResponses...) 
if len(report.GetResults()) > 0 { - _, err := reportutils.CreateReport(ctx, report, v.kyvernoClient) + err := v.reportsBreaker.Do(ctx, func(ctx context.Context) error { + _, err := reportutils.CreateReport(ctx, report, v.kyvernoClient) + return err + }) if err != nil { return err }