1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-29 02:45:06 +00:00

- holds resource webhook creation requests in a quene; - remove webhookinformer from policy controller and webhookregistrationclient

This commit is contained in:
Shuting Zhao 2019-12-04 12:31:27 -08:00
parent f6db1b9e87
commit 0f5cf40eda
8 changed files with 175 additions and 138 deletions

View file

@ -6,6 +6,7 @@ import (
"time"
"github.com/golang/glog"
"github.com/nirmata/kyverno/pkg/checker"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions"
"github.com/nirmata/kyverno/pkg/config"
@ -15,6 +16,7 @@ import (
"github.com/nirmata/kyverno/pkg/policy"
"github.com/nirmata/kyverno/pkg/policystore"
"github.com/nirmata/kyverno/pkg/policyviolation"
"github.com/nirmata/kyverno/pkg/resourcewebhookwatcher"
"github.com/nirmata/kyverno/pkg/signal"
"github.com/nirmata/kyverno/pkg/utils"
"github.com/nirmata/kyverno/pkg/version"
@ -85,10 +87,17 @@ func main() {
webhookRegistrationClient := webhookconfig.NewWebhookRegistrationClient(
clientConfig,
client,
kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(),
serverIP,
int32(webhookTimeout))
// Resource Mutating Webhook Watcher
lastReqTime := checker.NewLastReqTime()
rWebhookWatcher := resourcewebhookwatcher.NewResourceWebhookWatcher(
lastReqTime,
kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(),
webhookRegistrationClient,
)
// KYVERNO CRD INFORMER
// watches CRD resources:
// - Policy
@ -133,12 +142,11 @@ func main() {
pInformer.Kyverno().V1().ClusterPolicies(),
pInformer.Kyverno().V1().ClusterPolicyViolations(),
pInformer.Kyverno().V1().NamespacedPolicyViolations(),
kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(),
webhookRegistrationClient,
configData,
egen,
pvgen,
policyMetaStore)
policyMetaStore,
rWebhookWatcher)
if err != nil {
glog.Fatalf("error creating policy controller: %v\n", err)
}
@ -211,6 +219,8 @@ func main() {
configData,
policyMetaStore,
pvgen,
rWebhookWatcher,
lastReqTime,
cleanUp)
if err != nil {
glog.Fatalf("Unable to create webhook server: %v\n", err)
@ -219,6 +229,7 @@ func main() {
pInformer.Start(stopCh)
kubeInformer.Start(stopCh)
go rWebhookWatcher.Run(stopCh)
go configData.Run(stopCh)
go policyMetaStore.Run(stopCh)
go pc.Run(1, stopCh)

View file

@ -18,7 +18,7 @@ import (
"github.com/nirmata/kyverno/pkg/event"
"github.com/nirmata/kyverno/pkg/policystore"
"github.com/nirmata/kyverno/pkg/policyviolation"
"github.com/nirmata/kyverno/pkg/webhookconfig"
"github.com/nirmata/kyverno/pkg/resourcewebhookwatcher"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -28,9 +28,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
mconfiginformer "k8s.io/client-go/informers/admissionregistration/v1beta1"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
mconfiglister "k8s.io/client-go/listers/admissionregistration/v1beta1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
@ -73,12 +71,6 @@ type PolicyController struct {
pvListerSynced cache.InformerSynced
// pvListerSynced returns true if the Policy Violation store has been synced at least once
nspvListerSynced cache.InformerSynced
// mwebhookconfigSynced returns true if the Mutating Webhook Config store has been synced at least once
mwebhookconfigSynced cache.InformerSynced
// list/get mutatingwebhookconfigurations
mWebhookConfigLister mconfiglister.MutatingWebhookConfigurationLister
// WebhookRegistrationClient
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient
// Resource manager, manages the mapping for already processed resource
rm resourceManager
// helpers to validate against current loaded configuration
@ -89,6 +81,8 @@ type PolicyController struct {
pMetaStore policystore.UpdateInterface
// policy violation generator
pvGenerator policyviolation.GeneratorInterface
// resourceWebhookWatcher queues the webhook creation request, creates the webhook
resourceWebhookWatcher *resourcewebhookwatcher.ResourceWebhookWatcher
}
// NewPolicyController create a new PolicyController
@ -97,12 +91,11 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
pInformer kyvernoinformer.ClusterPolicyInformer,
pvInformer kyvernoinformer.ClusterPolicyViolationInformer,
nspvInformer kyvernoinformer.NamespacedPolicyViolationInformer,
mconfigwebhookinformer mconfiginformer.MutatingWebhookConfigurationInformer,
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient,
configHandler config.Interface,
eventGen event.Interface,
pvGenerator policyviolation.GeneratorInterface,
pMetaStore policystore.UpdateInterface) (*PolicyController, error) {
pMetaStore policystore.UpdateInterface,
resourceWebhookWatcher *resourcewebhookwatcher.ResourceWebhookWatcher) (*PolicyController, error) {
// Event broad caster
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
@ -113,15 +106,15 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: eventInterface})
pc := PolicyController{
client: client,
kyvernoClient: kyvernoClient,
eventGen: eventGen,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "policy_controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policy"),
webhookRegistrationClient: webhookRegistrationClient,
configHandler: configHandler,
pMetaStore: pMetaStore,
pvGenerator: pvGenerator,
client: client,
kyvernoClient: kyvernoClient,
eventGen: eventGen,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "policy_controller"}),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "policy"),
configHandler: configHandler,
pMetaStore: pMetaStore,
pvGenerator: pvGenerator,
resourceWebhookWatcher: resourceWebhookWatcher,
}
pc.pvControl = RealPVControl{Client: kyvernoClient, Recorder: pc.eventRecorder}
@ -154,8 +147,6 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
pc.pListerSynced = pInformer.Informer().HasSynced
pc.pvListerSynced = pvInformer.Informer().HasSynced
pc.nspvListerSynced = nspvInformer.Informer().HasSynced
pc.mwebhookconfigSynced = mconfigwebhookinformer.Informer().HasSynced
pc.mWebhookConfigLister = mconfigwebhookinformer.Lister()
// resource manager
// rebuild after 300 seconds/ 5 mins
//TODO: pass the time in seconds instead of converting it internally
@ -400,7 +391,7 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) {
glog.Info("Starting policy controller")
defer glog.Info("Shutting down policy controller")
if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.pvListerSynced, pc.nspvListerSynced, pc.mwebhookconfigSynced) {
if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.pvListerSynced, pc.nspvListerSynced) {
glog.Error("failed to sync informer cache")
return
}
@ -475,9 +466,9 @@ func (pc *PolicyController) syncPolicy(key string) error {
return err
}
if err := pc.webhookRegistrationClient.CreateResourceMutatingWebhookConfigurationIfRequired(*policy); err != nil {
glog.V(4).Infof("failed to create resource mutating webhook configurations, policies wont be applied on resources: %v", err)
glog.Errorln(err)
// if the policy contains mutating & validation rules and it config does not exist we create one
if policy.HasMutateOrValidate() {
pc.resourceWebhookWatcher.RegisterResourceWebhook()
}
// Deep-copy otherwise we are mutating our cache.

View file

@ -7,27 +7,6 @@ import (
)
func (pc *PolicyController) removeResourceWebhookConfiguration() error {
removeWebhookConfig := func() error {
var err error
// check informer cache
configName := pc.webhookRegistrationClient.GetResourceMutatingWebhookConfigName()
config, err := pc.mWebhookConfigLister.Get(configName)
if err != nil {
glog.V(4).Infof("failed to list mutating webhook config: %v", err)
return err
}
if config == nil {
// as no resource is found
return nil
}
err = pc.webhookRegistrationClient.RemoveResourceMutatingWebhookConfiguration()
if err != nil {
return err
}
glog.V(4).Info("removed resource webhook configuration")
return nil
}
var err error
// get all existing policies
policies, err := pc.pLister.List(labels.NewSelector())
@ -38,13 +17,13 @@ func (pc *PolicyController) removeResourceWebhookConfiguration() error {
if len(policies) == 0 {
glog.V(4).Info("no policies loaded, removing resource webhook configuration if one exists")
return removeWebhookConfig()
return pc.resourceWebhookWatcher.RemoveResourceWebhookConfiguration()
}
// if polices only have generate rules, we dont need the webhook
if !hasMutateOrValidatePolicies(policies) {
glog.V(4).Info("no policies with mutating or validating webhook configurations, remove resource webhook configuration if one exists")
return removeWebhookConfig()
return pc.resourceWebhookWatcher.RemoveResourceWebhookConfiguration()
}
return nil

View file

@ -0,0 +1,26 @@
package resourcewebhookwatcher
import (
"github.com/golang/glog"
)
func (rww *ResourceWebhookWatcher) RemoveResourceWebhookConfiguration() error {
var err error
// check informer cache
configName := rww.webhookRegistrationClient.GetResourceMutatingWebhookConfigName()
config, err := rww.mWebhookConfigLister.Get(configName)
if err != nil {
glog.V(4).Infof("failed to list mutating webhook config: %v", err)
return err
}
if config == nil {
// as no resource is found
return nil
}
err = rww.webhookRegistrationClient.RemoveResourceMutatingWebhookConfiguration()
if err != nil {
return err
}
glog.V(3).Info("removed resource webhook configuration")
return nil
}

View file

@ -0,0 +1,98 @@
package resourcewebhookwatcher
import (
"time"
"github.com/golang/glog"
checker "github.com/nirmata/kyverno/pkg/checker"
webhookconfig "github.com/nirmata/kyverno/pkg/webhookconfig"
errorsapi "k8s.io/apimachinery/pkg/api/errors"
mconfiginformer "k8s.io/client-go/informers/admissionregistration/v1beta1"
mconfiglister "k8s.io/client-go/listers/admissionregistration/v1beta1"
cache "k8s.io/client-go/tools/cache"
)
type ResourceWebhookWatcher struct {
lastReqTime *checker.LastReqTime
// ch holds the requests to create resource mutatingwebhookconfiguration
ch chan bool
mwebhookconfigSynced cache.InformerSynced
// list/get mutatingwebhookconfigurations
mWebhookConfigLister mconfiglister.MutatingWebhookConfigurationLister
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient
}
func NewResourceWebhookWatcher(
lastReqTime *checker.LastReqTime,
mconfigwebhookinformer mconfiginformer.MutatingWebhookConfigurationInformer,
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient,
) *ResourceWebhookWatcher {
return &ResourceWebhookWatcher{
lastReqTime: lastReqTime,
ch: make(chan bool),
mwebhookconfigSynced: mconfigwebhookinformer.Informer().HasSynced,
mWebhookConfigLister: mconfigwebhookinformer.Lister(),
webhookRegistrationClient: webhookRegistrationClient,
}
}
func (rww *ResourceWebhookWatcher) RegisterResourceWebhook() {
rww.ch <- true
}
func (rww *ResourceWebhookWatcher) Run(stopCh <-chan struct{}) {
glog.Info("Starting resource webhook watcher")
defer glog.Info("Shutting down resource webhook watcher")
// wait for cache to populate first time
if !cache.WaitForCacheSync(stopCh, rww.mwebhookconfigSynced) {
glog.Error("configuration: failed to sync webhook informer cache")
}
createWebhook := func() {
if err := rww.createResourceMutatingWebhookConfigurationIfRequired(); err != nil {
glog.Errorf("failed to create resource mutating webhook configuration: %v, re-queue creation request", err)
rww.RegisterResourceWebhook()
}
}
for {
select {
case <-rww.ch:
timeDiff := time.Since(rww.lastReqTime.Time())
if timeDiff < checker.DefaultDeadline {
glog.V(3).Info("Verified webhook status, creating webhook configuration")
go createWebhook()
} else {
glog.Info("Webhook is inactive, not creating resource webhook configuration")
}
case <-stopCh:
glog.V(2).Infof("stopping resource webhook watcher")
return
}
}
}
// CreateResourceMutatingWebhookConfigurationIfRequired creates a Mutatingwebhookconfiguration
// for all resource types if there's no mutatingwebhookcfg for existing policy
func (rww *ResourceWebhookWatcher) createResourceMutatingWebhookConfigurationIfRequired() error {
// check cache
configName := rww.webhookRegistrationClient.GetResourceMutatingWebhookConfigName()
config, err := rww.mWebhookConfigLister.Get(configName)
if err != nil && !errorsapi.IsNotFound(err) {
glog.V(4).Infof("failed to list mutating webhook configuration: %v", err)
return err
}
if config != nil {
// mutating webhoook configuration already exists
return nil
}
if err := rww.webhookRegistrationClient.CreateResourceMutatingWebhookConfiguration(); err != nil {
return err
}
glog.V(3).Info("Successfully created mutating webhook configuration for resources")
return nil
}

View file

@ -2,20 +2,14 @@ package webhookconfig
import (
"errors"
"fmt"
"sync"
"time"
"github.com/golang/glog"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"github.com/nirmata/kyverno/pkg/checker"
"github.com/nirmata/kyverno/pkg/config"
client "github.com/nirmata/kyverno/pkg/dclient"
admregapi "k8s.io/api/admissionregistration/v1beta1"
errorsapi "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
mconfiginformer "k8s.io/client-go/informers/admissionregistration/v1beta1"
mconfiglister "k8s.io/client-go/listers/admissionregistration/v1beta1"
rest "k8s.io/client-go/rest"
)
@ -28,29 +22,22 @@ const (
type WebhookRegistrationClient struct {
client *client.Client
clientConfig *rest.Config
// list/get mutatingwebhookconfigurations
mWebhookConfigLister mconfiglister.MutatingWebhookConfigurationLister
// serverIP should be used if running Kyverno out of clutser
serverIP string
timeoutSeconds int32
LastReqTime *checker.LastReqTime
}
// NewWebhookRegistrationClient creates new WebhookRegistrationClient instance
func NewWebhookRegistrationClient(
clientConfig *rest.Config,
client *client.Client,
mconfigwebhookinformer mconfiginformer.MutatingWebhookConfigurationInformer,
serverIP string,
webhookTimeout int32) *WebhookRegistrationClient {
return &WebhookRegistrationClient{
clientConfig: clientConfig,
client: client,
mWebhookConfigLister: mconfigwebhookinformer.Lister(),
serverIP: serverIP,
timeoutSeconds: webhookTimeout,
LastReqTime: checker.NewLastReqTime(),
clientConfig: clientConfig,
client: client,
serverIP: serverIP,
timeoutSeconds: webhookTimeout,
}
}
@ -71,11 +58,6 @@ func (wrc *WebhookRegistrationClient) Register() error {
return err
}
if err := wrc.waitForWebhookSuccess(); err != nil {
return fmt.Errorf("inactive webhook, not registering webhookconfigurations for policy: %v", err)
}
glog.V(4).Infof("webhook is active, registering policy webhookcofigurations")
// Static Webhook configuration on Policy CRD
// create Policy CRD validating webhook configuration resource
// used for validating Policy CR
@ -100,40 +82,10 @@ func (wrc *WebhookRegistrationClient) RemoveWebhookConfigurations(cleanUp chan<-
close(cleanUp)
}
// CreateResourceMutatingWebhookConfigurationIfRequired creates a Mutatingwebhookconfiguration for all resource types
// if there's no mutatingwebhookcfg for existing policy
func (wrc *WebhookRegistrationClient) CreateResourceMutatingWebhookConfigurationIfRequired(policy kyverno.ClusterPolicy) error {
// if the policy contains mutating & validation rules and it config does not exist we create one
if policy.HasMutateOrValidate() {
// check cache
configName := wrc.GetResourceMutatingWebhookConfigName()
config, err := wrc.mWebhookConfigLister.Get(configName)
if !errorsapi.IsNotFound(err) {
glog.V(4).Infof("failed to list mutating webhook configuration: %v", err)
return err
}
if config != nil {
// mutating webhoook configuration already exists
return nil
}
if err := wrc.createResourceMutatingWebhookConfiguration(); err != nil {
return err
}
}
return nil
}
//CreateResourceMutatingWebhookConfiguration create a Mutatingwebhookconfiguration resource for all resource type
// used to forward request to kyverno webhooks to apply policeis
// Mutationg webhook is be used for Mutating & Validating purpose
func (wrc *WebhookRegistrationClient) createResourceMutatingWebhookConfiguration() error {
if err := wrc.waitForWebhookSuccess(); err != nil {
return fmt.Errorf("inactive webhook, not registering webhookconfiguration for resource: %v", err)
}
glog.V(4).Infof("webhook is active, registering resource mutating webhookcofigurations")
func (wrc *WebhookRegistrationClient) CreateResourceMutatingWebhookConfiguration() error {
var caData []byte
var config *admregapi.MutatingWebhookConfiguration
@ -331,28 +283,3 @@ func (wrc *WebhookRegistrationClient) removePolicyValidatingWebhookConfiguration
glog.V(4).Infof("succesfully deleted policy webhook configuration %s", validatingConfig)
}
}
// waitForWebhookSuccess checks webhook status by checking the
// LastReqTime set in webhook server, returns immediately if active
// if not, then waits for webhook to be active, times out in 60s
func (wrc *WebhookRegistrationClient) waitForWebhookSuccess() error {
// as default timeout for webhook checker is set to 60s
// resource webhook creation times out in a cycle of(60s) webhook checker
backoff := wait.Backoff{
Duration: time.Second,
Factor: 2,
Steps: 7,
}
count := 0
return wait.ExponentialBackoff(backoff, func() (bool, error) {
timeDiff := time.Since(wrc.LastReqTime.Time())
if timeDiff < checker.DefaultDeadline {
glog.Infof("Verified webhook status, creating webhook configuration")
return true, nil
}
glog.V(4).Infof("webhook is inactive, retrying #%d", count)
count++
return false, nil
})
}

View file

@ -37,9 +37,10 @@ func (ws *WebhookServer) handlePolicyValidation(request *v1beta1.AdmissionReques
}
if admissionResp.Allowed {
// create mutating resource mutatingwebhookconfiguration if not present
if err := ws.webhookRegistrationClient.CreateResourceMutatingWebhookConfigurationIfRequired(*policy); err != nil {
glog.Error("failed to created resource mutating webhook configuration, policies wont be applied on the resource")
// if the policy contains mutating & validation rules and it config does not exist we create one
if policy.HasMutateOrValidate() {
// queue the request
ws.resourceWebhookWatcher.RegisterResourceWebhook()
}
}
return admissionResp

View file

@ -21,6 +21,7 @@ import (
"github.com/nirmata/kyverno/pkg/policy"
"github.com/nirmata/kyverno/pkg/policystore"
"github.com/nirmata/kyverno/pkg/policyviolation"
"github.com/nirmata/kyverno/pkg/resourcewebhookwatcher"
tlsutils "github.com/nirmata/kyverno/pkg/tls"
userinfo "github.com/nirmata/kyverno/pkg/userinfo"
"github.com/nirmata/kyverno/pkg/webhookconfig"
@ -64,7 +65,8 @@ type WebhookServer struct {
// store to hold policy meta data for faster lookup
pMetaStore policystore.LookupInterface
// policy violation generator
pvGenerator policyviolation.GeneratorInterface
pvGenerator policyviolation.GeneratorInterface
resourceWebhookWatcher *resourcewebhookwatcher.ResourceWebhookWatcher
}
// NewWebhookServer creates new instance of WebhookServer accordingly to given configuration
@ -82,6 +84,8 @@ func NewWebhookServer(
configHandler config.Interface,
pMetaStore policystore.LookupInterface,
pvGenerator policyviolation.GeneratorInterface,
resourceWebhookWatcher *resourcewebhookwatcher.ResourceWebhookWatcher,
lastReqTime *checker.LastReqTime,
cleanUp chan<- struct{}) (*WebhookServer, error) {
if tlsPair == nil {
@ -96,7 +100,6 @@ func NewWebhookServer(
tlsConfig.Certificates = []tls.Certificate{pair}
ws := &WebhookServer{
client: client,
kyvernoClient: kyvernoClient,
pLister: pInformer.Lister(),
@ -110,9 +113,10 @@ func NewWebhookServer(
policyStatus: policyStatus,
configHandler: configHandler,
cleanUp: cleanUp,
lastReqTime: webhookRegistrationClient.LastReqTime,
lastReqTime: lastReqTime,
pvGenerator: pvGenerator,
pMetaStore: pMetaStore,
resourceWebhookWatcher: resourceWebhookWatcher,
}
mux := http.NewServeMux()
mux.HandleFunc(config.MutatingWebhookServicePath, ws.serve)