diff --git a/cmd/kyverno/breaker.go b/cmd/kyverno/breaker.go index 83ee6e5ec4..d85cc1f5bc 100644 --- a/cmd/kyverno/breaker.go +++ b/cmd/kyverno/breaker.go @@ -2,85 +2,28 @@ package main import ( "context" - "errors" reportsv1 "github.com/kyverno/kyverno/api/reports/v1" + watchtools "github.com/kyverno/kyverno/cmd/kyverno/watch" "github.com/kyverno/kyverno/pkg/client/informers/externalversions/internalinterfaces" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" metadataclient "k8s.io/client-go/metadata" "k8s.io/client-go/tools/cache" - watchtools "k8s.io/client-go/tools/watch" ) type Counter interface { - Count() int -} - -type resourcesCount struct { - store cache.Store -} - -func (c *resourcesCount) Count() int { - return len(c.store.List()) -} - -func StartAdmissionReportsWatcher(ctx context.Context, client metadataclient.Interface) (*resourcesCount, error) { - gvr := reportsv1.SchemeGroupVersion.WithResource("ephemeralreports") - todo := context.TODO() - tweakListOptions := func(lo *metav1.ListOptions) { - lo.LabelSelector = "audit.kyverno.io/source==admission" - } - informer := cache.NewSharedIndexInformer( - &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - tweakListOptions(&options) - return client.Resource(gvr).Namespace(metav1.NamespaceAll).List(todo, options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - tweakListOptions(&options) - return client.Resource(gvr).Namespace(metav1.NamespaceAll).Watch(todo, options) - }, - }, - &metav1.PartialObjectMetadata{}, - resyncPeriod, - cache.Indexers{}, - ) - err := informer.SetTransform(func(in any) (any, error) { - { - in := in.(*metav1.PartialObjectMetadata) - return &metav1.PartialObjectMetadata{ - TypeMeta: in.TypeMeta, - ObjectMeta: metav1.ObjectMeta{ - Name: in.Name, - GenerateName: in.GenerateName, - Namespace: in.Namespace, - }, - }, nil - } - }) - if err != nil { - return nil, err - } - go func() { - informer.Run(todo.Done()) - }() - if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) { - return nil, errors.New("failed to sync cache") - } - return &resourcesCount{ - store: informer.GetStore(), - }, nil + Count() (int, bool) } type counter struct { - count int + count int + retryWatcher *watchtools.RetryWatcher } -func (c *counter) Count() int { - return c.count +func (c *counter) Count() (int, bool) { + return c.count, c.retryWatcher.IsRunning() } func StartResourceCounter(ctx context.Context, client metadataclient.Interface, gvr schema.GroupVersionResource, tweakListOptions internalinterfaces.TweakListOptionsFunc) (*counter, error) { @@ -101,7 +44,8 @@ func StartResourceCounter(ctx context.Context, client metadataclient.Interface, return nil, err } w := &counter{ - count: len(objs.Items), + count: len(objs.Items), + retryWatcher: watchInterface, } go func() { for event := range watchInterface.ResultChan() { @@ -137,10 +81,14 @@ type composite struct { inner []Counter } -func (c composite) Count() int { +func (c composite) Count() (int, bool) { sum := 0 for _, counter := range c.inner { - sum += counter.Count() + count, isRunning := counter.Count() + if !isRunning { + return 0, false + } + sum += count } - return sum + return sum, true } diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index e593cd03ba..926e8911bc 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -523,7 +523,11 @@ func main() { os.Exit(1) } reportsBreaker := 
d4f.NewBreaker("admission reports", func(context.Context) bool {
-		return ephrs.Count() > maxAdmissionReports
+		count, isRunning := ephrs.Count()
+		if !isRunning {
+			return true
+		}
+		return count > maxAdmissionReports
 	})
 	resourceHandlers := webhooksresource.NewHandlers(
 		engine,
diff --git a/cmd/kyverno/watch/watcher.go b/cmd/kyverno/watch/watcher.go
new file mode 100644
index 0000000000..d8911cff43
--- /dev/null
+++ b/cmd/kyverno/watch/watcher.go
@@ -0,0 +1,291 @@
+package watch
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"sync/atomic"
+	"time"
+
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/dump"
+	"k8s.io/apimachinery/pkg/util/net"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/klog/v2"
+)
+
+// resourceVersionGetter is an interface used to get the resource version from events.
+// We can't reuse an interface from meta, as that would create a cyclic dependency, and we need just this one method.
+type resourceVersionGetter interface {
+	GetResourceVersion() string
+}
+
+// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout),
+// it will get restarted from the last point without the consumer even knowing about it.
+// RetryWatcher does that by inspecting events and keeping track of resourceVersion.
+// Especially useful when using watch.UntilWithoutRetry, where premature termination causes issues and flakes.
+// Please note that this is not resilient to the etcd cache not having the resource version anymore - you would need to
+// use Informers for that.
+type RetryWatcher struct {
+	lastResourceVersion string
+	watcherClient       cache.Watcher
+	resultChan          chan watch.Event
+	stopChan            chan struct{}
+	doneChan            chan struct{}
+	minRestartDelay     time.Duration
+	// isRunning is read by IsRunning from other goroutines while the receive
+	// goroutine updates it, so it has to be atomic to avoid a data race.
+	isRunning atomic.Bool
+}
+
+// NewRetryWatcher creates a new RetryWatcher.
+// It will make sure that watches get restarted in case of recoverable errors.
+// The initialResourceVersion will be given to the watch method when first called.
+func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
+	return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
+}
+
+func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
+	switch initialResourceVersion {
+	case "", "0":
+		// TODO: revisit this if we ever get WATCH v2 where it means start "now"
+		//       without doing the synthetic list of objects at the beginning (see #74022)
+		return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
+	default:
+		break
+	}
+
+	rw := &RetryWatcher{
+		lastResourceVersion: initialResourceVersion,
+		watcherClient:       watcherClient,
+		stopChan:            make(chan struct{}),
+		doneChan:            make(chan struct{}),
+		resultChan:          make(chan watch.Event),
+		minRestartDelay:     minRestartDelay,
+	}
+
+	go rw.receive()
+	return rw, nil
+}
+
+func (rw *RetryWatcher) send(event watch.Event) bool {
+	// Writing to an unbuffered channel is a blocking operation
+	// and we need to check if stop wasn't requested while doing so.
+	select {
+	case rw.resultChan <- event:
+		return true
+	case <-rw.stopChan:
+		return false
+	}
+}
+
+// doReceive returns true when it is done, false otherwise.
+// If it is not done, the second return value holds the time to wait before calling it again.
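+// receive paces restarts with that delay, so doReceive is only meant to be
+// invoked from the receive loop.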
+func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
+	watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
+		ResourceVersion:     rw.lastResourceVersion,
+		AllowWatchBookmarks: true,
+	})
+	// We are very unlikely to hit EOF here since we are just establishing the call,
+	// but it may happen that the apiserver is just shutting down (e.g. being restarted).
+	// This is consistent with how it is handled for informers.
+	switch err {
+	case nil:
+		break
+
+	case io.EOF:
+		// watch closed normally
+		return false, 0
+
+	case io.ErrUnexpectedEOF:
+		klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err)
+		return false, 0
+
+	default:
+		msg := "Watch failed"
+		if net.IsProbableEOF(err) || net.IsTimeout(err) {
+			klog.V(5).InfoS(msg, "err", err)
+			// Retry
+			return false, 0
+		}
+
+		klog.ErrorS(err, msg)
+		// Retry
+		return false, 0
+	}
+
+	if watcher == nil {
+		klog.ErrorS(nil, "Watch returned nil watcher")
+		// Retry
+		return false, 0
+	}
+
+	ch := watcher.ResultChan()
+	defer watcher.Stop()
+
+	rw.isRunning.Store(true)
+	defer func() {
+		rw.isRunning.Store(false)
+	}()
+
+	for {
+		select {
+		case <-rw.stopChan:
+			klog.V(4).InfoS("Stopping RetryWatcher.")
+			return true, 0
+		case event, ok := <-ch:
+			if !ok {
+				klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion)
+				return false, 0
+			}
+
+			// We need to inspect the event and get ResourceVersion out of it
+			switch event.Type {
+			case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
+				metaObject, ok := event.Object.(resourceVersionGetter)
+				if !ok {
+					_ = rw.send(watch.Event{
+						Type:   watch.Error,
+						Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
+					})
+					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+					return true, 0
+				}
+
+				resourceVersion := metaObject.GetResourceVersion()
+				if resourceVersion == "" {
+					_ = rw.send(watch.Event{
+						Type:   watch.Error,
+						Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
+					})
+					// We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+					return true, 0
+				}
+
+				// All is fine; send the non-bookmark events and update resource version.
+				if event.Type != watch.Bookmark {
+					ok = rw.send(event)
+					if !ok {
+						return true, 0
+					}
+				}
+				rw.lastResourceVersion = resourceVersion
+
+				continue
+
+			case watch.Error:
+				// This round trip allows us to handle unstructured status
+				errObject := apierrors.FromObject(event.Object)
+				statusErr, ok := errObject.(*apierrors.StatusError)
+				if !ok {
+					klog.Error(fmt.Sprintf("Received an error which is not *metav1.Status but %s", dump.Pretty(event.Object)))
+					// Retry unknown errors
+					return false, 0
+				}
+
+				status := statusErr.ErrStatus
+
+				statusDelay := time.Duration(0)
+				if status.Details != nil {
+					statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
+				}
+
+				switch status.Code {
+				case http.StatusGone:
+					// Never retry RV too old errors
+					_ = rw.send(event)
+					return true, 0
+
+				case http.StatusGatewayTimeout, http.StatusInternalServerError:
+					// Retry
+					return false, statusDelay
+
+				default:
+					// We retry by default. RetryWatcher is meant to proceed unless it is certain
+					// that it can't. If we are not certain, we proceed with retry and leave it
+					// up to the user to time out if needed.
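+					// The statusDelay returned below honors the server's RetryAfterSeconds
+					// hint (when present) before the watch is re-established.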
+
+					// Log here so we have a record of hitting the unexpected error
+					// and we can whitelist some error codes if we missed any that are expected.
+					klog.V(5).Info(fmt.Sprintf("Retrying after unexpected error: %s", dump.Pretty(event.Object)))
+
+					// Retry
+					return false, statusDelay
+				}
+
+			default:
+				klog.Errorf("Failed to recognize Event type %q", event.Type)
+				_ = rw.send(watch.Event{
+					Type:   watch.Error,
+					Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
+				})
+				// We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
+				return true, 0
+			}
+		}
+	}
+}
+
+// receive reads the result from a watcher, restarting it if necessary.
+func (rw *RetryWatcher) receive() {
+	defer close(rw.doneChan)
+	defer close(rw.resultChan)
+
+	klog.V(4).Info("Starting RetryWatcher.")
+	defer klog.V(4).Info("Stopping RetryWatcher.")
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go func() {
+		select {
+		case <-rw.stopChan:
+			cancel()
+			return
+		case <-ctx.Done():
+			return
+		}
+	}()
+
+	// We use a non-sliding interval so we don't introduce delays on the happy path when the WATCH call
+	// times out or gets closed and we need to reestablish it, while also avoiding hot loops.
+	wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
+		done, retryAfter := rw.doReceive()
+		if done {
+			cancel()
+			return
+		}
+
+		timer := time.NewTimer(retryAfter)
+		select {
+		case <-ctx.Done():
+			timer.Stop()
+			return
+		case <-timer.C:
+		}
+
+		klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
+	}, rw.minRestartDelay)
+}
+
+// ResultChan implements Interface.
+func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
+	return rw.resultChan
+}
+
+// Stop implements Interface.
+func (rw *RetryWatcher) Stop() {
+	close(rw.stopChan)
+}
+
+// Done allows the caller to be notified when the RetryWatcher stops.
+func (rw *RetryWatcher) Done() <-chan struct{} {
+	return rw.doneChan
+}
+
+// IsRunning reports whether the underlying watch connection is currently established.
+func (rw *RetryWatcher) IsRunning() bool {
+	return rw.isRunning.Load()
+}
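
A minimal consumption sketch for reviewers tracing the new Counter contract: list once to seed the count and obtain a resource version, then keep the count in sync from watch events while IsRunning reports watch health. The typed kubernetes.Interface client and the pods resource here are illustrative assumptions; the PR itself drives this with a metadata client inside StartResourceCounter.

package main

import (
	"context"
	"fmt"

	watchtools "github.com/kyverno/kyverno/cmd/kyverno/watch"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// countPods is a hypothetical consumer: it seeds a count from an initial List,
// then lets RetryWatcher keep the watch alive across recoverable failures.
func countPods(ctx context.Context, client kubernetes.Interface) error {
	// List first: NewRetryWatcher rejects "" and "0" as starting resource versions.
	pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{})
	if err != nil {
		return err
	}
	rw, err := watchtools.NewRetryWatcher(pods.ResourceVersion, &cache.ListWatch{
		// Only WatchFunc is needed; RetryWatcher never lists.
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return client.CoreV1().Pods(metav1.NamespaceAll).Watch(ctx, options)
		},
	})
	if err != nil {
		return err
	}
	defer rw.Stop()
	count := len(pods.Items)
	for event := range rw.ResultChan() {
		switch event.Type {
		case watch.Added:
			count++
		case watch.Deleted:
			count--
		}
		// IsRunning is what the circuit breaker consults: false means the count
		// can no longer be trusted, and main.go's breaker callback returns true.
		fmt.Println("pods:", count, "watch healthy:", rw.IsRunning())
	}
	return nil
}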