
Revert "wait for cache to sync and cleanup"

This reverts commit 9c3b32b903.
shivkumar dudhani 2019-11-15 15:57:18 -08:00
parent cde9d9d3cd
commit 57e8e2a395
8 changed files with 70 additions and 166 deletions
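Both sides of this diff revolve around the standard client-go idiom of capturing an informer's HasSynced function and calling cache.WaitForCacheSync before doing any work: the reverted change had several helper components take informers and wait on their own caches, while this revert restores lister-based constructors and drops those per-component waits. For reference, a minimal sketch of that idiom follows; the package, type, and function names are illustrative and are not kyverno's actual API.

package example

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

// controller keeps the HasSynced func of the informer it consumes and refuses
// to start work until the underlying cache has synced at least once.
type controller struct {
	synced cache.InformerSynced
}

func newController(informer cache.SharedIndexInformer) *controller {
	return &controller{synced: informer.HasSynced}
}

// Run blocks until the cache syncs (or the stop channel closes) before doing
// any work; returning an error lets the caller decide to fail fast.
func (c *controller) Run(stopCh <-chan struct{}) error {
	if !cache.WaitForCacheSync(stopCh, c.synced) {
		return fmt.Errorf("failed to sync informer cache")
	}
	// start workers here
	return nil
}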

main.go (87 changed lines)

@@ -74,11 +74,7 @@ func main() {
}
// WEBHOOK REGISTRATION CLIENT
webhookRegistrationClient, err := webhookconfig.NewWebhookRegistrationClient(
clientConfig,
client,
serverIP,
int32(webhookTimeout))
webhookRegistrationClient, err := webhookconfig.NewWebhookRegistrationClient(clientConfig, client, serverIP, int32(webhookTimeout))
if err != nil {
glog.Fatalf("Unable to register admission webhooks on cluster: %v\n", err)
}
@@ -88,58 +84,36 @@ func main() {
// - Policy
// - PolicyViolation
// - cache resync time: 10 seconds
pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions(
pclient,
10*time.Second)
pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions(pclient, 10*time.Second)
// KUBERNETES RESOURCES INFORMER
// watches namespace resource
// - cache resync time: 10 seconds
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(
kubeClient,
10*time.Second)
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 10*time.Second)
// Configuration Data
// dynamically load the configuration from configMap
// - resource filters
// if the configMap is updated, the configuration will be updated
configData := config.NewConfigData(
kubeClient,
kubeInformer.Core().V1().ConfigMaps(),
filterK8Resources)
configData := config.NewConfigData(kubeClient, kubeInformer.Core().V1().ConfigMaps(), filterK8Resources)
// Policy meta-data store
policyMetaStore := policystore.NewPolicyStore(pInformer.Kyverno().V1().ClusterPolicies())
policyMetaStore := policystore.NewPolicyStore(pInformer.Kyverno().V1().ClusterPolicies().Lister())
// EVENT GENERATOR
// - generate event with retry mechanism
egen := event.NewEventGenerator(
client,
pInformer.Kyverno().V1().ClusterPolicies())
egen := event.NewEventGenerator(client, pInformer.Kyverno().V1().ClusterPolicies())
// POLICY VIOLATION GENERATOR
// -- generate policy violation
pvgen := policyviolation.NewPVGenerator(pclient,
client,
pInformer.Kyverno().V1().ClusterPolicyViolations(),
pInformer.Kyverno().V1().NamespacedPolicyViolations())
pvgen := policyviolation.NewPVGenerator(pclient, client, pInformer.Kyverno().V1().ClusterPolicyViolations().Lister(), pInformer.Kyverno().V1().NamespacedPolicyViolations().Lister())
// POLICY CONTROLLER
// - reconciles policy and policy violation
// - process policy on existing resources
// - status aggregator: receives stats when a policy is applied
// & updates the policy status
pc, err := policy.NewPolicyController(pclient,
client,
pInformer.Kyverno().V1().ClusterPolicies(),
pInformer.Kyverno().V1().ClusterPolicyViolations(),
pInformer.Kyverno().V1().NamespacedPolicyViolations(),
egen,
kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(),
webhookRegistrationClient,
configData,
pvgen,
policyMetaStore)
pc, err := policy.NewPolicyController(pclient, client, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations(), pInformer.Kyverno().V1().NamespacedPolicyViolations(), egen, kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(), webhookRegistrationClient, configData, pvgen, policyMetaStore)
if err != nil {
glog.Fatalf("error creating policy controller: %v\n", err)
}
@@ -147,36 +121,19 @@ func main() {
// POLICY VIOLATION CONTROLLER
// policy violation cleanup if the corresponding resource is deleted
// status: lastUpdateTime
pvc, err := policyviolation.NewPolicyViolationController(
client,
pclient,
pInformer.Kyverno().V1().ClusterPolicies(),
pInformer.Kyverno().V1().ClusterPolicyViolations())
pvc, err := policyviolation.NewPolicyViolationController(client, pclient, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations())
if err != nil {
glog.Fatalf("error creating cluster policy violation controller: %v\n", err)
}
nspvc, err := policyviolation.NewNamespacedPolicyViolationController(
client,
pclient,
pInformer.Kyverno().V1().ClusterPolicies(),
pInformer.Kyverno().V1().NamespacedPolicyViolations())
nspvc, err := policyviolation.NewNamespacedPolicyViolationController(client, pclient, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().NamespacedPolicyViolations())
if err != nil {
glog.Fatalf("error creating namespaced policy violation controller: %v\n", err)
}
// GENERATE CONTROLLER
// - watches for Namespace resources and generates resources based on the policy generate rule
nsc := namespace.NewNamespaceController(
pclient,
client,
kubeInformer.Core().V1().Namespaces(),
pInformer.Kyverno().V1().ClusterPolicies(),
pc.GetPolicyStatusAggregator(),
egen,
configData,
pvgen,
policyMetaStore)
nsc := namespace.NewNamespaceController(pclient, client, kubeInformer.Core().V1().Namespaces(), pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations(), pc.GetPolicyStatusAggregator(), egen, configData, pvgen, policyMetaStore)
// CONFIGURE CERTIFICATES
tlsPair, err := initTLSPemPair(clientConfig, client)
@@ -199,29 +156,17 @@ func main() {
// -- annotations on resources with update details on mutation JSON patches
// -- generate policy violation resource
// -- generate events on policy and resource
server, err := webhooks.NewWebhookServer(
pclient,
client,
tlsPair,
pInformer.Kyverno().V1().ClusterPolicies(),
kubeInformer.Rbac().V1().RoleBindings(),
kubeInformer.Rbac().V1().ClusterRoleBindings(),
egen,
webhookRegistrationClient,
pc.GetPolicyStatusAggregator(),
configData,
policyMetaStore,
pvgen,
cleanUp)
server, err := webhooks.NewWebhookServer(pclient, client, tlsPair, pInformer.Kyverno().V1().ClusterPolicies(), pInformer.Kyverno().V1().ClusterPolicyViolations(), pInformer.Kyverno().V1().NamespacedPolicyViolations(),
kubeInformer.Rbac().V1().RoleBindings(), kubeInformer.Rbac().V1().ClusterRoleBindings(), egen, webhookRegistrationClient, pc.GetPolicyStatusAggregator(), configData, policyMetaStore, pvgen, cleanUp)
if err != nil {
glog.Fatalf("Unable to create webhook server: %v\n", err)
}
// Start the components
pInformer.Start(stopCh)
kubeInformer.Start(stopCh)
go configData.Run(stopCh)
go policyMetaStore.Run(stopCh)
if err := configData.Run(stopCh); err != nil {
glog.Fatalf("Unable to load dynamic configuration: %v\n", err)
}
go pc.Run(1, stopCh)
go pvc.Run(1, stopCh)
go nspvc.Run(1, stopCh)
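The informer factories are started centrally here while each component handles (or skips) its own cache sync. A common alternative layout in client-go programs is to block once at the factory level before launching any workers. A hedged sketch of that variant, reusing the pInformer, kubeInformer, and glog identifiers from the code above and assuming the generated kyverno factory exposes WaitForCacheSync as informer-gen factories normally do (this is not what this commit does):

pInformer.Start(stopCh)
kubeInformer.Start(stopCh)
// Block once, centrally, until every informer created so far has synced.
for informerType, synced := range kubeInformer.WaitForCacheSync(stopCh) {
	if !synced {
		glog.Fatalf("failed to sync informer cache for %v", informerType)
	}
}
for informerType, synced := range pInformer.WaitForCacheSync(stopCh) {
	if !synced {
		glog.Fatalf("failed to sync informer cache for %v", informerType)
	}
}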


@@ -76,12 +76,12 @@ func NewConfigData(rclient kubernetes.Interface, cmInformer informers.ConfigMapI
return &cd
}
//Run waits for the ConfigMap informer cache to sync
func (cd *ConfigData) Run(stopCh <-chan struct{}) {
func (cd *ConfigData) Run(stopCh <-chan struct{}) error {
// wait for cache to populate first time
if !cache.WaitForCacheSync(stopCh, cd.cmListerSycned) {
glog.Error("configuration: failed to sync informer cache")
return fmt.Errorf("Configuration: Failed to sync informer cache")
}
return nil
}
func (cd *ConfigData) addCM(obj interface{}) {


@@ -14,18 +14,14 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
//Generator generates events
type Generator struct {
client *client.Client
// list/get cluster policy
pLister kyvernolister.ClusterPolicyLister
// returns true if the cluster policy store has been synced at least once
pSynced cache.InformerSynced
client *client.Client
pLister kyvernolister.ClusterPolicyLister
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
}
@@ -42,7 +38,6 @@ func NewEventGenerator(client *client.Client, pInformer kyvernoinformer.ClusterP
client: client,
pLister: pInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName),
pSynced: pInformer.Informer().HasSynced,
recorder: initRecorder(client),
}
@@ -91,10 +86,6 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
glog.Info("Starting event generator")
defer glog.Info("Shutting down event generator")
if !cache.WaitForCacheSync(stopCh, gen.pSynced) {
glog.Error("event generator: failed to sync informer cache")
}
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, time.Second, stopCh)
}


@@ -39,12 +39,16 @@ type NamespaceController struct {
//nsLister provides expansion to the namespace lister to inject GVK for the resource
nsLister NamespaceListerExpansion
// nsSynced returns true if the Namespace store has been synced at least once
nsSynced cache.InformerSynced
// nsLister can list/get namespaces from the shared informer's store
// nsLister v1CoreLister.NamespaceLister
// nsListerSynced returns true if the Namespace store has been synced at least once
nsListerSynced cache.InformerSynced
// pLister can list/get cluster policy from the shared informer's store
pLister kyvernolister.ClusterPolicyLister
// pSynced returns true if the Policy store has been synced at least once
pSynced cache.InformerSynced
// pvListerSynced returns true if the Policy store has been synced at least once
pvListerSynced cache.InformerSynced
// pvLister can list/get policy violation from the shared informer's store
pvLister kyvernolister.ClusterPolicyViolationLister
// API to send policy stats for aggregation
policyStatus policy.PolicyStatusInterface
// eventGen provides interface to generate events
@@ -66,6 +70,7 @@ func NewNamespaceController(kyvernoClient *kyvernoclient.Clientset,
client *client.Client,
nsInformer v1Informer.NamespaceInformer,
pInformer kyvernoinformer.ClusterPolicyInformer,
pvInformer kyvernoinformer.ClusterPolicyViolationInformer,
policyStatus policy.PolicyStatusInterface,
eventGen event.Interface,
configHandler config.Interface,
@@ -98,9 +103,10 @@ func NewNamespaceController(kyvernoClient *kyvernoclient.Clientset,
nsc.syncHandler = nsc.syncNamespace
nsc.nsLister = NewNamespaceLister(nsInformer.Lister())
nsc.nsSynced = nsInformer.Informer().HasSynced
nsc.nsListerSynced = nsInformer.Informer().HasSynced
nsc.pLister = pInformer.Lister()
nsc.pSynced = pInformer.Informer().HasSynced
nsc.pvListerSynced = pInformer.Informer().HasSynced
nsc.pvLister = pvInformer.Lister()
nsc.policyStatus = policyStatus
// resource manager
@@ -168,8 +174,7 @@ func (nsc *NamespaceController) Run(workers int, stopCh <-chan struct{}) {
glog.Info("Starting namespace controller")
defer glog.Info("Shutting down namespace controller")
if ok := cache.WaitForCacheSync(stopCh, nsc.nsSynced, nsc.pSynced); !ok {
glog.Error("namespace generator: failed to sync cache")
if ok := cache.WaitForCacheSync(stopCh, nsc.nsListerSynced); !ok {
return
}


@@ -395,7 +395,6 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) {
defer glog.Info("Shutting down policy controller")
if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.pvListerSynced, pc.nspvListerSynced) {
glog.Info("failed to sync informer cache")
return
}
for i := 0; i < workers; i++ {


@@ -3,11 +3,8 @@ package policystore
import (
"sync"
"github.com/golang/glog"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1"
kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1"
"k8s.io/client-go/tools/cache"
)
type policyMap map[string]interface{}
@@ -16,12 +13,9 @@ type kindMap map[string]namespaceMap
//PolicyStore stores the meta-data information for faster policy lookup
type PolicyStore struct {
data map[string]namespaceMap
mu sync.RWMutex
// list/get cluster policy
data map[string]namespaceMap
mu sync.RWMutex
pLister kyvernolister.ClusterPolicyLister
// returns true if the cluster policy store has been synced at least once
pSynched cache.InformerSynced
}
//UpdateInterface provides api to update policies
@@ -39,22 +33,14 @@ type LookupInterface interface {
}
// NewPolicyStore returns a new policy store
func NewPolicyStore(pInformer kyvernoinformer.ClusterPolicyInformer) *PolicyStore {
func NewPolicyStore(pLister kyvernolister.ClusterPolicyLister) *PolicyStore {
ps := PolicyStore{
data: make(kindMap),
pLister: pInformer.Lister(),
pSynched: pInformer.Informer().HasSynced,
data: make(kindMap),
pLister: pLister,
}
return &ps
}
//Run waits for the policy informer cache to sync
func (ps *PolicyStore) Run(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, ps.pSynched) {
glog.Error("policy meta store: failed to sync informer cache")
}
}
//Register a new policy
func (ps *PolicyStore) Register(policy kyverno.ClusterPolicy) {
ps.mu.Lock()
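The constructor change in this file (taking a ClusterPolicyLister instead of the informer, mirrored by NewPVGenerator in main.go) follows a general pattern: a component that only reads from the cache can depend on the narrow lister interface and leave cache-sync responsibility to whoever starts the informer factory. A generic sketch of the two constructor shapes, using stock client-go Deployment types as stand-ins rather than kyverno's own types:

package example

import (
	appsinformers "k8s.io/client-go/informers/apps/v1"
	appslisters "k8s.io/client-go/listers/apps/v1"
	"k8s.io/client-go/tools/cache"
)

// Informer-based shape: the component also learns how to wait for the cache.
type informerBackedStore struct {
	lister appslisters.DeploymentLister
	synced cache.InformerSynced
}

func newInformerBackedStore(inf appsinformers.DeploymentInformer) *informerBackedStore {
	return &informerBackedStore{lister: inf.Lister(), synced: inf.Informer().HasSynced}
}

// Lister-based shape (what this revert restores for PolicyStore): a read-only
// dependency; cache syncing is handled by the caller that owns the informer.
type listerBackedStore struct {
	lister appslisters.DeploymentLister
}

func newListerBackedStore(lister appslisters.DeploymentLister) *listerBackedStore {
	return &listerBackedStore{lister: lister}
}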


@@ -12,16 +12,13 @@ import (
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
kyvernov1 "github.com/nirmata/kyverno/pkg/client/clientset/versioned/typed/kyverno/v1"
kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1"
kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1"
client "github.com/nirmata/kyverno/pkg/dclient"
dclient "github.com/nirmata/kyverno/pkg/dclient"
unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@@ -32,20 +29,13 @@ const workQueueRetryLimit = 3
type Generator struct {
dclient *dclient.Client
pvInterface kyvernov1.KyvernoV1Interface
// get/list cluster policy violation
pvLister kyvernolister.ClusterPolicyViolationLister
// get/list namespaced policy violation
nspvLister kyvernolister.NamespacedPolicyViolationLister
// returns true if the cluster policy violation store has been synced at least once
pvSynced cache.InformerSynced
// returns true if the namespaced policy violation store has been synced at least once
nspvSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
dataStore *dataStore
pvLister kyvernolister.ClusterPolicyViolationLister
nspvLister kyvernolister.NamespacedPolicyViolationLister
queue workqueue.RateLimitingInterface
dataStore *dataStore
}
//NewDataStore returns an instance of data store
func newDataStore() *dataStore {
func NewDataStore() *dataStore {
ds := dataStore{
data: make(map[string]Info),
}
@@ -105,17 +95,15 @@ type GeneratorInterface interface {
// NewPVGenerator returns a new instance of policy violation generator
func NewPVGenerator(client *kyvernoclient.Clientset, dclient *client.Client,
pvInformer kyvernoinformer.ClusterPolicyViolationInformer,
nspvInformer kyvernoinformer.NamespacedPolicyViolationInformer) *Generator {
pvLister kyvernolister.ClusterPolicyViolationLister,
nspvLister kyvernolister.NamespacedPolicyViolationLister) *Generator {
gen := Generator{
pvInterface: client.KyvernoV1(),
dclient: dclient,
pvLister: pvInformer.Lister(),
pvSynced: pvInformer.Informer().HasSynced,
nspvLister: nspvInformer.Lister(),
nspvSynced: nspvInformer.Informer().HasSynced,
pvLister: pvLister,
nspvLister: nspvLister,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
dataStore: newDataStore(),
dataStore: NewDataStore(),
}
return &gen
}
@@ -142,10 +130,6 @@ func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
glog.Info("Start policy violation generator")
defer glog.Info("Shutting down policy violation generator")
if !cache.WaitForCacheSync(stopCh, gen.pvSynced, gen.nspvSynced) {
glog.Error("event generator: failed to sync informer cache")
}
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, time.Second, stopCh)
}
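Sync wait aside, the worker startup that remains in these Run methods is the usual workqueue idiom: each worker loops over the rate-limited queue, re-entered by wait.Until on a one-second period until the stop channel closes. A generic sketch of that idiom (illustrative names, not the kyverno generator itself):

package example

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

type generator struct {
	queue workqueue.RateLimitingInterface
}

// run launches the requested number of workers and blocks until stopCh closes.
func (g *generator) run(workers int, stopCh <-chan struct{}) {
	defer g.queue.ShutDown()
	for i := 0; i < workers; i++ {
		go wait.Until(g.runWorker, time.Second, stopCh)
	}
	<-stopCh
}

// runWorker drains the queue until it is shut down.
func (g *generator) runWorker() {
	for g.processNextItem() {
	}
}

func (g *generator) processNextItem() bool {
	key, shutdown := g.queue.Get()
	if shutdown {
		return false
	}
	defer g.queue.Done(key)
	// handle key here, then call g.queue.Forget(key) or g.queue.AddRateLimited(key)
	// depending on whether processing succeeded.
	return true
}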


@@ -34,24 +34,18 @@ import (
// WebhookServer contains configured TLS server with MutationWebhook.
// MutationWebhook gets policies from policyController and takes control of the cluster with kubeclient.
type WebhookServer struct {
server http.Server
client *client.Client
kyvernoClient *kyvernoclient.Clientset
// list/get cluster policy resource
pLister kyvernolister.ClusterPolicyLister
// returns true if the cluster policy store has synced at least once
pSynced cache.InformerSynced
// list/get role binding resource
rbLister rbaclister.RoleBindingLister
// returns true if the role binding store has synced at least once
rbSynced cache.InformerSynced
// list/get cluster role binding resource
crbLister rbaclister.ClusterRoleBindingLister
// returns true if the cluster role binding store has synced at least once
crbSynced cache.InformerSynced
// generate events
eventGen event.Interface
// webhook registration client
server http.Server
client *client.Client
kyvernoClient *kyvernoclient.Clientset
pLister kyvernolister.ClusterPolicyLister
pvLister kyvernolister.ClusterPolicyViolationLister
namespacepvLister kyvernolister.NamespacedPolicyViolationLister
pListerSynced cache.InformerSynced
pvListerSynced cache.InformerSynced
namespacepvListerSynced cache.InformerSynced
rbLister rbaclister.RoleBindingLister
crbLister rbaclister.ClusterRoleBindingLister
eventGen event.Interface
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient
// API to send policy stats for aggregation
policyStatus policy.PolicyStatusInterface
@@ -74,6 +68,8 @@ func NewWebhookServer(
client *client.Client,
tlsPair *tlsutils.TlsPemPair,
pInformer kyvernoinformer.ClusterPolicyInformer,
pvInformer kyvernoinformer.ClusterPolicyViolationInformer,
namespacepvInformer kyvernoinformer.NamespacedPolicyViolationInformer,
rbInformer rbacinformer.RoleBindingInformer,
crbInformer rbacinformer.ClusterRoleBindingInformer,
eventGen event.Interface,
@@ -100,15 +96,17 @@ func NewWebhookServer(
client: client,
kyvernoClient: kyvernoClient,
pLister: pInformer.Lister(),
pSynced: pInformer.Informer().HasSynced,
rbLister: rbInformer.Lister(),
rbSynced: rbInformer.Informer().HasSynced,
crbLister: crbInformer.Lister(),
crbSynced: crbInformer.Informer().HasSynced,
pvLister: pvInformer.Lister(),
namespacepvLister: namespacepvInformer.Lister(),
pListerSynced: pInformer.Informer().HasSynced,
pvListerSynced: pvInformer.Informer().HasSynced,
namespacepvListerSynced: namespacepvInformer.Informer().HasSynced,
eventGen: eventGen,
webhookRegistrationClient: webhookRegistrationClient,
policyStatus: policyStatus,
configHandler: configHandler,
rbLister: rbInformer.Lister(),
crbLister: crbInformer.Lister(),
cleanUp: cleanUp,
lastReqTime: checker.NewLastReqTime(),
pvGenerator: pvGenerator,
@@ -248,10 +246,6 @@ func (ws *WebhookServer) handleAdmissionRequest(request *v1beta1.AdmissionReques
// RunAsync runs the TLS server in a separate goroutine and returns control immediately
func (ws *WebhookServer) RunAsync(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, ws.pSynced, ws.rbSynced, ws.crbSynced) {
glog.Error("webhook: failed to sync informer cache")
}
go func(ws *WebhookServer) {
glog.V(3).Infof("serving on %s\n", ws.server.Addr)
if err := ws.server.ListenAndServeTLS("", ""); err != http.ErrServerClosed {
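For readers unfamiliar with the RunAsync shape shown above: the TLS server is launched on a goroutine so the caller regains control immediately, and http.ErrServerClosed is treated as a normal shutdown rather than a failure. A minimal generic sketch of that idiom (plain net/http plus glog, not kyverno's exact server wiring):

package example

import (
	"net/http"

	"github.com/golang/glog"
)

// runAsync starts the TLS listener in the background and returns immediately.
func runAsync(srv *http.Server) {
	go func() {
		glog.V(3).Infof("serving on %s", srv.Addr)
		// Empty cert/key paths mean the certificates already configured in
		// srv.TLSConfig are used; ErrServerClosed signals a graceful shutdown.
		if err := srv.ListenAndServeTLS("", ""); err != http.ErrServerClosed {
			glog.Errorf("webhook server failed: %v", err)
		}
	}()
}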