1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 02:18:15 +00:00

fix: goroutines not gracefully shut down in controllers (#5022)

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

Co-authored-by: Prateek Pandey <prateek.pandey@nirmata.com>
This commit is contained in:
Charles-Edouard Brétéché 2022-10-19 10:54:48 +02:00 committed by GitHub
parent cdfac95cdb
commit c4b3301ab0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 53 additions and 44 deletions

View file

@ -48,7 +48,6 @@ func NewController(secretInformer corev1informers.SecretInformer, certRenewer tl
}
func (c *controller) Run(ctx context.Context, workers int) {
go c.ticker(ctx)
// we need to enqueue our secrets in case they don't exist yet in the cluster
// this way we ensure the reconcile happens (hence renewal/creation)
if err := c.secretEnqueue(&corev1.Secret{
@ -67,7 +66,7 @@ func (c *controller) Run(ctx context.Context, workers int) {
}); err != nil {
logger.Error(err, "failed to enqueue CA secret", "name", tls.GenerateRootCASecretName())
}
controllerutils.Run(ctx, ControllerName, logger, c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile, c.ticker)
}
func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
@ -80,7 +79,7 @@ func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, nam
return c.renewCertificates()
}
func (c *controller) ticker(ctx context.Context) {
func (c *controller) ticker(ctx context.Context, logger logr.Logger) {
certsRenewalTicker := time.NewTicker(tls.CertRenewalInterval)
defer certsRenewalTicker.Stop()
for {

View file

@ -2,6 +2,7 @@ package config
import (
"context"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/config"
@ -41,7 +42,7 @@ func NewController(configuration config.Configuration, configmapInformer corev1i
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, ControllerName, logger, c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {

View file

@ -4,11 +4,11 @@ import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/kyverno/kyverno/pkg/clients/dclient"
"github.com/kyverno/kyverno/pkg/controllers"
"github.com/kyverno/kyverno/pkg/logging"
"github.com/kyverno/kyverno/pkg/metrics"
util "github.com/kyverno/kyverno/pkg/utils"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -42,7 +42,6 @@ func NewController(client dclient.Interface, mgr Manager) Controller {
if mgr == nil {
panic(fmt.Errorf("nil manager sent into crd sync"))
}
return &controller{
manager: mgr,
client: client,
@ -51,22 +50,25 @@ func NewController(client dclient.Interface, mgr Manager) Controller {
func (c *controller) Run(ctx context.Context, workers int) {
if err := c.updateInClusterKindToAPIVersions(); err != nil {
logging.Error(err, "failed to update in-cluster api versions")
logger.Error(err, "failed to update in-cluster api versions")
}
newDoc, err := c.client.Discovery().OpenAPISchema()
if err != nil {
logging.Error(err, "cannot get OpenAPI schema")
logger.Error(err, "cannot get OpenAPI schema")
}
err = c.manager.UseOpenAPIDocument(newDoc)
if err != nil {
logging.Error(err, "Could not set custom OpenAPI document")
logger.Error(err, "Could not set custom OpenAPI document")
}
// Sync CRD before kyverno starts
c.sync()
var wg sync.WaitGroup
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, c.CheckSync, 15*time.Second)
wg.Add(1)
go func() {
defer wg.Done()
wait.UntilWithContext(ctx, c.CheckSync, 15*time.Second)
}()
}
<-ctx.Done()
}
@ -81,7 +83,7 @@ func (c *controller) sync() {
c.client.RecordClientQuery(metrics.ClientList, metrics.KubeDynamicClient, "CustomResourceDefinition", "")
if err != nil {
logging.Error(err, "could not fetch crd's from server")
logger.Error(err, "could not fetch crd's from server")
return
}
@ -92,17 +94,17 @@ func (c *controller) sync() {
}
if err := c.updateInClusterKindToAPIVersions(); err != nil {
logging.Error(err, "sync failed, unable to update in-cluster api versions")
logger.Error(err, "sync failed, unable to update in-cluster api versions")
}
newDoc, err := c.client.Discovery().OpenAPISchema()
if err != nil {
logging.Error(err, "cannot get OpenAPI schema")
logger.Error(err, "cannot get OpenAPI schema")
}
err = c.manager.UseOpenAPIDocument(newDoc)
if err != nil {
logging.Error(err, "Could not set custom OpenAPI document")
logger.Error(err, "Could not set custom OpenAPI document")
}
}
@ -141,7 +143,7 @@ func (c *controller) CheckSync(ctx context.Context) {
Resource: "customresourcedefinitions",
}).List(ctx, metav1.ListOptions{})
if err != nil {
logging.Error(err, "could not fetch crd's from server")
logger.Error(err, "could not fetch crd's from server")
return
}
if len(c.manager.GetCrdList()) != len(crds.Items) {

View file

@ -2,6 +2,7 @@ package policycache
import (
"context"
"time"
"github.com/go-logr/logr"
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
@ -82,7 +83,7 @@ func (c *controller) WarmUp() error {
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, ControllerName, logger, c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) reconcile(ctx context.Context, logger logr.Logger, key, namespace, name string) error {

View file

@ -62,10 +62,6 @@ func NewController(
cadmrEnqueue: controllerutils.AddDefaultEventHandlers(logger, cadmrInformer.Informer(), queue),
metadataCache: metadataCache,
}
return &c
}
func (c *controller) Run(ctx context.Context, workers int) {
c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, _ resource.Resource) {
selector, err := reportutils.SelectorResourceUidEquals(uid)
if err != nil {
@ -75,7 +71,11 @@ func (c *controller) Run(ctx context.Context, workers int) {
logger.Error(err, "failed to enqueue")
}
})
controllerutils.Run(ctx, ControllerName, logger, c.queue, workers, maxRetries, c.reconcile)
return &c
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) enqueue(selector labels.Selector) error {

View file

@ -95,7 +95,7 @@ func NewController(
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, ControllerName, logger, c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) listAdmissionReports(ctx context.Context, namespace string) ([]kyvernov1alpha2.ReportInterface, error) {

View file

@ -3,6 +3,7 @@ package background
import (
"context"
"reflect"
"time"
"github.com/go-logr/logr"
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
@ -85,10 +86,6 @@ func NewController(
}
controllerutils.AddEventHandlers(polInformer.Informer(), c.addPolicy, c.updatePolicy, c.deletePolicy)
controllerutils.AddEventHandlers(cpolInformer.Informer(), c.addPolicy, c.updatePolicy, c.deletePolicy)
return &c
}
func (c *controller) Run(ctx context.Context, workers int) {
c.metadataCache.AddEventHandler(func(uid types.UID, _ schema.GroupVersionKind, resource resource.Resource) {
selector, err := reportutils.SelectorResourceUidEquals(uid)
if err != nil {
@ -103,7 +100,11 @@ func (c *controller) Run(ctx context.Context, workers int) {
c.queue.Add(resource.Namespace + "/" + string(uid))
}
})
controllerutils.Run(ctx, ControllerName, logger, c.queue, workers, maxRetries, c.reconcile)
return &c
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) addPolicy(obj interface{}) {

View file

@ -3,6 +3,7 @@ package resource
import (
"context"
"sync"
"time"
"github.com/go-logr/logr"
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
@ -89,7 +90,7 @@ func NewController(
}
func (c *controller) Run(ctx context.Context, workers int) {
controllerutils.Run(ctx, ControllerName, logger, c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile)
}
func (c *controller) GetResourceHash(uid types.UID) (Resource, schema.GroupVersionKind, bool) {

View file

@ -200,11 +200,10 @@ func NewController(
func (c *controller) Run(ctx context.Context, workers int) {
// add our known webhooks to the queue
c.enqueueAll()
go c.watchdog(ctx)
controllerutils.Run(ctx, ControllerName, logger, c.queue, workers, maxRetries, c.reconcile)
controllerutils.Run(ctx, logger, ControllerName, time.Second, c.queue, workers, maxRetries, c.reconcile, c.watchdog)
}
func (c *controller) watchdog(ctx context.Context) {
func (c *controller) watchdog(ctx context.Context, logger logr.Logger) {
ticker := time.NewTicker(tickerInterval)
defer ticker.Stop()
for {

View file

@ -15,7 +15,7 @@ import (
type reconcileFunc func(ctx context.Context, logger logr.Logger, key string, namespace string, name string) error
func Run(ctx context.Context, controllerName string, logger logr.Logger, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, cacheSyncs ...cache.InformerSynced) {
func Run(ctx context.Context, logger logr.Logger, controllerName string, period time.Duration, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, routines ...func(context.Context, logr.Logger)) {
logger.Info("starting ...")
defer runtime.HandleCrash()
defer logger.Info("stopped")
@ -24,19 +24,23 @@ func Run(ctx context.Context, controllerName string, logger logr.Logger, queue w
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer queue.ShutDown()
if len(cacheSyncs) > 0 {
if !cache.WaitForNamedCacheSync(controllerName, ctx.Done(), cacheSyncs...) {
return
}
}
for i := 0; i < n; i++ {
wg.Add(1)
go func(logger logr.Logger) {
logger.Info("starting worker")
defer wg.Done()
defer logger.Info("worker stopped")
wait.UntilWithContext(ctx, func(ctx context.Context) { worker(ctx, logger, queue, maxRetries, r) }, time.Second)
}(logger.WithValues("id", i))
wait.UntilWithContext(ctx, func(ctx context.Context) { worker(ctx, logger, queue, maxRetries, r) }, period)
}(logger.WithName("worker").WithValues("id", i))
}
for i, routine := range routines {
wg.Add(1)
go func(logger logr.Logger, routine func(context.Context, logr.Logger)) {
logger.Info("starting routine")
defer wg.Done()
defer logger.Info("routine stopped")
routine(ctx, logger)
}(logger.WithName("routine").WithValues("id", i), routine)
}
<-ctx.Done()
}()

View file

@ -54,13 +54,14 @@ func NewPolicyContextBuilder(
}
func (b *policyContextBuilder) Build(request *admissionv1.AdmissionRequest, policies ...kyvernov1.PolicyInterface) (*engine.PolicyContext, error) {
var err error
userRequestInfo := kyvernov1beta1.RequestInfo{
AdmissionUserInfo: *request.UserInfo.DeepCopy(),
}
userRequestInfo.Roles, userRequestInfo.ClusterRoles, err = userinfo.GetRoleRef(b.rbLister, b.crbLister, request, b.configuration)
if err != nil {
if roles, clusterRoles, err := userinfo.GetRoleRef(b.rbLister, b.crbLister, request, b.configuration); err != nil {
return nil, errors.Wrap(err, "failed to fetch RBAC information for request")
} else {
userRequestInfo.Roles = roles
userRequestInfo.ClusterRoles = clusterRoles
}
ctx, err := newVariablesContext(request, &userRequestInfo)
if err != nil {