From ab008189e7d2538074d542893dce9b39cf0029d6 Mon Sep 17 00:00:00 2001 From: shivdudhani Date: Wed, 15 May 2019 12:29:09 -0700 Subject: [PATCH] sharedinfomer factory + update status --- client/client.go | 9 ++++ main.go | 22 +++------ pkg/controller/controller.go | 10 ++-- pkg/event/controller.go | 9 ++-- pkg/sharedinformer/sharedinformerfactory.go | 55 +++++++++++++++++++++ pkg/violation/builder.go | 27 +++++----- pkg/webhooks/server.go | 9 ++-- 7 files changed, 99 insertions(+), 42 deletions(-) create mode 100644 pkg/sharedinformer/sharedinformerfactory.go diff --git a/client/client.go b/client/client.go index 10e2e217d5..c64d46c093 100644 --- a/client/client.go +++ b/client/client.go @@ -130,6 +130,15 @@ func (c *Client) UpdateResource(kind string, namespace string, obj interface{}) return nil, fmt.Errorf("Unable to update resource ") } +// UpdateStatusResource updates the resource "status" subresource +func (c *Client) UpdateStatusResource(kind string, namespace string, obj interface{}) (*unstructured.Unstructured, error) { + // convert typed to unstructured obj + if unstructuredObj := convertToUnstructured(obj); unstructuredObj != nil { + return c.getResourceInterface(kind, namespace).UpdateStatus(unstructuredObj, meta.UpdateOptions{}) + } + return nil, fmt.Errorf("Unable to update resource ") +} + func convertToUnstructured(obj interface{}) *unstructured.Unstructured { unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { diff --git a/main.go b/main.go index 66e7e1a250..cdbdb32587 100644 --- a/main.go +++ b/main.go @@ -7,10 +7,9 @@ import ( "k8s.io/sample-controller/pkg/signals" client "github.com/nirmata/kube-policy/client" - policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" - informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions" controller "github.com/nirmata/kube-policy/pkg/controller" event "github.com/nirmata/kube-policy/pkg/event" + "github.com/nirmata/kube-policy/pkg/sharedinformer" "github.com/nirmata/kube-policy/pkg/violation" "github.com/nirmata/kube-policy/pkg/webhooks" ) @@ -32,21 +31,16 @@ func main() { log.Fatalf("Error creating client: %v\n", err) } - policyClientset, err := policyclientset.NewForConfig(clientConfig) + policyInformerFactory, err := sharedinformer.NewSharedInformerFactory(clientConfig) if err != nil { - log.Fatalf("Error creating policyClient: %v\n", err) + log.Fatalf("Error creating policy sharedinformer: %v\n", err) } - - //TODO wrap the policyInformer inside a factory - policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, 0) - policyInformer := policyInformerFactory.Kubepolicy().V1alpha1().Policies() - - eventController := event.NewEventController(client, policyInformer.Lister(), nil) - violationBuilder := violation.NewPolicyViolationBuilder(client, policyInformer.Lister(), policyClientset, eventController, nil) + eventController := event.NewEventController(client, policyInformerFactory, nil) + violationBuilder := violation.NewPolicyViolationBuilder(client, policyInformerFactory, eventController, nil) policyController := controller.NewPolicyController( client, - policyInformer, + policyInformerFactory, violationBuilder, eventController, nil) @@ -56,7 +50,7 @@ func main() { log.Fatalf("Failed to initialize TLS key/certificate pair: %v\n", err) } - server, err := webhooks.NewWebhookServer(tlsPair, policyInformer.Lister(), nil) + server, err := webhooks.NewWebhookServer(tlsPair, policyInformerFactory, nil) if err != nil { log.Fatalf("Unable to create webhook server: %v\n", err) } @@ -68,7 +62,7 @@ func main() { stopCh := signals.SetupSignalHandler() - policyInformerFactory.Start(stopCh) + policyInformerFactory.Run(stopCh) eventController.Run(stopCh) if err = policyController.Run(stopCh); err != nil { diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 12d2b109a8..8d63b22f03 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -8,9 +8,9 @@ import ( client "github.com/nirmata/kube-policy/client" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - infomertypes "github.com/nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1" lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" event "github.com/nirmata/kube-policy/pkg/event" + "github.com/nirmata/kube-policy/pkg/sharedinformer" violation "github.com/nirmata/kube-policy/pkg/violation" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,7 +33,7 @@ type PolicyController struct { // NewPolicyController from cmd args func NewPolicyController(client *client.Client, - policyInformer infomertypes.PolicyInformer, + policyInformer sharedinformer.PolicyInformer, violationBuilder violation.Generator, eventController event.Generator, logger *log.Logger) *PolicyController { @@ -43,15 +43,15 @@ func NewPolicyController(client *client.Client, } controller := &PolicyController{ client: client, - policyLister: policyInformer.Lister(), - policySynced: policyInformer.Informer().HasSynced, + policyLister: policyInformer.GetLister(), + policySynced: policyInformer.GetInfomer().HasSynced, violationBuilder: violationBuilder, eventBuilder: eventController, logger: logger, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), policyWorkQueueName), } - policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + policyInformer.GetInfomer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: controller.createPolicyHandler, UpdateFunc: controller.updatePolicyHandler, DeleteFunc: controller.deletePolicyHandler, diff --git a/pkg/event/controller.go b/pkg/event/controller.go index ac726fcf1a..6994d37a5a 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -9,7 +9,8 @@ import ( client "github.com/nirmata/kube-policy/client" "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" policyscheme "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme" - policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + v1alpha1 "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + "github.com/nirmata/kube-policy/pkg/sharedinformer" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -22,7 +23,7 @@ import ( type controller struct { client *client.Client - policyLister policylister.PolicyLister + policyLister v1alpha1.PolicyLister queue workqueue.RateLimitingInterface recorder record.EventRecorder logger *log.Logger @@ -42,7 +43,7 @@ type Controller interface { //NewEventController to generate a new event controller func NewEventController(client *client.Client, - policyLister policylister.PolicyLister, + shareInformer sharedinformer.PolicyInformer, logger *log.Logger) Controller { if logger == nil { @@ -51,7 +52,7 @@ func NewEventController(client *client.Client, controller := &controller{ client: client, - policyLister: policyLister, + policyLister: shareInformer.GetLister(), queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName), recorder: initRecorder(client), logger: logger, diff --git a/pkg/sharedinformer/sharedinformerfactory.go b/pkg/sharedinformer/sharedinformerfactory.go new file mode 100644 index 0000000000..272f151ad8 --- /dev/null +++ b/pkg/sharedinformer/sharedinformerfactory.go @@ -0,0 +1,55 @@ +package sharedinformer + +import ( + "fmt" + + policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" + informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions" + infomertypes "github.com/nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1" + v1alpha1 "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" +) + +type PolicyInformer interface { + GetLister() v1alpha1.PolicyLister + GetInfomer() cache.SharedIndexInformer +} + +type SharedInfomer interface { + PolicyInformer + Run(stopCh <-chan struct{}) +} + +type sharedInfomer struct { + policyInformerFactory informers.SharedInformerFactory +} + +//NewSharedInformer returns shared informer +func NewSharedInformerFactory(clientConfig *rest.Config) (SharedInfomer, error) { + // create policy client + policyClientset, err := policyclientset.NewForConfig(clientConfig) + if err != nil { + return nil, fmt.Errorf("Error creating policyClient: %v\n", err) + } + //TODO: replace with NewSharedInformerFactoryWithOptions + policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, 0) + return &sharedInfomer{ + policyInformerFactory: policyInformerFactory, + }, nil +} + +func (si *sharedInfomer) Run(stopCh <-chan struct{}) { + si.policyInformerFactory.Start(stopCh) +} + +func (si *sharedInfomer) getInfomer() infomertypes.PolicyInformer { + return si.policyInformerFactory.Kubepolicy().V1alpha1().Policies() +} +func (si *sharedInfomer) GetInfomer() cache.SharedIndexInformer { + return si.getInfomer().Informer() +} + +func (si *sharedInfomer) GetLister() v1alpha1.PolicyLister { + return si.getInfomer().Lister() +} diff --git a/pkg/violation/builder.go b/pkg/violation/builder.go index 8fe8e4364b..35209f49c3 100644 --- a/pkg/violation/builder.go +++ b/pkg/violation/builder.go @@ -7,9 +7,9 @@ import ( client "github.com/nirmata/kube-policy/client" types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" - policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned" - policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + v1alpha1 "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" event "github.com/nirmata/kube-policy/pkg/event" + "github.com/nirmata/kube-policy/pkg/sharedinformer" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" ) @@ -20,11 +20,10 @@ type Generator interface { } type builder struct { - client *client.Client - policyLister policylister.PolicyLister - policyInterface policyclientset.Interface - eventBuilder event.Generator - logger *log.Logger + client *client.Client + policyLister v1alpha1.PolicyLister + eventBuilder event.Generator + logger *log.Logger } //Builder is to build policy violations @@ -36,8 +35,7 @@ type Builder interface { //NewPolicyViolationBuilder returns new violation builder func NewPolicyViolationBuilder(client *client.Client, - policyLister policylister.PolicyLister, - policyInterface policyclientset.Interface, + sharedInfomer sharedinformer.PolicyInformer, eventController event.Generator, logger *log.Logger) Builder { @@ -46,11 +44,10 @@ func NewPolicyViolationBuilder(client *client.Client, } builder := &builder{ - client: client, - policyLister: policyLister, - policyInterface: policyInterface, - eventBuilder: eventController, - logger: logger, + client: client, + policyLister: sharedInfomer.GetLister(), + eventBuilder: eventController, + logger: logger, } return builder } @@ -93,7 +90,7 @@ func (b *builder) processViolation(info Info) error { modifiedPolicy.Status.Violations = modifiedViolations // Violations are part of the status sub resource, so we can use the Update Status api instead of updating the policy object - _, err = b.policyInterface.KubepolicyV1alpha1().Policies(namespace).UpdateStatus(modifiedPolicy) + _, err = b.client.UpdateStatusResource("policy", namespace, modifiedPolicy) if err != nil { return err } diff --git a/pkg/webhooks/server.go b/pkg/webhooks/server.go index b74a6a36e8..4bdb60de9e 100644 --- a/pkg/webhooks/server.go +++ b/pkg/webhooks/server.go @@ -13,9 +13,10 @@ import ( "time" "github.com/nirmata/kube-policy/config" - policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" + "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" engine "github.com/nirmata/kube-policy/pkg/engine" "github.com/nirmata/kube-policy/pkg/engine/mutation" + "github.com/nirmata/kube-policy/pkg/sharedinformer" tlsutils "github.com/nirmata/kube-policy/pkg/tls" v1beta1 "k8s.io/api/admission/v1beta1" "k8s.io/apimachinery/pkg/labels" @@ -26,7 +27,7 @@ import ( // MutationWebhook gets policies from policyController and takes control of the cluster with kubeclient. type WebhookServer struct { server http.Server - policyLister policylister.PolicyLister + policyLister v1alpha1.PolicyLister logger *log.Logger } @@ -34,7 +35,7 @@ type WebhookServer struct { // Policy Controller and Kubernetes Client should be initialized in configuration func NewWebhookServer( tlsPair *tlsutils.TlsPemPair, - policyLister policylister.PolicyLister, + shareInformer sharedinformer.PolicyInformer, logger *log.Logger) (*WebhookServer, error) { if logger == nil { logger = log.New(os.Stdout, "Webhook Server: ", log.LstdFlags) @@ -52,7 +53,7 @@ func NewWebhookServer( tlsConfig.Certificates = []tls.Certificate{pair} ws := &WebhookServer{ - policyLister: policyLister, + policyLister: shareInformer.GetLister(), logger: logger, }