From ea9491a1057747846802cc2347ef858d9fd463a8 Mon Sep 17 00:00:00 2001 From: belyshevdenis Date: Thu, 21 Feb 2019 20:31:18 +0200 Subject: [PATCH] NK-10: Controller renamed to PolicyController. Created MutationWebhook class in new webhook package. Implemented filtering of incoming objects by Kind. Implemented simple usage of PolicyController in MutationWebhook. --- .gitignore | 3 +- controller/controller.go | 35 +++++----- main.go | 69 +++++++++--------- server/server.go | 146 ++++++++++++++++----------------------- webhooks/admission.go | 38 ++++++++++ webhooks/mutation.go | 63 +++++++++++++++++ 6 files changed, 216 insertions(+), 138 deletions(-) create mode 100644 webhooks/admission.go create mode 100644 webhooks/mutation.go diff --git a/.gitignore b/.gitignore index 2d94abef18..87da64c4df 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ pkg/apis/policy/v1alpha1/zz_generated.deepcopy.go certs kube-policy Gopkg.lock -Dockerfile \ No newline at end of file +Dockerfile +.vscode \ No newline at end of file diff --git a/controller/controller.go b/controller/controller.go index 09407448da..94da05a69d 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -14,15 +14,15 @@ import ( lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" ) -// Controller for CRD -type Controller struct { +// PolicyController for CRD +type PolicyController struct { policyInformerFactory informers.SharedInformerFactory policyLister lister.PolicyLister logger *log.Logger } -// NewController from cmd args -func NewController(masterURL, kubeconfigPath string, logger *log.Logger) (*Controller, error) { +// NewPolicyController from cmd args +func NewPolicyController(masterURL, kubeconfigPath string, logger *log.Logger) (*PolicyController, error) { cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfigPath) if err != nil { logger.Printf("Error building kubeconfig: %v\n", err) @@ -38,7 +38,7 @@ func NewController(masterURL, kubeconfigPath string, logger *log.Logger) (*Contr policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, time.Second*30) policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies() - controller := &Controller{ + controller := &PolicyController{ policyInformerFactory: policyInformerFactory, policyLister: policyInformer.Lister(), } @@ -53,45 +53,48 @@ func NewController(masterURL, kubeconfigPath string, logger *log.Logger) (*Contr } // Run is main controller thread -func (c *Controller) Run(stopCh <-chan struct{}) { +func (c *PolicyController) Run(stopCh <-chan struct{}) { //c.policyInformerFactory.Start(stopCh) } // GetPolicies retrieves all policy resources // from cache. Cache is refreshed by informer -func (c *Controller) GetPolicies() ([]*types.Policy, error) { +func (c *PolicyController) GetPolicies() []types.Policy { // Create nil Selector to grab all the policies - cachedPolicies, err := c.policyLister.List(labels.NewSelector()) + selector := labels.NewSelector() + cachedPolicies, err := c.policyLister.List(selector) + if err != nil { - return nil, err + c.logger.Printf("Error: %v", err) + return nil } - var policies []*types.Policy + var policies []types.Policy for _, elem := range cachedPolicies { - policies = append(policies, elem.DeepCopy()) + policies = append(policies, *elem.DeepCopy()) } - return policies, nil + return policies } -func (c *Controller) createPolicyHandler(resource interface{}) { +func (c *PolicyController) createPolicyHandler(resource interface{}) { key := c.getResourceKey(resource) c.logger.Printf("Created policy: %s\n", key) } -func (c *Controller) updatePolicyHandler(oldResource, newResource interface{}) { +func (c *PolicyController) updatePolicyHandler(oldResource, newResource interface{}) { oldKey := c.getResourceKey(oldResource) newKey := c.getResourceKey(newResource) c.logger.Printf("Updated policy from %s to %s\n", oldKey, newKey) } -func (c *Controller) deletePolicyHandler(resource interface{}) { +func (c *PolicyController) deletePolicyHandler(resource interface{}) { key := c.getResourceKey(resource) c.logger.Printf("Deleted policy: %s\n", key) } -func (c *Controller) getResourceKey(resource interface{}) string { +func (c *PolicyController) getResourceKey(resource interface{}) string { if key, err := cache.MetaNamespaceKeyFunc(resource); err != nil { c.logger.Printf("Error retrieving policy key: %v\n", err) return "" diff --git a/main.go b/main.go index 4ab71a500b..9adda990bf 100644 --- a/main.go +++ b/main.go @@ -1,22 +1,22 @@ package main import ( - "log" - "os" - "flag" - "fmt" + "flag" + "fmt" + "log" + "os" - "github.com/nirmata/kube-policy/controller" - "github.com/nirmata/kube-policy/server" - - "k8s.io/sample-controller/pkg/signals" + "github.com/nirmata/kube-policy/controller" + "github.com/nirmata/kube-policy/server" + + "k8s.io/sample-controller/pkg/signals" ) var ( - masterURL string - kubeconfig string - cert string - key string + masterURL string + kubeconfig string + cert string + key string ) func main() { @@ -26,34 +26,33 @@ func main() { log.Fatal("TLS certificate or/and key is not set") } - httpLogger := log.New(os.Stdout, "http: ", log.LstdFlags|log.Lshortfile) - crdcLogger := log.New(os.Stdout, "crdc: ", log.LstdFlags|log.Lshortfile) + crdcLogger := log.New(os.Stdout, "Policy Controller: ", log.LstdFlags|log.Lshortfile) + controller, err := controller.NewPolicyController(masterURL, kubeconfig, crdcLogger) + if err != nil { + fmt.Printf("Error creating PolicyController! Error: %s\n", err) + return + } - server := server.NewWebhookServer(cert, key, httpLogger) + httpLogger := log.New(os.Stdout, "HTTPS Server: ", log.LstdFlags|log.Lshortfile) + server := server.NewWebhookServer(cert, key, controller, httpLogger) server.RunAsync() - controller, err := controller.NewController(masterURL, kubeconfig, crdcLogger) - if err != nil { - fmt.Printf("Error creating Controller! Error: %s\n", err) - return - } + stopCh := signals.SetupSignalHandler() + controller.Run(stopCh) - stopCh := signals.SetupSignalHandler() - controller.Run(stopCh) + if err != nil { + fmt.Printf("Error running PolicyController! Error: %s\n", err) + } - if err != nil { - fmt.Printf("Error running Controller! Error: %s\n", err) - } - - fmt.Printf("Policy Controller has started") - <-stopCh - server.Stop() - fmt.Printf("Policy Controller has stopped") + fmt.Printf("Policy PolicyController has started") + <-stopCh + server.Stop() + fmt.Printf("Policy PolicyController has stopped") } func init() { - flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") - flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") - flag.StringVar(&cert, "cert", "", "TLS certificate used in connection with cluster.") - flag.StringVar(&key, "key", "", "Key, used in TLS connection.") -} \ No newline at end of file + flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") + flag.StringVar(&cert, "cert", "", "TLS certificate used in connection with cluster.") + flag.StringVar(&key, "key", "", "Key, used in TLS connection.") +} diff --git a/server/server.go b/server/server.go index 69ba77b6ae..392c5b5871 100644 --- a/server/server.go +++ b/server/server.go @@ -11,16 +11,18 @@ import ( "os" "time" + controller "github.com/nirmata/kube-policy/controller" + webhooks "github.com/nirmata/kube-policy/webhooks" v1beta1 "k8s.io/api/admission/v1beta1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - coreTypes "k8s.io/kubernetes/pkg/apis/core" ) // WebhookServer is a struct that describes // TLS server with mutation webhook type WebhookServer struct { - server http.Server - logger *log.Logger + server http.Server + logger *log.Logger + policyController *controller.PolicyController + mutationWebhook *webhooks.MutationWebhook } type patchOperations struct { @@ -33,6 +35,48 @@ type patchOperation struct { Value interface{} `json:"value,omitempty"` } +// NewWebhookServer creates new instance of WebhookServer and configures it +func NewWebhookServer(certFile string, keyFile string, controller *controller.PolicyController, logger *log.Logger) *WebhookServer { + if logger == nil { + logger = log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) + } + if controller == nil { + logger.Fatal("Controller is not specified for webhook server") + } + + var config tls.Config + pair, err := tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + logger.Fatal("Unable to load certificate and key: ", err) + } + config.Certificates = []tls.Certificate{pair} + + mw, err := webhooks.NewMutationWebhook(logger) + if err != nil { + logger.Fatal("Unable to create mutation webhook: ", err) + } + + ws := &WebhookServer{ + logger: logger, + policyController: controller, + mutationWebhook: mw, + } + + mux := http.NewServeMux() + mux.HandleFunc("/mutate", ws.serve) + + ws.server = http.Server{ + Addr: ":443", // Listen on port for HTTPS requests + TLSConfig: &config, + Handler: mux, + ErrorLog: logger, + ReadTimeout: 15 * time.Second, + WriteTimeout: 15 * time.Second, + } + + return ws +} + func (ws *WebhookServer) serve(w http.ResponseWriter, r *http.Request) { if r.URL.Path == "/mutate" { admissionReview := ws.parseAdmissionReview(r, w) @@ -40,21 +84,27 @@ func (ws *WebhookServer) serve(w http.ResponseWriter, r *http.Request) { return } - admissionResponse := ws.mutate(admissionReview) - if admissionResponse != nil { - admissionReview.Response = admissionResponse - if admissionReview.Request != nil { - admissionReview.Response.UID = admissionReview.Request.UID + var admissionResponse *v1beta1.AdmissionResponse + if webhooks.AdmissionIsRequired(admissionReview.Request) { + admissionResponse = ws.mutationWebhook.Mutate(admissionReview.Request, ws.policyController.GetPolicies()) + } + + if admissionResponse == nil { + admissionResponse = &v1beta1.AdmissionResponse{ + Allowed: true, } } + admissionReview.Response = admissionResponse + admissionReview.Response.UID = admissionReview.Request.UID + responseJson, err := json.Marshal(admissionReview) if err != nil { http.Error(w, fmt.Sprintf("Could not encode response: %v", err), http.StatusInternalServerError) return } - ws.logger.Printf("!!! Writing success !!! Response body: %v", string(responseJson)) + ws.logger.Printf("Response body\n:%v", string(responseJson)) w.Header().Set("Content-Type", "application/json; charset=utf-8") if _, err := w.Write(responseJson); err != nil { http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError) @@ -96,50 +146,6 @@ func (ws *WebhookServer) parseAdmissionReview(request *http.Request, writer http } } -func (ws *WebhookServer) mutate(ar *v1beta1.AdmissionReview) *v1beta1.AdmissionResponse { - request := ar.Request - - ws.logger.Printf("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v", - request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation, request.UserInfo) - - if admissionRequired(request) { - var configMap coreTypes.ConfigMap - if err := json.Unmarshal(request.Object.Raw, &configMap); err != nil { - ws.logger.Printf("Could not unmarshal raw object: %v", err) - return &v1beta1.AdmissionResponse{ - Result: &metav1.Status{ - Message: err.Error(), - }, - } - } - /*patch := patchOperation{ - Path: "/labels", - Op: "add", - Value: map[string]string{ - "is-mutated": "true", - }, - }*/ - patch := `[ {"op":"add","path":"/metadata/labels","value":{"is-mutated":"true"}} ]` - - return &v1beta1.AdmissionResponse{ - Allowed: true, - Patch: []byte(patch), - PatchType: func() *v1beta1.PatchType { - pt := v1beta1.PatchTypeJSONPatch - return &pt - }(), - } - } else { - return &v1beta1.AdmissionResponse{ - Allowed: true, - } - } -} - -func admissionRequired(request *v1beta1.AdmissionRequest) bool { - return request.Kind.Kind == "ConfigMap" -} - // RunAsync runs TLS server in separate // thread and returns control immediately func (ws *WebhookServer) RunAsync() { @@ -160,35 +166,3 @@ func (ws *WebhookServer) Stop() { ws.server.Close() } } - -// NewWebhookServer creates new instance of WebhookServer and configures it -func NewWebhookServer(certFile string, keyFile string, logger *log.Logger) *WebhookServer { - if logger == nil { - logger = log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile) - } - - var config tls.Config - pair, err := tls.LoadX509KeyPair(certFile, keyFile) - if err != nil { - logger.Fatal("Unable to load certificate and key: ", err) - } - config.Certificates = []tls.Certificate{pair} - - mux := http.NewServeMux() - - ws := &WebhookServer{ - server: http.Server{ - Addr: ":443", // Listen on port for HTTPS requests - TLSConfig: &config, - Handler: mux, - ErrorLog: logger, - ReadTimeout: 5 * time.Second, - WriteTimeout: 5 * time.Second, - }, - logger: logger, - } - - mux.HandleFunc("/mutate", ws.serve) - - return ws -} diff --git a/webhooks/admission.go b/webhooks/admission.go new file mode 100644 index 0000000000..5e69a99fd0 --- /dev/null +++ b/webhooks/admission.go @@ -0,0 +1,38 @@ +package webhooks + +import "k8s.io/api/admission/v1beta1" + +var supportedKinds = [...]string{ + "ConfigMap", + "CronJob", + "DaemonSet", + "Deployment", + "Endpoint", + "HorizontalPodAutoscaler", + "Ingress", + "Job", + "LimitRange", + "Namespace", + "NetworkPolicy", + "PersistentVolumeClaim", + "PodDisruptionBudget", + "PodTemplate", + "ResourceQuota", + "Secret", + "Service", + "StatefulSet", +} + +func kindIsSupported(kind string) bool { + for _, k := range supportedKinds { + if k == kind { + return true + } + } + return false +} + +func AdmissionIsRequired(request *v1beta1.AdmissionRequest) bool { + // Here you can make additional hardcoded checks + return kindIsSupported(request.Kind.Kind) +} diff --git a/webhooks/mutation.go b/webhooks/mutation.go new file mode 100644 index 0000000000..4b85e19a45 --- /dev/null +++ b/webhooks/mutation.go @@ -0,0 +1,63 @@ +package webhooks + +import ( + "encoding/json" + "errors" + "log" + + types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1" + v1beta1 "k8s.io/api/admission/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coreTypes "k8s.io/kubernetes/pkg/apis/core" +) + +type MutationWebhook struct { + logger *log.Logger +} + +func NewMutationWebhook(logger *log.Logger) (*MutationWebhook, error) { + if logger == nil { + return nil, errors.New("Logger must be set for the mutation webhook") + } + return &MutationWebhook{logger: logger}, nil +} + +func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest, policies []types.Policy) *v1beta1.AdmissionResponse { + mw.logger.Printf("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v", + request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation, request.UserInfo) + + if len(policies) == 0 { + return nil + } + + var configMap coreTypes.ConfigMap + if err := json.Unmarshal(request.Object.Raw, &configMap); err != nil { + mw.logger.Printf("Could not unmarshal raw object: %v", err) + return errorToResponse(err) + } + /*patch := patchOperation{ + Path: "/labels", + Op: "add", + Value: map[string]string{ + "is-mutated": "true", + }, + }*/ + patch := `[ {"op":"add","path":"/metadata/labels","value":{"is-mutated":"true"}} ]` + + return &v1beta1.AdmissionResponse{ + Allowed: true, + Patch: []byte(patch), + PatchType: func() *v1beta1.PatchType { + pt := v1beta1.PatchTypeJSONPatch + return &pt + }(), + } +} + +func errorToResponse(err error) *v1beta1.AdmissionResponse { + return &v1beta1.AdmissionResponse{ + Result: &metav1.Status{ + Message: err.Error(), + }, + } +}