1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-14 11:57:48 +00:00
kyverno/pkg/policycache/informer.go
Charles-Edouard Brétéché 18af55ed49
refactor: wait for cache sync (#3765)
Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
2022-05-03 01:41:39 +08:00

142 lines
3.7 KiB
Go

package policycache
import (
"os"
"reflect"
"sync/atomic"
"github.com/go-logr/logr"
kyverno "github.com/kyverno/kyverno/api/kyverno/v1"
kyvernoinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
kyvernolister "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
)
// Controller is responsible for keeping the policy cache synchronized.
// Event handlers registered in NewPolicyCacheController update the cache
// whenever a ClusterPolicy or Policy is added, updated or deleted.
// This cache is only used in the admission webhook to quickly retrieve
// policies based on types (Mutate/ValidateEnforce/Generate/imageVerify).
type Controller struct {
	// Cache is the policy cache updated by the informer event handlers.
	Cache Interface

	// log is the logger used by the controller.
	log logr.Logger

	// cpolLister lists ClusterPolicy objects; CheckPolicySync reads from it.
	cpolLister kyvernolister.ClusterPolicyLister

	// polLister lists namespaced Policy objects; CheckPolicySync reads from it.
	polLister kyvernolister.PolicyLister

	// pCounter is accessed atomically. It starts at -1, is set to the number
	// of listed policies by CheckPolicySync and decremented once per policy
	// added to the cache; hasPolicySynced reports whether it reached zero.
	pCounter int64
}
// NewPolicyCacheController creates a new policy cache Controller.
//
// It builds the policy cache from the listers of both informers and
// registers add/update/delete event handlers on the ClusterPolicy informer
// (pInformer) and on the namespaced Policy informer (nspInformer). The sync
// counter starts at -1 until CheckPolicySync sets it.
func NewPolicyCacheController(
	pInformer kyvernoinformer.ClusterPolicyInformer,
	nspInformer kyvernoinformer.PolicyInformer,
	log logr.Logger,
) *Controller {
	controller := &Controller{
		Cache:      newPolicyCache(log, pInformer.Lister(), nspInformer.Lister()),
		log:        log,
		cpolLister: pInformer.Lister(),
		polLister:  nspInformer.Lister(),
		pCounter:   -1,
	}

	// Handlers for cluster-wide policies.
	clusterHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    controller.addPolicy,
		UpdateFunc: controller.updatePolicy,
		DeleteFunc: controller.deletePolicy,
	}
	pInformer.Informer().AddEventHandler(clusterHandlers)

	// Handlers for namespaced policies.
	namespacedHandlers := cache.ResourceEventHandlerFuncs{
		AddFunc:    controller.addNsPolicy,
		UpdateFunc: controller.updateNsPolicy,
		DeleteFunc: controller.deleteNsPolicy,
	}
	nspInformer.Informer().AddEventHandler(namespacedHandlers)

	return controller
}
// addPolicy is the informer add handler for ClusterPolicy objects; it
// inserts the new policy into the cache.
func (c *Controller) addPolicy(obj interface{}) {
	clusterPolicy := obj.(*kyverno.ClusterPolicy)
	c.Cache.add(clusterPolicy)
}
// updatePolicy is the informer update handler for ClusterPolicy objects.
// The cache is only touched when the policy spec actually changed.
func (c *Controller) updatePolicy(old, cur interface{}) {
	previous := old.(*kyverno.ClusterPolicy)
	current := cur.(*kyverno.ClusterPolicy)
	if specChanged := !reflect.DeepEqual(previous.Spec, current.Spec); specChanged {
		c.Cache.update(previous, current)
	}
}
// deletePolicy is the informer delete handler for ClusterPolicy objects; it
// removes the deleted policy from the cache.
//
// A delete handler may receive a cache.DeletedFinalStateUnknown tombstone
// instead of the object itself when the watch missed the delete event, so
// the tombstone is unwrapped before the type assertion. Objects of any other
// type are logged and ignored instead of panicking.
func (c *Controller) deletePolicy(obj interface{}) {
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tombstone.Obj
	}
	p, ok := obj.(*kyverno.ClusterPolicy)
	if !ok {
		c.log.Info("ignoring delete event for unexpected object", "obj", obj)
		return
	}
	c.Cache.remove(p)
}
// addNsPolicy is the informer add handler for namespaced Policy objects; it
// inserts the new policy into the cache.
func (c *Controller) addNsPolicy(obj interface{}) {
	nsPolicy := obj.(*kyverno.Policy)
	c.Cache.add(nsPolicy)
}
// updateNsPolicy is the informer update handler for namespaced Policy
// objects. The cache is only touched when the policy spec actually changed.
func (c *Controller) updateNsPolicy(old, cur interface{}) {
	previous := old.(*kyverno.Policy)
	current := cur.(*kyverno.Policy)
	if specChanged := !reflect.DeepEqual(previous.Spec, current.Spec); specChanged {
		c.Cache.update(previous, current)
	}
}
// deleteNsPolicy is the informer delete handler for namespaced Policy
// objects; it removes the deleted policy from the cache.
//
// A delete handler may receive a cache.DeletedFinalStateUnknown tombstone
// instead of the object itself when the watch missed the delete event, so
// the tombstone is unwrapped before the type assertion. Objects of any other
// type are logged and ignored instead of panicking.
func (c *Controller) deleteNsPolicy(obj interface{}) {
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tombstone.Obj
	}
	p, ok := obj.(*kyverno.Policy)
	if !ok {
		c.log.Info("ignoring delete event for unexpected object", "obj", obj)
		return
	}
	c.Cache.remove(p)
}
// CheckPolicySync loads every Policy and ClusterPolicy returned by the
// listers into the policy cache.
//
// Namespaced policies are listed first (across all namespaces), then cluster
// policies. The sync counter is set to the total number of policies and
// decremented after each one is added to the cache. The process exits with
// status 1 if either list call fails or if the counter is not zero once all
// policies have been added.
//
// stopCh is accepted but not read by this function.
func (c *Controller) CheckPolicySync(stopCh <-chan struct{}) {
	log := c.log
	log.Info("starting")

	nsPolicies, err := c.polLister.Policies(metav1.NamespaceAll).List(labels.Everything())
	if err != nil {
		log.Error(err, "failed to list Policy")
		os.Exit(1)
	}

	clusterPolicies, err := c.cpolLister.List(labels.Everything())
	if err != nil {
		log.Error(err, "failed to list Cluster Policy")
		os.Exit(1)
	}

	// Namespaced policies go first, followed by cluster policies.
	all := make([]kyverno.PolicyInterface, 0, len(nsPolicies)+len(clusterPolicies))
	for i := range nsPolicies {
		all = append(all, nsPolicies[i])
	}
	for i := range clusterPolicies {
		all = append(all, clusterPolicies[i])
	}

	atomic.StoreInt64(&c.pCounter, int64(len(all)))
	for i := range all {
		c.Cache.add(all[i])
		// Count down one policy per cache insertion.
		atomic.AddInt64(&c.pCounter, -1)
	}

	if c.hasPolicySynced() {
		return
	}
	log.Error(nil, "Failed to sync policy with cache")
	os.Exit(1)
}
// hasPolicySynced reports whether the policy counter has reached zero.
func (c *Controller) hasPolicySynced() bool {
	remaining := atomic.LoadInt64(&c.pCounter)
	return remaining == 0
}