Mirror of https://github.com/kyverno/kyverno.git

Fix issue: pod should not be ready until the policy cache is loaded (#3646)

* fix issue: pod should not be ready until the policy cache is loaded.

* remove unused code

* remove testcase

* add test case

* fix issue

* add lister

* fix lint issue

* address comment
Authored by Vyankatesh Kudtarkar on 2022-04-26 11:56:46 +05:30, committed by GitHub
parent 123a4f5128
commit ae75b97cb7
5 changed files with 54 additions and 16 deletions

View file

@@ -181,7 +181,7 @@ func MutatePolicy(policy v1.PolicyInterface, logger logr.Logger) (v1.PolicyInter
 	if err != nil {
 		return nil, sanitizederror.NewWithError(fmt.Sprintf("failed to decode patch for %s policy", policy.GetName()), err)
 	}
-	policyBytes, _ := json.Marshal(policy)
+	policyBytes, err := json.Marshal(policy)
 	if err != nil {
 		return nil, sanitizederror.NewWithError(fmt.Sprintf("failed to marshal %s policy", policy.GetName()), err)
 	}
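
In the hunk above, the marshal error was previously discarded with `_`, so the `if err != nil` check that follows only ever saw the stale error from the earlier decode step; assigning `err` makes the existing check actually guard json.Marshal. A minimal, self-contained illustration of the pitfall (names are illustrative, not the actual kyverno code):

// Minimal illustration of the stale-error pitfall fixed above
// (standalone example; not the actual kyverno code).
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	policy := map[string]string{"name": "example-policy"}

	// Before: `policyBytes, _ := json.Marshal(policy)` discarded the error,
	// so the following check only saw whatever `err` held from an earlier step.
	// After: assign the error so the existing check really guards Marshal.
	policyBytes, err := json.Marshal(policy)
	if err != nil {
		fmt.Printf("failed to marshal policy: %v\n", err)
		return
	}
	fmt.Println(string(policyBytes))
}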

View file

@@ -523,7 +523,6 @@ func main() {
 	go reportReqGen.Run(2, stopCh)
 	go configData.Run(stopCh)
 	go eventGenerator.Run(3, stopCh)
-	go pCacheController.Run(1, stopCh)
 	go auditHandler.Run(10, stopCh)
 	if !debug {
 		go webhookMonitor.Run(webhookCfg, certRenewer, eventGenerator, stopCh)
@@ -536,6 +535,7 @@
 	kubeInformer.Start(stopCh)
 	kubeKyvernoInformer.Start(stopCh)
 	kubedynamicInformer.Start(stopCh)
+	pCacheController.CheckPolicySync(stopCh)
 	// verifies if the admission control is enabled and active
 	server.RunAsync(stopCh)
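
The two hunks above change the startup ordering in main(): the policy cache controller is no longer loaded in a background goroutine; instead, a blocking CheckPolicySync call runs after the informers are started and before the admission server comes up, so the pod cannot report ready with an empty policy cache. A minimal sketch of that ordering, using placeholder types rather than kyverno's real API:

// Sketch of the startup ordering implied by the hunks above; the types and
// helpers here are placeholders, not kyverno's actual code.
package main

import "fmt"

type policyCacheController struct{}

// CheckPolicySync blocks until informer caches are synced and every existing
// policy has been loaded into the in-memory policy cache.
func (c *policyCacheController) CheckPolicySync(stopCh <-chan struct{}) {
	fmt.Println("waiting for the policy cache to be fully loaded...")
	// cache.WaitForCacheSync(stopCh, ...) and the initial list/add happen here.
}

func runServerAsync(stopCh <-chan struct{}) {
	fmt.Println("admission server started; readiness can now be reported")
}

func main() {
	stopCh := make(chan struct{})
	pCache := &policyCacheController{}

	// 1. Start the shared informers (kubeInformer.Start(stopCh), etc.).
	// 2. Block until the policy cache is warm; previously this ran as
	//    `go pCacheController.Run(1, stopCh)` and did not gate startup.
	pCache.CheckPolicySync(stopCh)
	// 3. Only then start serving admission requests.
	runServerAsync(stopCh)
	close(stopCh)
}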

View file

@@ -1,11 +1,16 @@
 package policycache
 import (
+	"os"
 	"reflect"
+	"sync/atomic"
 	"github.com/go-logr/logr"
 	kyverno "github.com/kyverno/kyverno/api/kyverno/v1"
 	kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
+	kyvernolister "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/client-go/tools/cache"
 )
@@ -13,19 +18,23 @@ import (
 // it embeds a policy informer to handle policy events.
 // The cache is synced when a policy is add/update/delete.
 // This cache is only used in the admission webhook to fast retrieve
-// policies based on types (Mutate/ValidateEnforce/Generate).
+// policies based on types (Mutate/ValidateEnforce/Generate/imageVerify).
 type Controller struct {
 	pSynched cache.InformerSynced
 	nspSynched cache.InformerSynced
 	Cache Interface
 	log logr.Logger
+	cpolLister kyvernolister.ClusterPolicyLister
+	polLister kyvernolister.PolicyLister
+	pCounter int64
 }
 // NewPolicyCacheController create a new PolicyController
 func NewPolicyCacheController(
 	pInformer kyvernoinformer.ClusterPolicyInformer,
 	nspInformer kyvernoinformer.PolicyInformer,
-	log logr.Logger) *Controller {
+	log logr.Logger,
+) *Controller {
 	pc := Controller{
 		Cache: newPolicyCache(log, pInformer.Lister(), nspInformer.Lister()),
@@ -48,6 +57,9 @@ func NewPolicyCacheController(
 	pc.pSynched = pInformer.Informer().HasSynced
 	pc.nspSynched = nspInformer.Informer().HasSynced
+	pc.cpolLister = pInformer.Lister()
+	pc.polLister = nspInformer.Lister()
+	pc.pCounter = -1
 	return &pc
 }
@@ -93,16 +105,47 @@ func (c *Controller) deleteNsPolicy(obj interface{}) {
 	c.Cache.remove(p)
 }
-// Run waits until policy informer to be synced
-func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
+// CheckPolicySync wait until the internal policy cache is fully loaded
+func (c *Controller) CheckPolicySync(stopCh <-chan struct{}) {
 	logger := c.log
 	logger.Info("starting")
 	defer logger.Info("shutting down")
-	if !cache.WaitForCacheSync(stopCh, c.pSynched) {
-		logger.Info("failed to sync informer cache")
-		return
+	if !cache.WaitForCacheSync(stopCh, c.pSynched, c.nspSynched) {
+		logger.Error(nil, "Failed to sync informer cache")
+		os.Exit(1)
 	}
-	<-stopCh
+	policies := []kyverno.PolicyInterface{}
+	polList, err := c.polLister.Policies(metav1.NamespaceAll).List(labels.Everything())
+	if err != nil {
+		logger.Error(err, "failed to list Policy")
+		os.Exit(1)
+	}
+	for _, p := range polList {
+		policies = append(policies, p)
+	}
+	cpolList, err := c.cpolLister.List(labels.Everything())
+	if err != nil {
+		logger.Error(err, "failed to list Cluster Policy")
+		os.Exit(1)
+	}
+	for _, p := range cpolList {
+		policies = append(policies, p)
+	}
+	atomic.StoreInt64(&c.pCounter, int64(len(policies)))
+	for _, policy := range policies {
+		c.Cache.add(policy)
+		atomic.AddInt64(&c.pCounter, ^int64(0))
+	}
+	if !c.hasPolicySynced() {
+		logger.Error(nil, "Failed to sync policy with cache")
+		os.Exit(1)
+	}
 }
+// hasPolicySynced check for policy counter zero
+func (c *Controller) hasPolicySynced() bool {
+	return atomic.LoadInt64(&c.pCounter) == 0
+}
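
The loop above decrements pCounter with atomic.AddInt64(&c.pCounter, ^int64(0)); since ^int64(0) equals -1, this is an atomic decrement by one, and hasPolicySynced reports true only once every listed policy has been added to the cache (the counter is initialised to -1 in NewPolicyCacheController, so it cannot read as synced before the load runs). A standalone sketch of the counting pattern, with illustrative policy names rather than kyverno types:

// Standalone sketch of the atomic-counter pattern used above; the policy
// names are illustrative data, not kyverno types.
package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var pCounter int64 = -1 // stays at -1 until the sync actually starts

	policies := []string{"require-labels", "disallow-latest-tag", "verify-images"}

	// Record how many policies must be loaded into the cache.
	atomic.StoreInt64(&pCounter, int64(len(policies)))

	for _, p := range policies {
		// Load each policy into the cache (elided), then decrement:
		// ^int64(0) == -1, so AddInt64 subtracts one atomically.
		_ = p
		atomic.AddInt64(&pCounter, ^int64(0))
	}

	// Zero means every listed policy made it into the cache.
	fmt.Println("policy cache synced:", atomic.LoadInt64(&pCounter) == 0)
}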

View file

@@ -95,7 +95,6 @@ func (m *pMap) remove(policy kyverno.PolicyInterface) {
 	m.lock.Lock()
 	defer m.lock.Unlock()
 	m.removePolicyFromCache(policy)
 }
 func (m *pMap) removePolicyFromCache(policy kyverno.PolicyInterface) {
 	var pName = policy.GetName()
View file

@@ -57,9 +57,6 @@ type WebhookServer struct {
 	// urSynced returns true if the Update Request store has been synced at least once
 	urSynced cache.InformerSynced
-	// list/get cluster policy resource
-	pLister kyvernolister.ClusterPolicyLister
 	// returns true if the cluster policy store has synced atleast
 	pSynced cache.InformerSynced
@@ -171,7 +168,6 @@ func NewWebhookServer(
 		grSynced: grInformer.Informer().HasSynced,
 		urLister: urInformer.Lister().UpdateRequests(config.KyvernoNamespace),
 		urSynced: urInformer.Informer().HasSynced,
-		pLister: pInformer.Lister(),
 		pSynced: pInformer.Informer().HasSynced,
 		rbLister: rbInformer.Lister(),
 		rbSynced: rbInformer.Informer().HasSynced,