diff --git a/cmd/background-controller/main.go b/cmd/background-controller/main.go index e1ed7c5b17..57505082f4 100644 --- a/cmd/background-controller/main.go +++ b/cmd/background-controller/main.go @@ -111,8 +111,9 @@ func main() { flagset.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.") flagset.StringVar(&omitEvents, "omitEvents", "", "Set this flag to a comma sperated list of PolicyViolation, PolicyApplied, PolicyError, PolicySkipped to disable events, e.g. --omitEvents=PolicyApplied,PolicyViolation") flagset.Int64Var(&maxAPICallResponseLength, "maxAPICallResponseLength", 2*1000*1000, "Maximum allowed response size from API Calls. A value of 0 bypasses checks (not recommended).") + flagset.IntVar(&maxBackgroundReports, "maxBackgroundReports", 10000, "Maximum number of ephemeralreports created for the background policies.") flagset.BoolVar(&backgroundReports, "backgroundReports", true, "Enables or disables reports for mutate existing and generate rules.") - flagset.IntVar(&maxBackgroundReports, "maxBackgroundReports", 10000, "Maximum number of background reports before we stop creating new ones") + // config appConfig := internal.NewConfiguration( internal.WithProfiling(), diff --git a/cmd/reports-controller/main.go b/cmd/reports-controller/main.go index 8bd41178a1..5096ef26a3 100644 --- a/cmd/reports-controller/main.go +++ b/cmd/reports-controller/main.go @@ -10,6 +10,7 @@ import ( "time" "github.com/kyverno/kyverno/cmd/internal" + "github.com/kyverno/kyverno/pkg/breaker" "github.com/kyverno/kyverno/pkg/client/clientset/versioned" kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions" "github.com/kyverno/kyverno/pkg/clients/dclient" @@ -65,6 +66,7 @@ func createReportControllers( configuration config.Configuration, jp jmespath.Interface, eventGenerator event.Interface, + reportsBreaker breaker.Breaker, ) ([]internal.Controller, func(context.Context) error) { var ctrls []internal.Controller var warmups []func(context.Context) error @@ -124,6 +126,7 @@ func createReportControllers( jp, eventGenerator, policyReports, + reportsBreaker, ) ctrls = append(ctrls, internal.NewController( backgroundscancontroller.ControllerName, @@ -160,6 +163,7 @@ func createrLeaderControllers( jp jmespath.Interface, eventGenerator event.Interface, backgroundScanInterval time.Duration, + reportsBreaker breaker.Breaker, ) ([]internal.Controller, func(context.Context) error, error) { reportControllers, warmup := createReportControllers( eng, @@ -179,6 +183,7 @@ func createrLeaderControllers( configuration, jp, eventGenerator, + reportsBreaker, ) return reportControllers, warmup, nil } @@ -197,6 +202,7 @@ func main() { omitEvents string skipResourceFilters bool maxAPICallResponseLength int64 + maxBackgroundReports int ) flagset := flag.NewFlagSet("reports-controller", flag.ExitOnError) flagset.BoolVar(&backgroundScan, "backgroundScan", true, "Enable or disable background scan.") @@ -211,6 +217,7 @@ func main() { flagset.StringVar(&omitEvents, "omitEvents", "", "Set this flag to a comma separated list of PolicyViolation, PolicyApplied, PolicyError, PolicySkipped to disable events, e.g. --omitEvents=PolicyApplied,PolicyViolation") flagset.BoolVar(&skipResourceFilters, "skipResourceFilters", true, "If true, resource filters wont be considered.") flagset.Int64Var(&maxAPICallResponseLength, "maxAPICallResponseLength", 2*1000*1000, "Maximum allowed response size from API Calls. A value of 0 bypasses checks (not recommended).") + flagset.IntVar(&maxBackgroundReports, "maxBackgroundReports", 10000, "Maximum number of ephemeralreports created for the background policies before we stop creating new ones") // config appConfig := internal.NewConfiguration( internal.WithProfiling(), @@ -309,6 +316,20 @@ func main() { setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync") os.Exit(1) } + ephrs, err := breaker.StartBackgroundReportsCounter(ctx, setup.MetadataClient) + if err != nil { + setup.Logger.Error(err, "failed to start background-scan reports watcher") + os.Exit(1) + } + + // create the circuit breaker + reportsBreaker := breaker.NewBreaker("background scan reports", func(context.Context) bool { + count, isRunning := ephrs.Count() + if !isRunning { + return true + } + return count > maxBackgroundReports + }) // setup leader election le, err := leaderelection.New( setup.Logger.WithName("leader-election"), @@ -343,6 +364,7 @@ func main() { setup.Jp, eventGenerator, backgroundScanInterval, + reportsBreaker, ) if err != nil { logger.Error(err, "failed to create leader controllers") diff --git a/pkg/breaker/breaker.go b/pkg/breaker/breaker.go index 580f8e0229..7c2b701411 100644 --- a/pkg/breaker/breaker.go +++ b/pkg/breaker/breaker.go @@ -22,7 +22,7 @@ type breaker struct { } func NewBreaker(name string, open func(context.Context) bool) *breaker { - logger := logging.WithName("cricuit-breaker") + logger := logging.WithName("circuit-breaker") meter := otel.GetMeterProvider().Meter(metrics.MeterName) drops, err := meter.Int64Counter( "kyverno_breaker_drops", diff --git a/pkg/controllers/report/background/controller.go b/pkg/controllers/report/background/controller.go index 1259077bd1..5ea6896a13 100644 --- a/pkg/controllers/report/background/controller.go +++ b/pkg/controllers/report/background/controller.go @@ -10,6 +10,7 @@ import ( kyvernov2 "github.com/kyverno/kyverno/api/kyverno/v2" policyreportv1alpha2 "github.com/kyverno/kyverno/api/policyreport/v1alpha2" reportsv1 "github.com/kyverno/kyverno/api/reports/v1" + "github.com/kyverno/kyverno/pkg/breaker" "github.com/kyverno/kyverno/pkg/client/clientset/versioned" kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1" kyvernov2informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v2" @@ -77,6 +78,7 @@ type controller struct { jp jmespath.Interface eventGen event.Interface policyReports bool + breaker breaker.Breaker } func NewController( @@ -96,6 +98,7 @@ func NewController( jp jmespath.Interface, eventGen event.Interface, policyReports bool, + breaker breaker.Breaker, ) controllers.Controller { ephrInformer := metadataFactory.ForResource(reportsv1.SchemeGroupVersion.WithResource("ephemeralreports")) cephrInformer := metadataFactory.ForResource(reportsv1.SchemeGroupVersion.WithResource("clusterephemeralreports")) @@ -117,6 +120,7 @@ func NewController( jp: jp, eventGen: eventGen, policyReports: policyReports, + breaker: breaker, } if vapInformer != nil { c.vapLister = vapInformer.Lister() @@ -462,7 +466,13 @@ func (c *controller) storeReport(ctx context.Context, observed, desired reportsv if !hasReport && !wantsReport { return nil } else if !hasReport && wantsReport { - _, err = reportutils.CreateReport(ctx, desired, c.kyvernoClient) + err = c.breaker.Do(ctx, func(context.Context) error { + _, err := reportutils.CreateReport(ctx, desired, c.kyvernoClient) + if err != nil { + return err + } + return nil + }) return err } else if hasReport && !wantsReport { if observed.GetNamespace() == "" {