Mirror of https://github.com/kyverno/kyverno.git, synced 2024-12-14 11:57:48 +00:00
fix: detect watcher not running (#10610)

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Co-authored-by: Vishal Choudhary <vishal.choudhary@nirmata.com>

parent ad6ee93e3b
commit 13fc9881d5

3 changed files with 311 additions and 68 deletions
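The change below comes down to one contract: Count now also reports whether the watcher backing the count is still running, and callers fail closed when it is not. A minimal sketch of that contract; the Counter interface is taken from the diff, while overLimit is a hypothetical helper mirroring the breaker callback in main():

package main

// Counter mirrors the interface in the diff: Count returns the current
// number of resources plus whether the watcher feeding the count is running.
type Counter interface {
    Count() (int, bool)
}

// overLimit is a hypothetical helper showing the pattern the breaker callback
// adopts: if the watcher is not running the count cannot be trusted, so the
// check fails closed and reports the limit as exceeded.
func overLimit(c Counter, max int) bool {
    count, isRunning := c.Count()
    if !isRunning {
        return true
    }
    return count > max
}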
@@ -2,85 +2,28 @@ package main

import (
    "context"
    "errors"

    reportsv1 "github.com/kyverno/kyverno/api/reports/v1"
    watchtools "github.com/kyverno/kyverno/cmd/kyverno/watch"
    "github.com/kyverno/kyverno/pkg/client/informers/externalversions/internalinterfaces"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    "k8s.io/apimachinery/pkg/runtime/schema"
    "k8s.io/apimachinery/pkg/watch"
    metadataclient "k8s.io/client-go/metadata"
    "k8s.io/client-go/tools/cache"
    watchtools "k8s.io/client-go/tools/watch"
)

type Counter interface {
    Count() int
}

type resourcesCount struct {
    store cache.Store
}

func (c *resourcesCount) Count() int {
    return len(c.store.List())
}

func StartAdmissionReportsWatcher(ctx context.Context, client metadataclient.Interface) (*resourcesCount, error) {
    gvr := reportsv1.SchemeGroupVersion.WithResource("ephemeralreports")
    todo := context.TODO()
    tweakListOptions := func(lo *metav1.ListOptions) {
        lo.LabelSelector = "audit.kyverno.io/source==admission"
    }
    informer := cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                tweakListOptions(&options)
                return client.Resource(gvr).Namespace(metav1.NamespaceAll).List(todo, options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                tweakListOptions(&options)
                return client.Resource(gvr).Namespace(metav1.NamespaceAll).Watch(todo, options)
            },
        },
        &metav1.PartialObjectMetadata{},
        resyncPeriod,
        cache.Indexers{},
    )
    err := informer.SetTransform(func(in any) (any, error) {
        {
            in := in.(*metav1.PartialObjectMetadata)
            return &metav1.PartialObjectMetadata{
                TypeMeta: in.TypeMeta,
                ObjectMeta: metav1.ObjectMeta{
                    Name: in.Name,
                    GenerateName: in.GenerateName,
                    Namespace: in.Namespace,
                },
            }, nil
        }
    })
    if err != nil {
        return nil, err
    }
    go func() {
        informer.Run(todo.Done())
    }()
    if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
        return nil, errors.New("failed to sync cache")
    }
    return &resourcesCount{
        store: informer.GetStore(),
    }, nil
    Count() (int, bool)
}

type counter struct {
    count int
    count int
    retryWatcher *watchtools.RetryWatcher
}

func (c *counter) Count() int {
    return c.count
func (c *counter) Count() (int, bool) {
    return c.count, c.retryWatcher.IsRunning()
}

func StartResourceCounter(ctx context.Context, client metadataclient.Interface, gvr schema.GroupVersionResource, tweakListOptions internalinterfaces.TweakListOptionsFunc) (*counter, error) {

@@ -101,7 +44,8 @@ func StartResourceCounter(ctx context.Context, client metadataclient.Interface,
        return nil, err
    }
    w := &counter{
        count: len(objs.Items),
        count: len(objs.Items),
        retryWatcher: watchInterface,
    }
    go func() {
        for event := range watchInterface.ResultChan() {

@@ -137,10 +81,14 @@ type composite struct {
    inner []Counter
}

func (c composite) Count() int {
func (c composite) Count() (int, bool) {
    sum := 0
    for _, counter := range c.inner {
        sum += counter.Count()
        count, isRunning := counter.Count()
        if !isRunning {
            return 0, false
        }
        sum += count
    }
    return sum
    return sum, true
}

@@ -523,7 +523,11 @@ func main() {
        os.Exit(1)
    }
    reportsBreaker := d4f.NewBreaker("admission reports", func(context.Context) bool {
        return ephrs.Count() > maxAdmissionReports
        count, isRunning := ephrs.Count()
        if !isRunning {
            return true
        }
        return count > maxAdmissionReports
    })
    resourceHandlers := webhooksresource.NewHandlers(
        engine,
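With the informer-based StartAdmissionReportsWatcher removed above, the same scoping (ephemeralreports filtered by the audit.kyverno.io/source==admission label) would flow through StartResourceCounter's tweakListOptions instead. A sketch under that assumption; startAdmissionReportsCounterSketch is a hypothetical wrapper, and the client and context wiring from main() is elided:

// Sketch only: wiring the watch-based counter for admission ephemeral reports.
// StartResourceCounter and Counter are the symbols changed above; the metadata
// client and context are assumed to come from main().
func startAdmissionReportsCounterSketch(ctx context.Context, client metadataclient.Interface) (Counter, error) {
    gvr := reportsv1.SchemeGroupVersion.WithResource("ephemeralreports")
    return StartResourceCounter(ctx, client, gvr, func(lo *metav1.ListOptions) {
        // Same label selector the removed informer-based watcher applied.
        lo.LabelSelector = "audit.kyverno.io/source==admission"
    })
}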
291  cmd/kyverno/watch/watcher.go  Normal file

@@ -0,0 +1,291 @@
package watch

import (
    "context"
    "errors"
    "fmt"
    "io"
    "net/http"
    "time"

    apierrors "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/dump"
    "k8s.io/apimachinery/pkg/util/net"
    "k8s.io/apimachinery/pkg/util/wait"
    "k8s.io/apimachinery/pkg/watch"
    "k8s.io/client-go/tools/cache"
    "k8s.io/klog/v2"
)

// resourceVersionGetter is an interface used to get resource version from events.
// We can't reuse an interface from meta otherwise it would be a cyclic dependency and we need just this one method
type resourceVersionGetter interface {
    GetResourceVersion() string
}

// RetryWatcher will make sure that in case the underlying watcher is closed (e.g. due to API timeout or etcd timeout)
// it will get restarted from the last point without the consumer even knowing about it.
// RetryWatcher does that by inspecting events and keeping track of resourceVersion.
// Especially useful when using watch.UntilWithoutRetry where premature termination is causing issues and flakes.
// Please note that this is not resilient to etcd cache not having the resource version anymore - you would need to
// use Informers for that.
type RetryWatcher struct {
    lastResourceVersion string
    watcherClient cache.Watcher
    resultChan chan watch.Event
    stopChan chan struct{}
    doneChan chan struct{}
    minRestartDelay time.Duration
    isRunning bool
}

// NewRetryWatcher creates a new RetryWatcher.
// It will make sure that watches gets restarted in case of recoverable errors.
// The initialResourceVersion will be given to watch method when first called.
func NewRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher) (*RetryWatcher, error) {
    return newRetryWatcher(initialResourceVersion, watcherClient, 1*time.Second)
}

func newRetryWatcher(initialResourceVersion string, watcherClient cache.Watcher, minRestartDelay time.Duration) (*RetryWatcher, error) {
    switch initialResourceVersion {
    case "", "0":
        // TODO: revisit this if we ever get WATCH v2 where it means start "now"
        // without doing the synthetic list of objects at the beginning (see #74022)
        return nil, fmt.Errorf("initial RV %q is not supported due to issues with underlying WATCH", initialResourceVersion)
    default:
        break
    }

    rw := &RetryWatcher{
        lastResourceVersion: initialResourceVersion,
        watcherClient: watcherClient,
        stopChan: make(chan struct{}),
        doneChan: make(chan struct{}),
        resultChan: make(chan watch.Event),
        minRestartDelay: minRestartDelay,
        isRunning: false,
    }

    go rw.receive()
    return rw, nil
}

func (rw *RetryWatcher) send(event watch.Event) bool {
    // Writing to an unbuffered channel is blocking operation
    // and we need to check if stop wasn't requested while doing so.
    select {
    case rw.resultChan <- event:
        return true
    case <-rw.stopChan:
        return false
    }
}

// doReceive returns true when it is done, false otherwise.
// If it is not done the second return value holds the time to wait before calling it again.
func (rw *RetryWatcher) doReceive() (bool, time.Duration) {
    watcher, err := rw.watcherClient.Watch(metav1.ListOptions{
        ResourceVersion: rw.lastResourceVersion,
        AllowWatchBookmarks: true,
    })
    // We are very unlikely to hit EOF here since we are just establishing the call,
    // but it may happen that the apiserver is just shutting down (e.g. being restarted)
    // This is consistent with how it is handled for informers
    switch err {
    case nil:
        break

    case io.EOF:
        // watch closed normally
        return false, 0

    case io.ErrUnexpectedEOF:
        klog.V(1).InfoS("Watch closed with unexpected EOF", "err", err)
        return false, 0

    default:
        msg := "Watch failed"
        if net.IsProbableEOF(err) || net.IsTimeout(err) {
            klog.V(5).InfoS(msg, "err", err)
            // Retry
            return false, 0
        }

        klog.ErrorS(err, msg)
        // Retry
        return false, 0
    }

    if watcher == nil {
        klog.ErrorS(nil, "Watch returned nil watcher")
        // Retry
        return false, 0
    }

    ch := watcher.ResultChan()
    defer watcher.Stop()

    rw.isRunning = true
    defer func() {
        rw.isRunning = false
    }()

    for {
        select {
        case <-rw.stopChan:
            klog.V(4).InfoS("Stopping RetryWatcher.")
            return true, 0
        case event, ok := <-ch:
            if !ok {
                klog.V(4).InfoS("Failed to get event! Re-creating the watcher.", "resourceVersion", rw.lastResourceVersion)
                return false, 0
            }

            // We need to inspect the event and get ResourceVersion out of it
            switch event.Type {
            case watch.Added, watch.Modified, watch.Deleted, watch.Bookmark:
                metaObject, ok := event.Object.(resourceVersionGetter)
                if !ok {
                    _ = rw.send(watch.Event{
                        Type: watch.Error,
                        Object: &apierrors.NewInternalError(errors.New("retryWatcher: doesn't support resourceVersion")).ErrStatus,
                    })
                    // We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
                    return true, 0
                }

                resourceVersion := metaObject.GetResourceVersion()
                if resourceVersion == "" {
                    _ = rw.send(watch.Event{
                        Type: watch.Error,
                        Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher: object %#v doesn't support resourceVersion", event.Object)).ErrStatus,
                    })
                    // We have to abort here because this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
                    return true, 0
                }

                // All is fine; send the non-bookmark events and update resource version.
                if event.Type != watch.Bookmark {
                    ok = rw.send(event)
                    if !ok {
                        return true, 0
                    }
                }
                rw.lastResourceVersion = resourceVersion

                continue

            case watch.Error:
                // This round trip allows us to handle unstructured status
                errObject := apierrors.FromObject(event.Object)
                statusErr, ok := errObject.(*apierrors.StatusError)
                if !ok {
                    klog.Error(fmt.Sprintf("Received an error which is not *metav1.Status but %s", dump.Pretty(event.Object)))
                    // Retry unknown errors
                    return false, 0
                }

                status := statusErr.ErrStatus

                statusDelay := time.Duration(0)
                if status.Details != nil {
                    statusDelay = time.Duration(status.Details.RetryAfterSeconds) * time.Second
                }

                switch status.Code {
                case http.StatusGone:
                    // Never retry RV too old errors
                    _ = rw.send(event)
                    return true, 0

                case http.StatusGatewayTimeout, http.StatusInternalServerError:
                    // Retry
                    return false, statusDelay

                default:
                    // We retry by default. RetryWatcher is meant to proceed unless it is certain
                    // that it can't. If we are not certain, we proceed with retry and leave it
                    // up to the user to timeout if needed.

                    // Log here so we have a record of hitting the unexpected error
                    // and we can whitelist some error codes if we missed any that are expected.
                    klog.V(5).Info(fmt.Sprintf("Retrying after unexpected error: %s", dump.Pretty(event.Object)))

                    // Retry
                    return false, statusDelay
                }

            default:
                klog.Errorf("Failed to recognize Event type %q", event.Type)
                _ = rw.send(watch.Event{
                    Type: watch.Error,
                    Object: &apierrors.NewInternalError(fmt.Errorf("retryWatcher failed to recognize Event type %q", event.Type)).ErrStatus,
                })
                // We are unable to restart the watch and have to stop the loop or this might cause lastResourceVersion inconsistency by skipping a potential RV with valid data!
                return true, 0
            }
        }
    }
}

// receive reads the result from a watcher, restarting it if necessary.
func (rw *RetryWatcher) receive() {
    defer close(rw.doneChan)
    defer close(rw.resultChan)

    klog.V(4).Info("Starting RetryWatcher.")
    defer klog.V(4).Info("Stopping RetryWatcher.")

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    go func() {
        select {
        case <-rw.stopChan:
            cancel()
            return
        case <-ctx.Done():
            return
        }
    }()

    // We use non sliding until so we don't introduce delays on happy path when WATCH call
    // timeouts or gets closed and we need to reestablish it while also avoiding hot loops.
    wait.NonSlidingUntilWithContext(ctx, func(ctx context.Context) {
        done, retryAfter := rw.doReceive()
        if done {
            cancel()
            return
        }

        timer := time.NewTimer(retryAfter)
        select {
        case <-ctx.Done():
            timer.Stop()
            return
        case <-timer.C:
        }

        klog.V(4).Infof("Restarting RetryWatcher at RV=%q", rw.lastResourceVersion)
    }, rw.minRestartDelay)
}

// ResultChan implements Interface.
func (rw *RetryWatcher) ResultChan() <-chan watch.Event {
    return rw.resultChan
}

// Stop implements Interface.
func (rw *RetryWatcher) Stop() {
    close(rw.stopChan)
}

// Done allows the caller to be notified when Retry watcher stops.
func (rw *RetryWatcher) Done() <-chan struct{} {
    return rw.doneChan
}

// IsRunning reports whether the underlying watch is currently established and running.
func (rw *RetryWatcher) IsRunning() bool {
    return rw.isRunning
}
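A short usage sketch for the watcher above: consumeSketch is a hypothetical caller, condensed from StartResourceCounter in the first file. Note that NewRetryWatcher rejects an empty or "0" initial resource version, so the starting version would normally come from a preceding List call.

// consumeSketch is a hypothetical caller: it consumes events from a
// RetryWatcher and leaves liveness reporting to IsRunning. The listWatch
// argument is any cache.Watcher, e.g. a cache.ListWatch over a metadata client.
func consumeSketch(initialRV string, listWatch cache.Watcher) (*RetryWatcher, error) {
    rw, err := NewRetryWatcher(initialRV, listWatch) // rejects "" and "0" resource versions
    if err != nil {
        return nil, err
    }
    go func() {
        for event := range rw.ResultChan() {
            _ = event // e.g. adjust a counter on watch.Added / watch.Deleted
        }
    }()
    // rw.IsRunning() stays false until the first Watch call succeeds and drops
    // back to false once the watch ends; counter.Count() surfaces that bit.
    return rw, nil
}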
|
Loading…
Reference in a new issue