1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 10:28:36 +00:00

feat: add reports circuit breaker (#10499)

* feat: add reports circuit breaker

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* improve metrics and granularity

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

---------

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
This commit is contained in:
Charles-Edouard Brétéché 2024-06-25 05:16:30 +02:00 committed by GitHub
parent 94d9bbe73f
commit 018d45cb29
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 345 additions and 6 deletions

146
cmd/kyverno/breaker.go Normal file
View file

@ -0,0 +1,146 @@
package main
import (
	"context"
	"errors"
	"sync/atomic"

	reportsv1 "github.com/kyverno/kyverno/api/reports/v1"
	"github.com/kyverno/kyverno/pkg/client/informers/externalversions/internalinterfaces"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/watch"
	metadataclient "k8s.io/client-go/metadata"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
)
// Counter reports the current number of tracked resources.
type Counter interface {
	Count() int
}
// resourcesCount is a Counter backed by an informer's local store:
// the count is the number of objects currently held in the cache.
type resourcesCount struct {
	store cache.Store
}

// Count returns the number of objects currently in the underlying store.
func (c *resourcesCount) Count() int {
	return len(c.store.List())
}
// StartAdmissionReportsWatcher starts a metadata-only informer over
// reports.kyverno.io ephemeralreports (all namespaces, restricted to
// reports produced by the admission controller via the
// audit.kyverno.io/source==admission label selector) and returns a
// Counter backed by the informer's local store.
//
// The informer goroutine runs until ctx is cancelled. An error is
// returned if the initial cache sync fails.
func StartAdmissionReportsWatcher(ctx context.Context, client metadataclient.Interface) (*resourcesCount, error) {
	gvr := reportsv1.SchemeGroupVersion.WithResource("ephemeralreports")
	// restrict the list/watch to reports created from admission requests
	tweakListOptions := func(lo *metav1.ListOptions) {
		lo.LabelSelector = "audit.kyverno.io/source==admission"
	}
	informer := cache.NewSharedIndexInformer(
		&cache.ListWatch{
			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
				tweakListOptions(&options)
				return client.Resource(gvr).Namespace(metav1.NamespaceAll).List(ctx, options)
			},
			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
				tweakListOptions(&options)
				return client.Resource(gvr).Namespace(metav1.NamespaceAll).Watch(ctx, options)
			},
		},
		&metav1.PartialObjectMetadata{},
		resyncPeriod,
		cache.Indexers{},
	)
	// keep only the fields needed for counting, to minimize cache memory
	err := informer.SetTransform(func(in any) (any, error) {
		meta := in.(*metav1.PartialObjectMetadata)
		return &metav1.PartialObjectMetadata{
			TypeMeta: meta.TypeMeta,
			ObjectMeta: metav1.ObjectMeta{
				Name:         meta.Name,
				GenerateName: meta.GenerateName,
				Namespace:    meta.Namespace,
			},
		}, nil
	})
	if err != nil {
		return nil, err
	}
	// tie the informer lifetime to the caller's context instead of
	// context.TODO(): the original ran the informer against a context
	// that is never cancelled, leaking the goroutine on shutdown
	go informer.Run(ctx.Done())
	if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
		return nil, errors.New("failed to sync cache")
	}
	return &resourcesCount{
		store: informer.GetStore(),
	}, nil
}
// counter tracks a live count of resources, maintained from watch events.
//
// The count is updated concurrently by the goroutine started in
// StartResourceCounter while Count may be called from other goroutines,
// so it is stored in an atomic (the original plain int was a data race
// under the race detector).
type counter struct {
	count atomic.Int64
}

// Count returns the current number of observed resources.
func (c *counter) Count() int {
	return int(c.count.Load())
}

// StartResourceCounter lists the given resource to seed the count, then
// keeps it up to date from a retrying watch: Added increments, Deleted
// decrements. tweakListOptions, when non-nil, is applied to the watch
// options (e.g. to add a label selector). The watch goroutine runs until
// the watch channel is closed.
func StartResourceCounter(ctx context.Context, client metadataclient.Interface, gvr schema.GroupVersionResource, tweakListOptions internalinterfaces.TweakListOptionsFunc) (*counter, error) {
	objs, err := client.Resource(gvr).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	watcher := &cache.ListWatch{
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			if tweakListOptions != nil {
				tweakListOptions(&options)
			}
			return client.Resource(gvr).Watch(ctx, options)
		},
	}
	// resume from the list's resource version; RetryWatcher re-establishes
	// the watch transparently on transient errors
	watchInterface, err := watchtools.NewRetryWatcher(objs.GetResourceVersion(), watcher)
	if err != nil {
		return nil, err
	}
	w := &counter{}
	w.count.Store(int64(len(objs.Items)))
	go func() {
		for event := range watchInterface.ResultChan() {
			switch event.Type {
			case watch.Added:
				w.count.Add(1)
			case watch.Deleted:
				w.count.Add(-1)
			}
		}
	}()
	return w, nil
}
// StartAdmissionReportsCounter wires up watch-based counters for both the
// namespaced and cluster-scoped ephemeral reports created by the admission
// controller, and exposes them as a single aggregated Counter.
func StartAdmissionReportsCounter(ctx context.Context, client metadataclient.Interface) (Counter, error) {
	// only count reports produced from admission requests
	selector := func(lo *metav1.ListOptions) {
		lo.LabelSelector = "audit.kyverno.io/source==admission"
	}
	namespaced, err := StartResourceCounter(ctx, client, reportsv1.SchemeGroupVersion.WithResource("ephemeralreports"), selector)
	if err != nil {
		return nil, err
	}
	clustered, err := StartResourceCounter(ctx, client, reportsv1.SchemeGroupVersion.WithResource("clusterephemeralreports"), selector)
	if err != nil {
		return nil, err
	}
	return composite{inner: []Counter{namespaced, clustered}}, nil
}
// composite aggregates several Counter implementations into one.
type composite struct {
	inner []Counter
}

// Count returns the sum of all inner counters.
func (c composite) Count() int {
	total := 0
	for i := range c.inner {
		total += c.inner[i].Count()
	}
	return total
}

View file

@ -25,6 +25,7 @@ import (
policycachecontroller "github.com/kyverno/kyverno/pkg/controllers/policycache"
vapcontroller "github.com/kyverno/kyverno/pkg/controllers/validatingadmissionpolicy-generate"
webhookcontroller "github.com/kyverno/kyverno/pkg/controllers/webhook"
"github.com/kyverno/kyverno/pkg/d4f"
"github.com/kyverno/kyverno/pkg/engine/apicall"
"github.com/kyverno/kyverno/pkg/event"
"github.com/kyverno/kyverno/pkg/globalcontext/store"
@ -122,7 +123,6 @@ func createrLeaderControllers(
eventGenerator event.Interface,
) ([]internal.Controller, func(context.Context) error, error) {
var leaderControllers []internal.Controller
certManager := certmanager.NewController(
caInformer,
tlsInformer,
@ -251,6 +251,7 @@ func main() {
renewBefore time.Duration
maxAuditWorkers int
maxAuditCapacity int
maxAdmissionReports int
)
flagset := flag.NewFlagSet("kyverno", flag.ExitOnError)
flagset.BoolVar(&dumpPayload, "dumpPayload", false, "Set this flag to activate/deactivate debug mode.")
@ -273,6 +274,7 @@ func main() {
flagset.DurationVar(&renewBefore, "renewBefore", 15*24*time.Hour, "The certificate renewal time before expiration")
flagset.IntVar(&maxAuditWorkers, "maxAuditWorkers", 8, "Maximum number of workers for audit policy processing")
flagset.IntVar(&maxAuditCapacity, "maxAuditCapacity", 1000, "Maximum capacity of the audit policy task queue")
flagset.IntVar(&maxAdmissionReports, "maxAdmissionReports", 10000, "Maximum number of admission reports before we stop creating new ones")
// config
appConfig := internal.NewConfiguration(
internal.WithProfiling(),
@ -515,6 +517,14 @@ func main() {
setup.KyvernoClient,
backgroundServiceAccountName,
)
ephrs, err := StartAdmissionReportsCounter(signalCtx, setup.MetadataClient)
if err != nil {
setup.Logger.Error(errors.New("failed to start admission reports watcher"), "failed to start admission reports watcher")
os.Exit(1)
}
reportsBreaker := d4f.NewBreaker("admission reports", func(context.Context) bool {
return ephrs.Count() > maxAdmissionReports
})
resourceHandlers := webhooksresource.NewHandlers(
engine,
setup.KyvernoDynamicClient,
@ -533,6 +543,7 @@ func main() {
setup.Jp,
maxAuditWorkers,
maxAuditCapacity,
reportsBreaker,
)
exceptionHandlers := webhooksexception.NewHandlers(exception.ValidationOptions{
Enabled: internal.PolicyExceptionEnabled(),

66
pkg/d4f/breaker.go Normal file
View file

@ -0,0 +1,66 @@
package d4f
import (
"context"
"github.com/kyverno/kyverno/pkg/logging"
"github.com/kyverno/kyverno/pkg/metrics"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/metric"
)
// Breaker guards an operation behind a circuit: Do runs the supplied
// function unless the circuit is open, in which case the call is dropped.
type Breaker interface {
	Do(context.Context, func(context.Context) error) error
}

// breaker is the default Breaker implementation. It records drop/total
// OpenTelemetry counters (either may be nil if instrument creation
// failed) and consults the open predicate before each call.
type breaker struct {
	name  string
	drops sdkmetric.Int64Counter
	total sdkmetric.Int64Counter
	open  func(context.Context) bool
}
// NewBreaker creates a named circuit breaker. open is consulted before
// every invocation; when it returns true the circuit is considered open
// and calls are dropped. A nil open means the circuit never opens.
//
// Metric instruments are best effort: failures to create them are logged
// and the corresponding counter stays nil (Do tolerates nil instruments).
func NewBreaker(name string, open func(context.Context) bool) *breaker {
	// fix: logger name was misspelled "cricuit-breaker"
	logger := logging.WithName("circuit-breaker")
	meter := otel.GetMeterProvider().Meter(metrics.MeterName)
	drops, err := meter.Int64Counter(
		"kyverno_breaker_drops",
		sdkmetric.WithDescription("track the number of times the breaker failed open and dropped"),
	)
	if err != nil {
		logger.Error(err, "Failed to create instrument, kyverno_breaker_drops")
	}
	total, err := meter.Int64Counter(
		"kyverno_breaker_total",
		sdkmetric.WithDescription("track number of times the breaker was invoked"),
	)
	if err != nil {
		logger.Error(err, "Failed to create instrument, kyverno_breaker_total")
	}
	return &breaker{
		name:  name,
		drops: drops,
		total: total,
		open:  open,
	}
}
// Do records an invocation, then either drops the call (circuit open) or
// executes inner. A dropped call returns nil rather than an error; a nil
// inner is a no-op. Nil metric instruments are tolerated.
func (b *breaker) Do(ctx context.Context, inner func(context.Context) error) error {
	labels := sdkmetric.WithAttributes(attribute.String("circuit_name", b.name))
	if b.total != nil {
		b.total.Add(ctx, 1, labels)
	}
	// circuit open: count the drop and skip the call entirely
	if b.open != nil && b.open(ctx) {
		if b.drops != nil {
			b.drops.Add(ctx, 1, labels)
		}
		return nil
	}
	if inner == nil {
		return nil
	}
	return inner(ctx)
}

77
pkg/d4f/breaker_test.go Normal file
View file

@ -0,0 +1,77 @@
package d4f
import (
"context"
"errors"
"testing"
"github.com/stretchr/testify/assert"
)
// Test_breaker_Do exercises the breaker in each relevant configuration:
// closed circuit (inner runs and its error propagates), open circuit
// (inner is skipped and no error is returned), and a bare struct with
// nil metric instruments.
func Test_breaker_Do(t *testing.T) {
	type testCase struct {
		name    string
		subject *breaker
		inner   func(context.Context) error
		wantErr bool
	}
	ok := func(context.Context) error { return nil }
	boom := func(context.Context) error { return errors.New("foo") }
	alwaysOpen := func(context.Context) bool { return true }
	cases := []testCase{
		{name: "empty", subject: NewBreaker("", nil)},
		{name: "no error", subject: NewBreaker("", nil), inner: ok},
		{name: "with error", subject: NewBreaker("", nil), inner: boom, wantErr: true},
		{name: "with break", subject: NewBreaker("", alwaysOpen), inner: boom},
		{name: "with metrics", subject: &breaker{open: alwaysOpen}, inner: boom},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			err := tc.subject.Do(context.TODO(), tc.inner)
			if tc.wantErr {
				assert.Error(t, err)
			} else {
				assert.NoError(t, err)
			}
		})
	}
}

View file

@ -17,6 +17,7 @@ import (
kyvernov2listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v2"
"github.com/kyverno/kyverno/pkg/clients/dclient"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/d4f"
engineapi "github.com/kyverno/kyverno/pkg/engine/api"
"github.com/kyverno/kyverno/pkg/engine/jmespath"
"github.com/kyverno/kyverno/pkg/engine/policycontext"
@ -63,6 +64,7 @@ type resourceHandlers struct {
admissionReports bool
backgroundServiceAccountName string
auditPool *pond.WorkerPool
reportsBreaker d4f.Breaker
}
func NewHandlers(
@ -83,6 +85,7 @@ func NewHandlers(
jp jmespath.Interface,
maxAuditWorkers int,
maxAuditCapacity int,
reportsBreaker d4f.Breaker,
) webhooks.ResourceHandlers {
return &resourceHandlers{
engine: engine,
@ -101,6 +104,7 @@ func NewHandlers(
admissionReports: admissionReports,
backgroundServiceAccountName: backgroundServiceAccountName,
auditPool: pond.New(maxAuditWorkers, maxAuditCapacity, pond.Strategy(pond.Lazy())),
reportsBreaker: reportsBreaker,
}
}
@ -120,7 +124,19 @@ func (h *resourceHandlers) Validate(ctx context.Context, logger logr.Logger, req
logger.V(4).Info("processing policies for validate admission request", "validate", len(policies), "mutate", len(mutatePolicies), "generate", len(generatePolicies))
vh := validation.NewValidationHandler(logger, h.kyvernoClient, h.engine, h.pCache, h.pcBuilder, h.eventGen, h.admissionReports, h.metricsConfig, h.configuration, h.nsLister)
vh := validation.NewValidationHandler(
logger,
h.kyvernoClient,
h.engine,
h.pCache,
h.pcBuilder,
h.eventGen,
h.admissionReports,
h.metricsConfig,
h.configuration,
h.nsLister,
h.reportsBreaker,
)
var wg sync.WaitGroup
var ok bool
var msg string
@ -182,7 +198,16 @@ func (h *resourceHandlers) Mutate(ctx context.Context, logger logr.Logger, reque
logger.Error(err, "failed to build policy context")
return admissionutils.Response(request.UID, err)
}
ivh := imageverification.NewImageVerificationHandler(logger, h.kyvernoClient, h.engine, h.eventGen, h.admissionReports, h.configuration, h.nsLister)
ivh := imageverification.NewImageVerificationHandler(
logger,
h.kyvernoClient,
h.engine,
h.eventGen,
h.admissionReports,
h.configuration,
h.nsLister,
h.reportsBreaker,
)
imagePatches, imageVerifyWarnings, err := ivh.Handle(ctx, newRequest, verifyImagesPolicies, policyContext)
if err != nil {
logger.Error(err, "image verification failed")

View file

@ -9,6 +9,7 @@ import (
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
"github.com/kyverno/kyverno/pkg/client/clientset/versioned"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/d4f"
"github.com/kyverno/kyverno/pkg/engine"
engineapi "github.com/kyverno/kyverno/pkg/engine/api"
"github.com/kyverno/kyverno/pkg/engine/mutate/patch"
@ -39,6 +40,7 @@ type imageVerificationHandler struct {
admissionReports bool
cfg config.Configuration
nsLister corev1listers.NamespaceLister
reportsBreaker d4f.Breaker
}
func NewImageVerificationHandler(
@ -49,6 +51,7 @@ func NewImageVerificationHandler(
admissionReports bool,
cfg config.Configuration,
nsLister corev1listers.NamespaceLister,
reportsBreaker d4f.Breaker,
) ImageVerificationHandler {
return &imageVerificationHandler{
kyvernoClient: kyvernoClient,
@ -58,6 +61,7 @@ func NewImageVerificationHandler(
admissionReports: admissionReports,
cfg: cfg,
nsLister: nsLister,
reportsBreaker: reportsBreaker,
}
}
@ -152,7 +156,7 @@ func (v *imageVerificationHandler) handleAudit(
ctx context.Context,
resource unstructured.Unstructured,
request admissionv1.AdmissionRequest,
namespaceLabels map[string]string,
_ map[string]string,
engineResponses ...engineapi.EngineResponse,
) {
createReport := v.admissionReports
@ -175,7 +179,10 @@ func (v *imageVerificationHandler) handleAudit(
if createReport {
report := reportutils.BuildAdmissionReport(resource, request, engineResponses...)
if len(report.GetResults()) > 0 {
_, err := reportutils.CreateReport(context.Background(), report, v.kyvernoClient)
err := v.reportsBreaker.Do(ctx, func(ctx context.Context) error {
_, err := reportutils.CreateReport(context.Background(), report, v.kyvernoClient)
return err
})
if err != nil {
v.log.Error(err, "failed to create report")
}

View file

@ -9,6 +9,7 @@ import (
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
"github.com/kyverno/kyverno/pkg/client/clientset/versioned"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/d4f"
engineapi "github.com/kyverno/kyverno/pkg/engine/api"
"github.com/kyverno/kyverno/pkg/engine/policycontext"
"github.com/kyverno/kyverno/pkg/event"
@ -45,6 +46,7 @@ func NewValidationHandler(
metrics metrics.MetricsConfigManager,
cfg config.Configuration,
nsLister corev1listers.NamespaceLister,
reportsBreaker d4f.Breaker,
) ValidationHandler {
return &validationHandler{
log: log,
@ -57,6 +59,7 @@ func NewValidationHandler(
metrics: metrics,
cfg: cfg,
nsLister: nsLister,
reportsBreaker: reportsBreaker,
}
}
@ -71,6 +74,7 @@ type validationHandler struct {
metrics metrics.MetricsConfigManager
cfg config.Configuration
nsLister corev1listers.NamespaceLister
reportsBreaker d4f.Breaker
}
func (v *validationHandler) HandleValidationEnforce(
@ -225,7 +229,10 @@ func (v *validationHandler) createReports(
) error {
report := reportutils.BuildAdmissionReport(resource, request.AdmissionRequest, engineResponses...)
if len(report.GetResults()) > 0 {
_, err := reportutils.CreateReport(ctx, report, v.kyvernoClient)
err := v.reportsBreaker.Do(ctx, func(ctx context.Context) error {
_, err := reportutils.CreateReport(ctx, report, v.kyvernoClient)
return err
})
if err != nil {
return err
}