Mirror of https://github.com/kyverno/kyverno.git, synced 2025-03-28 02:18:15 +00:00
feat: enable leader election for the background controller (#6237)
* enable leader election for the background controller
* update api docs
* fix
* fix

Signed-off-by: ShutingZhao <shuting@nirmata.com>
Commit 6b3be9ada1 (parent 8236cc4378)
25 changed files with 56 additions and 133 deletions
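For context on what the diff below wires up: Kubernetes controllers usually coordinate replicas with client-go's leaderelection package, which contends for a coordination.k8s.io Lease and runs leader-only work in a callback. Kyverno wraps this in its own internal leaderelection helper (the leaderelection.New call visible further down); the following is a minimal sketch of the underlying client-go mechanism only, not Kyverno's actual wiring — the lease name, namespace, timings, and POD_NAME environment variable are illustrative assumptions.

package main

import (
	"context"
	"os"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

func main() {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// All replicas contend for the same Lease; exactly one holds it at a time.
	// Name and namespace here are illustrative, not Kyverno's actual values.
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{Name: "background-controller", Namespace: "kyverno"},
		Client:    client.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: os.Getenv("POD_NAME"), // assumes POD_NAME is injected via the downward API
		},
	}

	leaderelection.RunOrDie(context.TODO(), leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// start leader-only controllers (like the background controller below) here
			},
			OnStoppedLeading: func() {
				// leadership lost: stop leader-only work so another replica can take over
			},
		},
	})
}

RunOrDie blocks until the context is cancelled or leadership is lost, which is why the main loop in this diff re-enters le.Run inside a for/select until shutdown.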
@@ -26,6 +26,7 @@ import (
 // UpdateRequestStatus defines the observed state of UpdateRequest
 type UpdateRequestStatus struct {
 	// Handler represents the instance ID that handles the UR
+	// Deprecated
 	Handler string `json:"handler,omitempty" yaml:"handler,omitempty"`
 
 	// State represents state of the update request.
@@ -30489,6 +30489,7 @@ spec:
               type: array
             handler:
               description: Handler represents the instance ID that handles the UR
+                Deprecated
               type: string
             message:
               description: Specifies request status message.
@@ -63,37 +63,9 @@ func setupCosign(logger logr.Logger, imageSignatureRepository string) {
 	}
 }
 
-func createNonLeaderControllers(
-	eng engineapi.Engine,
-	genWorkers int,
-	kubeInformer kubeinformers.SharedInformerFactory,
-	kubeKyvernoInformer kubeinformers.SharedInformerFactory,
-	kyvernoInformer kyvernoinformer.SharedInformerFactory,
-	kyvernoClient versioned.Interface,
-	dynamicClient dclient.Interface,
-	rclient registryclient.Client,
-	configuration config.Configuration,
-	eventGenerator event.Interface,
-	informerCacheResolvers engineapi.ConfigmapResolver,
-) []internal.Controller {
-	updateRequestController := background.NewController(
-		kyvernoClient,
-		dynamicClient,
-		eng,
-		kyvernoInformer.Kyverno().V1().ClusterPolicies(),
-		kyvernoInformer.Kyverno().V1().Policies(),
-		kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
-		kubeInformer.Core().V1().Namespaces(),
-		kubeKyvernoInformer.Core().V1().Pods(),
-		eventGenerator,
-		configuration,
-		informerCacheResolvers,
-	)
-	return []internal.Controller{internal.NewController("updaterequest-controller", updateRequestController, genWorkers)}
-}
-
 func createrLeaderControllers(
 	eng engineapi.Engine,
+	genWorkers int,
 	kubeInformer kubeinformers.SharedInformerFactory,
 	kyvernoInformer kyvernoinformer.SharedInformerFactory,
 	kyvernoClient versioned.Interface,
@@ -122,8 +94,22 @@ func createrLeaderControllers(
 	if err != nil {
 		return nil, err
 	}
+
+	backgroundController := background.NewController(
+		kyvernoClient,
+		dynamicClient,
+		eng,
+		kyvernoInformer.Kyverno().V1().ClusterPolicies(),
+		kyvernoInformer.Kyverno().V1().Policies(),
+		kyvernoInformer.Kyverno().V1beta1().UpdateRequests(),
+		kubeInformer.Core().V1().Namespaces(),
+		eventGenerator,
+		configuration,
+		configMapResolver,
+	)
 	return []internal.Controller{
 		internal.NewController("policy-controller", policyCtrl, 2),
+		internal.NewController("background-controller", backgroundController, genWorkers),
 	}, err
 }
 
@@ -137,7 +123,7 @@ func main() {
 		leaderElectionRetryPeriod time.Duration
 	)
 	flagset := flag.NewFlagSet("updaterequest-controller", flag.ExitOnError)
-	flagset.IntVar(&genWorkers, "genWorkers", 10, "Workers for generate controller.")
+	flagset.IntVar(&genWorkers, "genWorkers", 10, "Workers for the background controller.")
 	flagset.StringVar(&imagePullSecrets, "imagePullSecrets", "", "Secret resource names for image registry access credentials.")
 	flagset.StringVar(&imageSignatureRepository, "imageSignatureRepository", "", "Alternate repository for image signatures. Can be overridden per rule via `verifyImages.Repository`.")
 	flagset.BoolVar(&allowInsecureRegistry, "allowInsecureRegistry", false, "Whether to allow insecure connections to registries. Don't use this for anything but testing.")
@@ -159,7 +145,7 @@ func main() {
 	// setup signals
 	// setup maxprocs
 	// setup metrics
-	signalCtx, logger, metricsConfig, sdown := internal.Setup("kyverno-updaterequest-controller")
+	signalCtx, logger, metricsConfig, sdown := internal.Setup("kyverno-background-controller")
 	defer sdown()
 	// create instrumented clients
 	kubeClient := internal.CreateKubernetesClient(logger, kubeclient.WithMetrics(metricsConfig, metrics.KubeClient), kubeclient.WithTracing())
@@ -176,7 +162,6 @@ func main() {
 	// ELSE KYAML IS NOT THREAD SAFE
 	kyamlopenapi.Schema()
 	// informer factories
-	kubeInformer := kubeinformers.NewSharedInformerFactory(kubeClient, resyncPeriod)
 	kubeKyvernoInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, resyncPeriod, kubeinformers.WithNamespace(config.KyvernoNamespace()))
 	kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(kyvernoClient, resyncPeriod)
 	cacheInformer, err := resolvers.GetCacheInformerFactory(kubeClient, resyncPeriod)
@@ -232,22 +217,8 @@ func main() {
 		// TODO: do we need exceptions here ?
 		nil,
 	)
-	// create non leader controllers
-	nonLeaderControllers := createNonLeaderControllers(
-		engine,
-		genWorkers,
-		kubeInformer,
-		kubeKyvernoInformer,
-		kyvernoInformer,
-		kyvernoClient,
-		dClient,
-		rclient,
-		configuration,
-		eventGenerator,
-		configMapResolver,
-	)
 	// start informers and wait for cache sync
-	if !internal.StartInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeInformer, kubeKyvernoInformer, cacheInformer) {
+	if !internal.StartInformersAndWaitForCacheSync(signalCtx, kyvernoInformer, kubeKyvernoInformer, cacheInformer) {
 		logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
 		os.Exit(1)
 	}
@@ -256,7 +227,7 @@ func main() {
 	// setup leader election
 	le, err := leaderelection.New(
 		logger.WithName("leader-election"),
-		"kyverno-updaterequest-controller",
+		"kyverno-background-controller",
 		config.KyvernoNamespace(),
 		leaderElectionClient,
 		config.KyvernoPodName(),
@@ -269,6 +240,7 @@
 		// create leader controllers
 		leaderControllers, err := createrLeaderControllers(
 			engine,
+			genWorkers,
 			kubeInformer,
 			kyvernoInformer,
 			kyvernoClient,
@@ -302,19 +274,13 @@ func main() {
 		logger.Error(err, "failed to initialize leader election")
 		os.Exit(1)
 	}
-	// start non leader controllers
-	var wg sync.WaitGroup
-	for _, controller := range nonLeaderControllers {
-		controller.Run(signalCtx, logger.WithName("controllers"), &wg)
-	}
 	// start leader election
-	go func() {
+	for {
 		select {
 		case <-signalCtx.Done():
 			return
 		default:
 			le.Run(signalCtx)
 		}
-	}()
-	wg.Wait()
+	}
 }
@@ -370,6 +370,7 @@ spec:
               type: array
             handler:
               description: Handler represents the instance ID that handles the UR
+                Deprecated
              type: string
             message:
               description: Specifies request status message.
@@ -4677,7 +4677,8 @@ string
 </em>
 </td>
 <td>
-<p>Handler represents the instance ID that handles the UR</p>
+<p>Handler represents the instance ID that handles the UR
+Deprecated</p>
 </td>
 </tr>
 <tr>
@@ -29,7 +29,6 @@ import (
 	corev1informers "k8s.io/client-go/informers/core/v1"
 	corev1listers "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/retry"
 	"k8s.io/client-go/util/workqueue"
 )
 
@@ -54,7 +53,6 @@ type controller struct {
 	polLister kyvernov1listers.PolicyLister
 	urLister kyvernov1beta1listers.UpdateRequestNamespaceLister
 	nsLister corev1listers.NamespaceLister
-	podLister corev1listers.PodLister
 
 	informersSynced []cache.InformerSynced
 
@@ -75,7 +73,6 @@ func NewController(
 	polInformer kyvernov1informers.PolicyInformer,
 	urInformer kyvernov1beta1informers.UpdateRequestInformer,
 	namespaceInformer corev1informers.NamespaceInformer,
-	podInformer corev1informers.PodInformer,
 	eventGen event.Interface,
 	dynamicConfig config.Configuration,
 	informerCacheResolvers engineapi.ConfigmapResolver,
@@ -89,8 +86,7 @@ func NewController(
 		polLister: polInformer.Lister(),
 		urLister: urLister,
 		nsLister: namespaceInformer.Lister(),
-		podLister: podInformer.Lister(),
-		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "update-request"),
+		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "background"),
 		eventGen: eventGen,
 		configuration: dynamicConfig,
 		informerCacheResolvers: informerCacheResolvers,
@@ -109,7 +105,7 @@ func NewController(
 		DeleteFunc: c.deletePolicy,
 	})
 
-	c.informersSynced = []cache.InformerSynced{cpolInformer.Informer().HasSynced, polInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaceInformer.Informer().HasSynced, podInformer.Informer().HasSynced}
+	c.informersSynced = []cache.InformerSynced{cpolInformer.Informer().HasSynced, polInformer.Informer().HasSynced, urInformer.Informer().HasSynced, namespaceInformer.Informer().HasSynced}
 
 	return &c
 }
@@ -192,18 +188,7 @@ func (c *controller) syncUpdateRequest(key string) error {
 	if ur.Status.State == "" {
 		ur = ur.DeepCopy()
 		ur.Status.State = kyvernov1beta1.Pending
-		_, err := c.kyvernoClient.KyvernoV1beta1().UpdateRequests(config.KyvernoNamespace()).UpdateStatus(context.TODO(), ur, metav1.UpdateOptions{})
-		return err
-	}
-	// if it was acquired by a pod that is gone, release it
-	if ur.Status.Handler != "" {
-		_, err = c.podLister.Pods(config.KyvernoNamespace()).Get(ur.Status.Handler)
-		if err != nil {
-			if apierrors.IsNotFound(err) {
-				ur = ur.DeepCopy()
-				ur.Status.Handler = ""
-				_, err = c.kyvernoClient.KyvernoV1beta1().UpdateRequests(config.KyvernoNamespace()).UpdateStatus(context.TODO(), ur, metav1.UpdateOptions{})
-			}
+		if _, err := c.kyvernoClient.KyvernoV1beta1().UpdateRequests(config.KyvernoNamespace()).UpdateStatus(context.TODO(), ur, metav1.UpdateOptions{}); err != nil {
 			return err
 		}
 	}
@@ -226,28 +211,13 @@ func (c *controller) syncUpdateRequest(key string) error {
 			return err
 		}
 	}
-	// if in pending state, try to acquire ur and eventually process it
+	// process pending URs
 	if ur.Status.State == kyvernov1beta1.Pending {
-		ur, ok, err := c.acquireUR(ur)
-		if err != nil {
-			if apierrors.IsNotFound(err) {
-				return nil
-			}
-			return fmt.Errorf("failed to mark handler for UR %s: %v", key, err)
-		}
-		if !ok {
-			logger.V(3).Info("another instance is handling the UR", "handler", ur.Status.Handler)
-			return nil
-		}
-		logger.V(3).Info("UR is marked successfully", "ur", ur.GetName(), "resourceVersion", ur.GetResourceVersion())
 		if err := c.processUR(ur); err != nil {
 			return fmt.Errorf("failed to process UR %s: %v", key, err)
 		}
 	}
-	ur, err = c.releaseUR(ur)
-	if err != nil {
-		return fmt.Errorf("failed to unmark UR %s: %v", key, err)
-	}
 
 	err = c.cleanUR(ur)
 	return err
 }
@@ -427,47 +397,6 @@ func (c *controller) processUR(ur *kyvernov1beta1.UpdateRequest) error {
 	return nil
 }
 
-func (c *controller) acquireUR(ur *kyvernov1beta1.UpdateRequest) (*kyvernov1beta1.UpdateRequest, bool, error) {
-	name := ur.GetName()
-	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-		var err error
-		ur, err = c.urLister.Get(name)
-		if err != nil {
-			return err
-		}
-		if ur.Status.Handler != "" {
-			return nil
-		}
-		ur = ur.DeepCopy()
-		ur.Status.Handler = config.KyvernoPodName()
-		ur, err = c.kyvernoClient.KyvernoV1beta1().UpdateRequests(config.KyvernoNamespace()).UpdateStatus(context.TODO(), ur, metav1.UpdateOptions{})
-		return err
-	})
-	if err != nil {
-		logger.Error(err, "failed to acquire ur", "name", name, "ur", ur)
-		return nil, false, err
-	}
-	return ur, ur.Status.Handler == config.KyvernoPodName(), err
-}
-
-func (c *controller) releaseUR(ur *kyvernov1beta1.UpdateRequest) (*kyvernov1beta1.UpdateRequest, error) {
-	err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
-		var err error
-		ur, err = c.urLister.Get(ur.GetName())
-		if err != nil {
-			return err
-		}
-		if ur.Status.Handler != config.KyvernoPodName() {
-			return nil
-		}
-		ur = ur.DeepCopy()
-		ur.Status.Handler = ""
-		ur, err = c.kyvernoClient.KyvernoV1beta1().UpdateRequests(config.KyvernoNamespace()).UpdateStatus(context.TODO(), ur, metav1.UpdateOptions{})
-		return err
-	})
-	return ur, err
-}
-
 func (c *controller) cleanUR(ur *kyvernov1beta1.UpdateRequest) error {
 	if ur.Spec.Type == kyvernov1beta1.Mutate && ur.Status.State == kyvernov1beta1.Completed {
 		return c.kyvernoClient.KyvernoV1beta1().UpdateRequests(config.KyvernoNamespace()).Delete(context.TODO(), ur.GetName(), metav1.DeleteOptions{})
@@ -0,0 +1,5 @@
+# A command can only run a single command, not a pipeline and not a script. The program called must exist on the system where the test is run.
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+- command: sleep 3
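As the comment in these new kuttl steps notes, each `command` entry runs a single program directly, with no shell involved. When a step genuinely needs a pipeline or other shell syntax, kuttl's TestStep also accepts a `script` entry in place of `command`. A minimal sketch, assuming a POSIX shell on the machine running the tests (the kubectl pipeline here is illustrative, not part of this commit):

apiVersion: kuttl.dev/v1beta1
kind: TestStep
commands:
- script: kubectl -n kyverno get updaterequests | wc -l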
@@ -0,0 +1,4 @@
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+- command: sleep 3
@@ -0,0 +1,5 @@
+# A command can only run a single command, not a pipeline and not a script. The program called must exist on the system where the test is run.
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+- command: sleep 3
@@ -0,0 +1,5 @@
+# A command can only run a single command, not a pipeline and not a script. The program called must exist on the system where the test is run.
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+- command: sleep 3
@@ -0,0 +1,5 @@
+# A command can only run a single command, not a pipeline and not a script. The program called must exist on the system where the test is run.
+apiVersion: kuttl.dev/v1beta1
+kind: TestStep
+commands:
+- command: sleep 3