1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-14 11:57:48 +00:00

NK-10: Controller renamed to PolicyController. Created MutationWebhook class in new webhook package. Implemented filtering of incoming objects by Kind. Implemented simple usage of PolicyController in MutationWebhook.

This commit is contained in:
belyshevdenis 2019-02-21 20:31:18 +02:00
parent 81e54bb6a0
commit ea9491a105
6 changed files with 216 additions and 138 deletions

1
.gitignore vendored
View file

@ -5,3 +5,4 @@ certs
kube-policy
Gopkg.lock
Dockerfile
.vscode

View file

@ -14,15 +14,15 @@ import (
lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
)
// Controller for CRD
type Controller struct {
// PolicyController for CRD
type PolicyController struct {
policyInformerFactory informers.SharedInformerFactory
policyLister lister.PolicyLister
logger *log.Logger
}
// NewController from cmd args
func NewController(masterURL, kubeconfigPath string, logger *log.Logger) (*Controller, error) {
// NewPolicyController from cmd args
func NewPolicyController(masterURL, kubeconfigPath string, logger *log.Logger) (*PolicyController, error) {
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfigPath)
if err != nil {
logger.Printf("Error building kubeconfig: %v\n", err)
@ -38,7 +38,7 @@ func NewController(masterURL, kubeconfigPath string, logger *log.Logger) (*Contr
policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, time.Second*30)
policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies()
controller := &Controller{
controller := &PolicyController{
policyInformerFactory: policyInformerFactory,
policyLister: policyInformer.Lister(),
}
@ -53,45 +53,48 @@ func NewController(masterURL, kubeconfigPath string, logger *log.Logger) (*Contr
}
// Run is main controller thread
func (c *Controller) Run(stopCh <-chan struct{}) {
func (c *PolicyController) Run(stopCh <-chan struct{}) {
//c.policyInformerFactory.Start(stopCh)
}
// GetPolicies retrieves all policy resources
// from cache. Cache is refreshed by informer
func (c *Controller) GetPolicies() ([]*types.Policy, error) {
func (c *PolicyController) GetPolicies() []types.Policy {
// Create nil Selector to grab all the policies
cachedPolicies, err := c.policyLister.List(labels.NewSelector())
selector := labels.NewSelector()
cachedPolicies, err := c.policyLister.List(selector)
if err != nil {
return nil, err
c.logger.Printf("Error: %v", err)
return nil
}
var policies []*types.Policy
var policies []types.Policy
for _, elem := range cachedPolicies {
policies = append(policies, elem.DeepCopy())
policies = append(policies, *elem.DeepCopy())
}
return policies, nil
return policies
}
func (c *Controller) createPolicyHandler(resource interface{}) {
func (c *PolicyController) createPolicyHandler(resource interface{}) {
key := c.getResourceKey(resource)
c.logger.Printf("Created policy: %s\n", key)
}
func (c *Controller) updatePolicyHandler(oldResource, newResource interface{}) {
func (c *PolicyController) updatePolicyHandler(oldResource, newResource interface{}) {
oldKey := c.getResourceKey(oldResource)
newKey := c.getResourceKey(newResource)
c.logger.Printf("Updated policy from %s to %s\n", oldKey, newKey)
}
func (c *Controller) deletePolicyHandler(resource interface{}) {
func (c *PolicyController) deletePolicyHandler(resource interface{}) {
key := c.getResourceKey(resource)
c.logger.Printf("Deleted policy: %s\n", key)
}
func (c *Controller) getResourceKey(resource interface{}) string {
func (c *PolicyController) getResourceKey(resource interface{}) string {
if key, err := cache.MetaNamespaceKeyFunc(resource); err != nil {
c.logger.Printf("Error retrieving policy key: %v\n", err)
return ""

65
main.go
View file

@ -1,22 +1,22 @@
package main
import (
"log"
"os"
"flag"
"fmt"
"flag"
"fmt"
"log"
"os"
"github.com/nirmata/kube-policy/controller"
"github.com/nirmata/kube-policy/server"
"github.com/nirmata/kube-policy/controller"
"github.com/nirmata/kube-policy/server"
"k8s.io/sample-controller/pkg/signals"
"k8s.io/sample-controller/pkg/signals"
)
var (
masterURL string
kubeconfig string
cert string
key string
masterURL string
kubeconfig string
cert string
key string
)
func main() {
@ -26,34 +26,33 @@ func main() {
log.Fatal("TLS certificate or/and key is not set")
}
httpLogger := log.New(os.Stdout, "http: ", log.LstdFlags|log.Lshortfile)
crdcLogger := log.New(os.Stdout, "crdc: ", log.LstdFlags|log.Lshortfile)
crdcLogger := log.New(os.Stdout, "Policy Controller: ", log.LstdFlags|log.Lshortfile)
controller, err := controller.NewPolicyController(masterURL, kubeconfig, crdcLogger)
if err != nil {
fmt.Printf("Error creating PolicyController! Error: %s\n", err)
return
}
server := server.NewWebhookServer(cert, key, httpLogger)
httpLogger := log.New(os.Stdout, "HTTPS Server: ", log.LstdFlags|log.Lshortfile)
server := server.NewWebhookServer(cert, key, controller, httpLogger)
server.RunAsync()
controller, err := controller.NewController(masterURL, kubeconfig, crdcLogger)
if err != nil {
fmt.Printf("Error creating Controller! Error: %s\n", err)
return
}
stopCh := signals.SetupSignalHandler()
controller.Run(stopCh)
stopCh := signals.SetupSignalHandler()
controller.Run(stopCh)
if err != nil {
fmt.Printf("Error running PolicyController! Error: %s\n", err)
}
if err != nil {
fmt.Printf("Error running Controller! Error: %s\n", err)
}
fmt.Printf("Policy Controller has started")
<-stopCh
server.Stop()
fmt.Printf("Policy Controller has stopped")
fmt.Printf("Policy PolicyController has started")
<-stopCh
server.Stop()
fmt.Printf("Policy PolicyController has stopped")
}
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&cert, "cert", "", "TLS certificate used in connection with cluster.")
flag.StringVar(&key, "key", "", "Key, used in TLS connection.")
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&cert, "cert", "", "TLS certificate used in connection with cluster.")
flag.StringVar(&key, "key", "", "Key, used in TLS connection.")
}

View file

@ -11,16 +11,18 @@ import (
"os"
"time"
controller "github.com/nirmata/kube-policy/controller"
webhooks "github.com/nirmata/kube-policy/webhooks"
v1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coreTypes "k8s.io/kubernetes/pkg/apis/core"
)
// WebhookServer is a struct that describes
// TLS server with mutation webhook
type WebhookServer struct {
server http.Server
logger *log.Logger
server http.Server
logger *log.Logger
policyController *controller.PolicyController
mutationWebhook *webhooks.MutationWebhook
}
type patchOperations struct {
@ -33,6 +35,48 @@ type patchOperation struct {
Value interface{} `json:"value,omitempty"`
}
// NewWebhookServer creates new instance of WebhookServer and configures it
func NewWebhookServer(certFile string, keyFile string, controller *controller.PolicyController, logger *log.Logger) *WebhookServer {
if logger == nil {
logger = log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile)
}
if controller == nil {
logger.Fatal("Controller is not specified for webhook server")
}
var config tls.Config
pair, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
logger.Fatal("Unable to load certificate and key: ", err)
}
config.Certificates = []tls.Certificate{pair}
mw, err := webhooks.NewMutationWebhook(logger)
if err != nil {
logger.Fatal("Unable to create mutation webhook: ", err)
}
ws := &WebhookServer{
logger: logger,
policyController: controller,
mutationWebhook: mw,
}
mux := http.NewServeMux()
mux.HandleFunc("/mutate", ws.serve)
ws.server = http.Server{
Addr: ":443", // Listen on port for HTTPS requests
TLSConfig: &config,
Handler: mux,
ErrorLog: logger,
ReadTimeout: 15 * time.Second,
WriteTimeout: 15 * time.Second,
}
return ws
}
func (ws *WebhookServer) serve(w http.ResponseWriter, r *http.Request) {
if r.URL.Path == "/mutate" {
admissionReview := ws.parseAdmissionReview(r, w)
@ -40,21 +84,27 @@ func (ws *WebhookServer) serve(w http.ResponseWriter, r *http.Request) {
return
}
admissionResponse := ws.mutate(admissionReview)
if admissionResponse != nil {
admissionReview.Response = admissionResponse
if admissionReview.Request != nil {
admissionReview.Response.UID = admissionReview.Request.UID
var admissionResponse *v1beta1.AdmissionResponse
if webhooks.AdmissionIsRequired(admissionReview.Request) {
admissionResponse = ws.mutationWebhook.Mutate(admissionReview.Request, ws.policyController.GetPolicies())
}
if admissionResponse == nil {
admissionResponse = &v1beta1.AdmissionResponse{
Allowed: true,
}
}
admissionReview.Response = admissionResponse
admissionReview.Response.UID = admissionReview.Request.UID
responseJson, err := json.Marshal(admissionReview)
if err != nil {
http.Error(w, fmt.Sprintf("Could not encode response: %v", err), http.StatusInternalServerError)
return
}
ws.logger.Printf("!!! Writing success !!! Response body: %v", string(responseJson))
ws.logger.Printf("Response body\n:%v", string(responseJson))
w.Header().Set("Content-Type", "application/json; charset=utf-8")
if _, err := w.Write(responseJson); err != nil {
http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
@ -96,50 +146,6 @@ func (ws *WebhookServer) parseAdmissionReview(request *http.Request, writer http
}
}
func (ws *WebhookServer) mutate(ar *v1beta1.AdmissionReview) *v1beta1.AdmissionResponse {
request := ar.Request
ws.logger.Printf("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v",
request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation, request.UserInfo)
if admissionRequired(request) {
var configMap coreTypes.ConfigMap
if err := json.Unmarshal(request.Object.Raw, &configMap); err != nil {
ws.logger.Printf("Could not unmarshal raw object: %v", err)
return &v1beta1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}
/*patch := patchOperation{
Path: "/labels",
Op: "add",
Value: map[string]string{
"is-mutated": "true",
},
}*/
patch := `[ {"op":"add","path":"/metadata/labels","value":{"is-mutated":"true"}} ]`
return &v1beta1.AdmissionResponse{
Allowed: true,
Patch: []byte(patch),
PatchType: func() *v1beta1.PatchType {
pt := v1beta1.PatchTypeJSONPatch
return &pt
}(),
}
} else {
return &v1beta1.AdmissionResponse{
Allowed: true,
}
}
}
func admissionRequired(request *v1beta1.AdmissionRequest) bool {
return request.Kind.Kind == "ConfigMap"
}
// RunAsync runs TLS server in separate
// thread and returns control immediately
func (ws *WebhookServer) RunAsync() {
@ -160,35 +166,3 @@ func (ws *WebhookServer) Stop() {
ws.server.Close()
}
}
// NewWebhookServer creates new instance of WebhookServer and configures it
func NewWebhookServer(certFile string, keyFile string, logger *log.Logger) *WebhookServer {
if logger == nil {
logger = log.New(os.Stdout, "", log.LstdFlags|log.Lshortfile)
}
var config tls.Config
pair, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
logger.Fatal("Unable to load certificate and key: ", err)
}
config.Certificates = []tls.Certificate{pair}
mux := http.NewServeMux()
ws := &WebhookServer{
server: http.Server{
Addr: ":443", // Listen on port for HTTPS requests
TLSConfig: &config,
Handler: mux,
ErrorLog: logger,
ReadTimeout: 5 * time.Second,
WriteTimeout: 5 * time.Second,
},
logger: logger,
}
mux.HandleFunc("/mutate", ws.serve)
return ws
}

38
webhooks/admission.go Normal file
View file

@ -0,0 +1,38 @@
package webhooks
import "k8s.io/api/admission/v1beta1"
var supportedKinds = [...]string{
"ConfigMap",
"CronJob",
"DaemonSet",
"Deployment",
"Endpoint",
"HorizontalPodAutoscaler",
"Ingress",
"Job",
"LimitRange",
"Namespace",
"NetworkPolicy",
"PersistentVolumeClaim",
"PodDisruptionBudget",
"PodTemplate",
"ResourceQuota",
"Secret",
"Service",
"StatefulSet",
}
func kindIsSupported(kind string) bool {
for _, k := range supportedKinds {
if k == kind {
return true
}
}
return false
}
func AdmissionIsRequired(request *v1beta1.AdmissionRequest) bool {
// Here you can make additional hardcoded checks
return kindIsSupported(request.Kind.Kind)
}

63
webhooks/mutation.go Normal file
View file

@ -0,0 +1,63 @@
package webhooks
import (
"encoding/json"
"errors"
"log"
types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
v1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
coreTypes "k8s.io/kubernetes/pkg/apis/core"
)
type MutationWebhook struct {
logger *log.Logger
}
func NewMutationWebhook(logger *log.Logger) (*MutationWebhook, error) {
if logger == nil {
return nil, errors.New("Logger must be set for the mutation webhook")
}
return &MutationWebhook{logger: logger}, nil
}
func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest, policies []types.Policy) *v1beta1.AdmissionResponse {
mw.logger.Printf("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v",
request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation, request.UserInfo)
if len(policies) == 0 {
return nil
}
var configMap coreTypes.ConfigMap
if err := json.Unmarshal(request.Object.Raw, &configMap); err != nil {
mw.logger.Printf("Could not unmarshal raw object: %v", err)
return errorToResponse(err)
}
/*patch := patchOperation{
Path: "/labels",
Op: "add",
Value: map[string]string{
"is-mutated": "true",
},
}*/
patch := `[ {"op":"add","path":"/metadata/labels","value":{"is-mutated":"true"}} ]`
return &v1beta1.AdmissionResponse{
Allowed: true,
Patch: []byte(patch),
PatchType: func() *v1beta1.PatchType {
pt := v1beta1.PatchTypeJSONPatch
return &pt
}(),
}
}
func errorToResponse(err error) *v1beta1.AdmissionResponse {
return &v1beta1.AdmissionResponse{
Result: &metav1.Status{
Message: err.Error(),
},
}
}