1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 10:28:36 +00:00
This commit is contained in:
shivkumar dudhani 2019-08-19 17:17:52 -07:00
parent 3abd422de4
commit 606c519789
5 changed files with 3 additions and 746 deletions

View file

@ -1,281 +0,0 @@
package controller
import (
"fmt"
"reflect"
"time"
"github.com/nirmata/kyverno/pkg/annotations"
"github.com/nirmata/kyverno/pkg/info"
"github.com/nirmata/kyverno/pkg/utils"
"github.com/nirmata/kyverno/pkg/engine"
"github.com/golang/glog"
v1alpha1 "github.com/nirmata/kyverno/pkg/apis/policy/v1alpha1"
lister "github.com/nirmata/kyverno/pkg/client/listers/policy/v1alpha1"
client "github.com/nirmata/kyverno/pkg/dclient"
"github.com/nirmata/kyverno/pkg/event"
"github.com/nirmata/kyverno/pkg/sharedinformer"
violation "github.com/nirmata/kyverno/pkg/violation"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
//PolicyController to manage Policy CRD
type PolicyController struct {
client *client.Client
policyLister lister.PolicyLister
policySynced cache.InformerSynced
violationBuilder violation.Generator
eventController event.Generator
queue workqueue.RateLimitingInterface
filterK8Resources []utils.K8Resource
}
// NewPolicyController from cmd args
func NewPolicyController(client *client.Client,
policyInformer sharedinformer.PolicyInformer,
violationBuilder violation.Generator,
eventController event.Generator,
filterK8Resources string) *PolicyController {
controller := &PolicyController{
client: client,
policyLister: policyInformer.GetLister(),
policySynced: policyInformer.GetInfomer().HasSynced,
violationBuilder: violationBuilder,
eventController: eventController,
filterK8Resources: utils.ParseKinds(filterK8Resources),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), policyWorkQueueName),
}
policyInformer.GetInfomer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.createPolicyHandler,
UpdateFunc: controller.updatePolicyHandler,
DeleteFunc: controller.deletePolicyHandler,
})
return controller
}
func (pc *PolicyController) createPolicyHandler(resource interface{}) {
pc.enqueuePolicy(resource)
}
func (pc *PolicyController) updatePolicyHandler(oldResource, newResource interface{}) {
newPolicy := newResource.(*v1alpha1.Policy)
oldPolicy := oldResource.(*v1alpha1.Policy)
newPolicy.Status = v1alpha1.Status{}
oldPolicy.Status = v1alpha1.Status{}
newPolicy.ResourceVersion = ""
oldPolicy.ResourceVersion = ""
if reflect.DeepEqual(newPolicy, oldPolicy) {
return
}
pc.enqueuePolicy(newResource)
}
func (pc *PolicyController) deletePolicyHandler(resource interface{}) {
var object metav1.Object
var ok bool
if object, ok = resource.(metav1.Object); !ok {
glog.Error("error decoding object, invalid type")
return
}
cleanAnnotations(pc.client, resource, pc.filterK8Resources)
glog.Infof("policy deleted: %s", object.GetName())
}
func (pc *PolicyController) enqueuePolicy(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
glog.Error(err)
return
}
pc.queue.Add(key)
}
// Run is main controller thread
func (pc *PolicyController) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
if ok := cache.WaitForCacheSync(stopCh, pc.policySynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
for i := 0; i < policyControllerWorkerCount; i++ {
go wait.Until(pc.runWorker, time.Second, stopCh)
}
glog.Info("started policy controller workers")
return nil
}
//Stop to perform actions when controller is stopped
func (pc *PolicyController) Stop() {
pc.queue.ShutDown()
glog.Info("shutting down policy controller workers")
}
func (pc *PolicyController) runWorker() {
for pc.processNextWorkItem() {
}
}
func (pc *PolicyController) processNextWorkItem() bool {
obj, shutdown := pc.queue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer pc.queue.Done(obj)
err := pc.syncHandler(obj)
pc.handleErr(err, obj)
return nil
}(obj)
if err != nil {
glog.Error(err)
return true
}
return true
}
func (pc *PolicyController) handleErr(err error, key interface{}) {
if err == nil {
pc.queue.Forget(key)
return
}
// This controller retries if something goes wrong. After that, it stops trying.
if pc.queue.NumRequeues(key) < policyWorkQueueRetryLimit {
glog.Warningf("Error syncing events %v: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
pc.queue.AddRateLimited(key)
return
}
pc.queue.Forget(key)
glog.Error(err)
glog.Warningf("Dropping the key out of the queue: %v", err)
}
func (pc *PolicyController) syncHandler(obj interface{}) error {
var key string
var ok bool
if key, ok = obj.(string); !ok {
return fmt.Errorf("expected string in workqueue but got %#v", obj)
}
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("invalid policy key: %s", key)
return nil
}
// Get Policy
policy, err := pc.policyLister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
glog.Errorf("policy '%s' in work queue no longer exists", key)
return nil
}
return err
}
glog.Infof("process policy %s on existing resources", policy.GetName())
// Process policy on existing resources
policyInfos := engine.ProcessExisting(pc.client, policy, pc.filterK8Resources)
events, violations := pc.createEventsAndViolations(policyInfos)
// Events, Violations
pc.eventController.Add(events...)
err = pc.violationBuilder.Add(violations...)
if err != nil {
glog.Error(err)
}
// Annotations
pc.createAnnotations(policyInfos)
return nil
}
func (pc *PolicyController) createAnnotations(policyInfos []*info.PolicyInfo) {
for _, pi := range policyInfos {
//get resource
obj, err := pc.client.GetResource(pi.RKind, pi.RNamespace, pi.RName)
if err != nil {
glog.Error(err)
continue
}
// add annotation for policy application
ann := obj.GetAnnotations()
// if annotations are nil then create a map and patch
// else
// add the exact patch
patch, err := annotations.PatchAnnotations(ann, pi, info.All)
if patch == nil {
/// nothing to patch
return
}
_, err = pc.client.PatchResource(pi.RKind, pi.RNamespace, pi.RName, patch)
if err != nil {
glog.Error(err)
continue
}
}
}
func (pc *PolicyController) createEventsAndViolations(policyInfos []*info.PolicyInfo) ([]*event.Info, []*violation.Info) {
events := []*event.Info{}
violations := []*violation.Info{}
// Create events from the policyInfo
for _, policyInfo := range policyInfos {
frules := []v1alpha1.FailedRule{}
sruleNames := []string{}
for _, rule := range policyInfo.Rules {
if !rule.IsSuccessful() {
e := &event.Info{}
frule := v1alpha1.FailedRule{Name: rule.Name}
switch rule.RuleType {
case info.Mutation, info.Validation, info.Generation:
// Events
e = event.NewEvent(policyInfo.RKind, policyInfo.RNamespace, policyInfo.RName, event.PolicyViolation, event.FProcessRule, rule.Name, policyInfo.Name)
switch rule.RuleType {
case info.Mutation:
frule.Type = info.Mutation.String()
case info.Validation:
frule.Type = info.Validation.String()
case info.Generation:
frule.Type = info.Generation.String()
}
frule.Error = rule.GetErrorString()
default:
glog.Info("Unsupported Rule type")
}
frule.Error = rule.GetErrorString()
frules = append(frules, frule)
events = append(events, e)
} else {
sruleNames = append(sruleNames, rule.Name)
}
}
if !policyInfo.IsSuccessful() {
e := event.NewEvent("Policy", "", policyInfo.Name, event.PolicyViolation, event.FResourcePolcy, policyInfo.RNamespace+"/"+policyInfo.RName, concatFailedRules(frules))
events = append(events, e)
// Violation
v := violation.BuldNewViolation(policyInfo.Name, policyInfo.RKind, policyInfo.RNamespace, policyInfo.RName, event.PolicyViolation.String(), policyInfo.GetFailedRules())
violations = append(violations, v)
} else {
// clean up violations
pc.violationBuilder.RemoveInactiveViolation(policyInfo.Name, policyInfo.RKind, policyInfo.RNamespace, policyInfo.RName, info.Mutation)
pc.violationBuilder.RemoveInactiveViolation(policyInfo.Name, policyInfo.RKind, policyInfo.RNamespace, policyInfo.RName, info.Validation)
}
}
return events, violations
}

View file

@ -1,147 +0,0 @@
package controller
import (
"testing"
"github.com/golang/glog"
types "github.com/nirmata/kyverno/pkg/apis/policy/v1alpha1"
client "github.com/nirmata/kyverno/pkg/dclient"
event "github.com/nirmata/kyverno/pkg/event"
"github.com/nirmata/kyverno/pkg/sharedinformer"
violation "github.com/nirmata/kyverno/pkg/violation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/sample-controller/pkg/signals"
)
func TestCreatePolicy(t *testing.T) {
f := newFixture(t)
// new policy is added to policy lister and explictly passed to sync-handler
// to process the existing
policy := newPolicy("test-policy")
f.policyLister = append(f.policyLister, policy)
f.objects = append(f.objects, policy)
// run controller
f.runControler("test-policy")
}
func (f *fixture) runControler(policyName string) {
policyInformerFactory, err := sharedinformer.NewFakeSharedInformerFactory()
if err != nil {
f.t.Fatal(err)
}
eventController := event.NewEventController(f.Client, policyInformerFactory)
violationBuilder := violation.NewPolicyViolationBuilder(f.Client, policyInformerFactory, eventController)
// new controller
policyController := NewPolicyController(
f.Client,
policyInformerFactory,
violationBuilder,
eventController,
"")
stopCh := signals.SetupSignalHandler()
// start informer & controller
policyInformerFactory.Run(stopCh)
if err = policyController.Run(stopCh); err != nil {
glog.Fatalf("Error running PolicyController: %v\n", err)
}
// add policy to the informer
for _, p := range f.policyLister {
policyInformerFactory.GetInfomer().GetIndexer().Add(p)
}
// sync handler
// reads the policy from the policy lister and processes them
err = policyController.syncHandler(policyName)
if err != nil {
f.t.Fatal(err)
}
policyController.Stop()
}
type fixture struct {
t *testing.T
Client *client.Client
policyLister []*types.Policy
objects []runtime.Object
}
func newFixture(t *testing.T) *fixture {
// init groupversion
regResource := []schema.GroupVersionResource{
schema.GroupVersionResource{Group: "group", Version: "version", Resource: "thekinds"},
schema.GroupVersionResource{Group: "group2", Version: "version", Resource: "thekinds"},
schema.GroupVersionResource{Group: "", Version: "v1", Resource: "namespaces"},
schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "deployments"},
}
objects := []runtime.Object{newUnstructured("group/version", "TheKind", "ns-foo", "name-foo"),
newUnstructured("group2/version", "TheKind", "ns-foo", "name2-foo"),
newUnstructured("group/version", "TheKind", "ns-foo", "name-bar"),
newUnstructured("group/version", "TheKind", "ns-foo", "name-baz"),
newUnstructured("group2/version", "TheKind", "ns-foo", "name2-baz"),
newUnstructured("apps/v1", "Deployment", "kyverno", "kyverno"),
}
scheme := runtime.NewScheme()
// Create mock client
fclient, err := client.NewMockClient(scheme, objects...)
if err != nil {
t.Fatal(err)
}
// set discovery Client
fclient.SetDiscovery(client.NewFakeDiscoveryClient(regResource))
f := &fixture{
t: t,
Client: fclient,
}
return f
}
// create mock client with initial resouces
// set registered resources for gvr
func (f *fixture) setupFixture() {
scheme := runtime.NewScheme()
fclient, err := client.NewMockClient(scheme, f.objects...)
if err != nil {
f.t.Fatal(err)
}
regresource := []schema.GroupVersionResource{
schema.GroupVersionResource{Group: "kyverno.io",
Version: "v1alpha1",
Resource: "policys"}}
fclient.SetDiscovery(client.NewFakeDiscoveryClient(regresource))
}
func newPolicy(name string) *types.Policy {
return &types.Policy{
TypeMeta: metav1.TypeMeta{APIVersion: types.SchemeGroupVersion.String()},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
}
func newUnstructured(apiVersion, kind, namespace, name string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": apiVersion,
"kind": kind,
"metadata": map[string]interface{}{
"namespace": namespace,
"name": name,
},
},
}
}

View file

@ -1,160 +0,0 @@
package gencontroller
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/golang/glog"
client "github.com/nirmata/kyverno/pkg/dclient"
"github.com/nirmata/kyverno/pkg/event"
"k8s.io/apimachinery/pkg/api/errors"
v1Informer "k8s.io/client-go/informers/core/v1"
v1CoreLister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
//Controller watches the 'Namespace' resource creation/update and applied the generation rules on them
type Controller struct {
client *client.Client
namespaceLister v1CoreLister.NamespaceLister
namespaceSynced cache.InformerSynced
policyLister policyLister.PolicyLister
eventController event.Generator
violationBuilder violation.Generator
annotationsController annotations.Controller
workqueue workqueue.RateLimitingInterface
}
//NewGenController returns a new Controller to manage generation rules
func NewGenController(client *client.Client,
eventController event.Generator,
policyInformer policySharedInformer.PolicyInformer,
violationBuilder violation.Generator,
namespaceInformer v1Informer.NamespaceInformer,
annotationsController annotations.Controller) *Controller {
// create the controller
controller := &Controller{
client: client,
namespaceLister: namespaceInformer.Lister(),
namespaceSynced: namespaceInformer.Informer().HasSynced,
policyLister: policyInformer.GetLister(),
eventController: eventController,
violationBuilder: violationBuilder,
annotationsController: annotationsController,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), wqNamespace),
}
namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.createNamespaceHandler,
UpdateFunc: controller.updateNamespaceHandler,
})
return controller
}
func (c *Controller) createNamespaceHandler(resource interface{}) {
c.enqueueNamespace(resource)
}
func (c *Controller) updateNamespaceHandler(oldResoruce, newResource interface{}) {
// DO we need to anything if the namespace is modified ?
}
func (c *Controller) enqueueNamespace(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
glog.Error(err)
return
}
c.workqueue.Add(key)
}
//Run to run the controller
func (c *Controller) Run(stopCh <-chan struct{}) error {
if ok := cache.WaitForCacheSync(stopCh, c.namespaceSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
for i := 0; i < workerCount; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
glog.Info("started namespace controller workers")
return nil
}
//Stop to stop the controller
func (c *Controller) Stop() {
c.workqueue.ShutDown()
glog.Info("shutting down namespace controller workers")
}
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
err := c.syncHandler(obj)
c.handleErr(err, obj)
return nil
}(obj)
if err != nil {
glog.Error(err)
return true
}
return true
}
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
c.workqueue.Forget(key)
return
}
if c.workqueue.NumRequeues(key) < wqRetryLimit {
glog.Warningf("Error syncing events %v: %v", key, err)
c.workqueue.AddRateLimited(key)
return
}
c.workqueue.Forget(key)
glog.Error(err)
glog.Warningf("Dropping the key %q out of the queue: %v", key, err)
}
func (c *Controller) syncHandler(obj interface{}) error {
var key string
var ok bool
if key, ok = obj.(string); !ok {
return fmt.Errorf("expected string in workqueue but got %v", obj)
}
// Namespace is cluster wide resource
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("invalid namespace key: %s", key)
return err
}
// Get Namespace
ns, err := c.namespaceLister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
glog.Errorf("namespace '%s' in work queue no longer exists", key)
return nil
}
}
//TODO: need to find a way to store the policy such that we can directly queury the
// policies with generation policies
// PolicyListerExpansion
c.processNamespace(ns)
return nil
}

View file

@ -1,155 +0,0 @@
package gencontroller
import (
"encoding/json"
"fmt"
"strings"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/golang/glog"
"github.com/nirmata/kyverno/pkg/engine"
event "github.com/nirmata/kyverno/pkg/event"
"github.com/nirmata/kyverno/pkg/info"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
)
func (c *Controller) processNamespace(ns *corev1.Namespace) error {
//Get all policies and then verify if the namespace matches any of the defined selectors
policies, err := c.listPolicies(ns)
if err != nil {
return err
}
// process policy on namespace
for _, p := range policies {
c.processPolicy(ns, p)
}
return nil
}
func (c *Controller) listPolicies(ns *corev1.Namespace) ([]*v1alpha1.Policy, error) {
var fpolicies []*v1alpha1.Policy
policies, err := c.policyLister.List(labels.NewSelector())
if err != nil {
glog.Error("Unable to connect to policy controller. Unable to access policies not applying GENERATION rules")
return nil, err
}
for _, p := range policies {
// Check if the policy contains a generatoin rule
for _, r := range p.Spec.Rules {
if r.Generation != nil {
// Check if the resource meets the description
data, err := json.Marshal(ns)
if err != nil {
glog.Error(err)
continue
}
// convert types of GVK
nsGvk := schema.FromAPIVersionAndKind("v1", "Namespace")
// Hardcode as we have a informer on specified gvk
gvk := metav1.GroupVersionKind{Group: nsGvk.Group, Kind: nsGvk.Kind, Version: nsGvk.Version}
if engine.ResourceMeetsDescription(data, r.MatchResources.ResourceDescription, r.ExcludeResources.ResourceDescription, gvk) {
fpolicies = append(fpolicies, p)
break
}
}
}
}
return fpolicies, nil
}
func (c *Controller) processPolicy(ns *corev1.Namespace, p *v1alpha1.Policy) {
var eventInfo *event.Info
var onViolation bool
var msg string
policyInfo := info.NewPolicyInfo(p.Name,
"Namespace",
ns.Name,
"",
p.Spec.ValidationFailureAction) // Namespace has no namespace..WOW
// convert to unstructured
unstrMap, err := runtime.DefaultUnstructuredConverter.ToUnstructured(ns)
if err != nil {
glog.Error(err)
return
}
unstObj := unstructured.Unstructured{Object: unstrMap}
ruleInfos := engine.Generate(c.client, p, unstObj)
policyInfo.AddRuleInfos(ruleInfos)
// generate annotations on namespace
c.createAnnotations(policyInfo)
//TODO generate namespace on created resources
if !policyInfo.IsSuccessful() {
glog.Infof("Failed to apply policy %s on resource %s %s", p.Name, ns.Kind, ns.Name)
for _, r := range ruleInfos {
glog.Warning(r.Msgs)
if msg = strings.Join(r.Msgs, " "); strings.Contains(msg, "rule configuration not present in resource") {
onViolation = true
msg = fmt.Sprintf(`Resource creation violates generate rule '%s' of policy '%s'`, r.Name, policyInfo.Name)
}
}
if onViolation {
glog.Infof("Adding violation for generation rule of policy %s\n", policyInfo.Name)
// Policy Violation
v := violation.BuldNewViolation(policyInfo.Name, policyInfo.RKind, policyInfo.RNamespace, policyInfo.RName, event.PolicyViolation.String(), policyInfo.FailedRules())
c.violationBuilder.Add(v)
} else {
// Event
eventInfo = event.NewEvent(policyKind, "", policyInfo.Name, event.RequestBlocked,
event.FPolicyApplyBlockCreate, policyInfo.RNamespace+"/"+policyInfo.RName, policyInfo.GetRuleNames(false))
glog.V(2).Infof("Request blocked event info has prepared for %s/%s\n", policyKind, policyInfo.Name)
c.eventController.Add(eventInfo)
}
return
}
glog.Infof("Generation from policy %s has succesfully applied to %s/%s", p.Name, policyInfo.RKind, policyInfo.RName)
eventInfo = event.NewEvent(policyInfo.RKind, policyInfo.RNamespace, policyInfo.RName,
event.PolicyApplied, event.SRulesApply, policyInfo.GetRuleNames(true), policyInfo.Name)
glog.V(2).Infof("Success event info has prepared for %s/%s\n", policyInfo.RKind, policyInfo.RName)
c.eventController.Add(eventInfo)
}
func (c *Controller) createAnnotations(pi *info.PolicyInfo) {
//get resource
obj, err := c.client.GetResource(pi.RKind, pi.RNamespace, pi.RName)
if err != nil {
glog.Error(err)
return
}
// add annotation for policy application
ann := obj.GetAnnotations()
// Generation rules
gpatch, err := annotations.PatchAnnotations(ann, pi, info.Generation)
if err != nil {
glog.Error(err)
return
}
if gpatch == nil {
// nothing to patch
return
}
// add the anotation to the resource
_, err = c.client.PatchResource(pi.RKind, pi.RNamespace, pi.RName, gpatch)
if err != nil {
glog.Error(err)
return
}
}

View file

@ -15,7 +15,7 @@ import (
"github.com/golang/glog"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha1"
lister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1alpha1"
kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1alpha1"
"github.com/nirmata/kyverno/pkg/config"
client "github.com/nirmata/kyverno/pkg/dclient"
"github.com/nirmata/kyverno/pkg/event"
@ -31,8 +31,8 @@ type WebhookServer struct {
server http.Server
client *client.Client
kyvernoClient *kyvernoclient.Clientset
pLister lister.PolicyLister
pvLister lister.PolicyViolationLister
pLister kyvernolister.PolicyLister
pvLister kyvernolister.PolicyViolationLister
pListerSynced cache.InformerSynced
pvListerSynced cache.InformerSynced
eventGen event.Interface