1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-14 11:57:48 +00:00

Merge pull request #209 from nirmata/202_refactor_generate

202 refactor generate
This commit is contained in:
shuting 2019-07-08 10:46:38 -07:00 committed by GitHub
commit 23a6c40328
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 644 additions and 69 deletions

View file

@ -8,7 +8,9 @@ import (
controller "github.com/nirmata/kyverno/pkg/controller"
client "github.com/nirmata/kyverno/pkg/dclient"
event "github.com/nirmata/kyverno/pkg/event"
gencontroller "github.com/nirmata/kyverno/pkg/gencontroller"
"github.com/nirmata/kyverno/pkg/sharedinformer"
"github.com/nirmata/kyverno/pkg/utils"
"github.com/nirmata/kyverno/pkg/violation"
"github.com/nirmata/kyverno/pkg/webhooks"
"k8s.io/sample-controller/pkg/signals"
@ -38,6 +40,7 @@ func main() {
if err != nil {
glog.Fatalf("Error creating policy sharedinformer: %v\n", err)
}
kubeInformer := utils.NewKubeInformerFactory(clientConfig)
eventController := event.NewEventController(client, policyInformerFactory)
violationBuilder := violation.NewPolicyViolationBuilder(client, policyInformerFactory, eventController)
@ -47,6 +50,7 @@ func main() {
violationBuilder,
eventController)
genControler := gencontroller.NewGenController(client, eventController, policyInformerFactory, violationBuilder, kubeInformer.Core().V1().Namespaces())
tlsPair, err := initTLSPemPair(clientConfig, client)
if err != nil {
glog.Fatalf("Failed to initialize TLS key/certificate pair: %v\n", err)
@ -64,8 +68,9 @@ func main() {
stopCh := signals.SetupSignalHandler()
policyInformerFactory.Run(stopCh)
kubeInformer.Start(stopCh)
eventController.Run(stopCh)
genControler.Run(stopCh)
if err = policyController.Run(stopCh); err != nil {
glog.Fatalf("Error running PolicyController: %v\n", err)
}
@ -77,6 +82,7 @@ func main() {
server.RunAsync()
<-stopCh
server.Stop()
genControler.Stop()
eventController.Stop()
policyController.Stop()
}

View file

@ -18,13 +18,14 @@ import (
// ProcessExisting checks for mutation and validation violations of existing resources
func ProcessExisting(client *client.Client, policy *types.Policy) []*info.PolicyInfo {
glog.Infof("Applying policy %s on existing resources", policy.Name)
// policyInfo := info.NewPolicyInfo(policy.Name,
// rname,
// rns)
resources := []*resourceInfo{}
for _, rule := range policy.Spec.Rules {
for _, k := range rule.Kinds {
if k == "Namespace" {
// REWORK: will be handeled by namespace controller
continue
}
// kind -> resource
gvr := client.DiscoveryClient.GetGVRFromKind(k)
// label selectors
@ -91,9 +92,7 @@ func applyPolicy(client *client.Client, policy *types.Policy, res *resourceInfo)
if err != nil {
return nil, err
}
// Generate
gruleInfos := Generate(client, *policy, rawResource, *res.gvk, false)
policyInfo.AddRuleInfos(gruleInfos)
// Generate rule managed by generation controller
return policyInfo, nil
}

View file

@ -0,0 +1,133 @@
package engine
import (
"encoding/json"
"errors"
"fmt"
"github.com/golang/glog"
v1alpha1 "github.com/nirmata/kyverno/pkg/apis/policy/v1alpha1"
client "github.com/nirmata/kyverno/pkg/dclient"
"github.com/nirmata/kyverno/pkg/info"
"github.com/nirmata/kyverno/pkg/utils"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)
//GenerateNew apply generation rules on a resource
func GenerateNew(client *client.Client, policy *v1alpha1.Policy, ns *corev1.Namespace) []*info.RuleInfo {
ris := []*info.RuleInfo{}
for _, rule := range policy.Spec.Rules {
if rule.Generation == nil {
continue
}
ri := info.NewRuleInfo(rule.Name, info.Generation)
err := applyRuleGeneratorNew(client, ns, rule.Generation)
if err != nil {
ri.Fail()
ri.Addf("Rule %s: Failed to apply rule generator, err %v.", rule.Name, err)
} else {
ri.Addf("Rule %s: Generation succesfully.", rule.Name)
}
ris = append(ris, ri)
}
return ris
}
func applyRuleGeneratorNew(client *client.Client, ns *corev1.Namespace, gen *v1alpha1.Generation) error {
var err error
resource := &unstructured.Unstructured{}
var rdata map[string]interface{}
// get resource from kind
rGVR := client.DiscoveryClient.GetGVRFromKind(gen.Kind)
if rGVR.Resource == "" {
return fmt.Errorf("Kind to Resource Name conversion failed for %s", gen.Kind)
}
if gen.Data != nil {
// 1> Check if resource exists
obj, err := client.GetResource(rGVR.Resource, ns.Name, gen.Name)
if err == nil {
// 2> If already exsists, then verify the content is contained
// found the resource
// check if the rule is create, if yes, then verify if the specified configuration is present in the resource
ok, err := checkResource(gen.Data, obj)
if err != nil {
return err
}
if !ok {
return errors.New("rule configuration not present in resource")
}
return nil
}
rdata, err = runtime.DefaultUnstructuredConverter.ToUnstructured(&gen.Data)
if err != nil {
glog.Error(err)
return err
}
}
if gen.Clone != nil {
// 1> Check if resource exists
_, err := client.GetResource(rGVR.Resource, ns.Name, gen.Name)
if err == nil {
return nil
}
// 2> If already exists return
resource, err = client.GetResource(rGVR.Resource, gen.Clone.Namespace, gen.Clone.Name)
if err != nil {
return err
}
rdata = resource.UnstructuredContent()
}
resource.SetUnstructuredContent(rdata)
resource.SetName(gen.Name)
resource.SetNamespace(ns.Name)
// Reset resource version
resource.SetResourceVersion("")
_, err = client.CreateResource(rGVR.Resource, ns.Name, resource, false)
if err != nil {
return err
}
return nil
}
func checkResource(config interface{}, resource *unstructured.Unstructured) (bool, error) {
var err error
objByte, err := resource.MarshalJSON()
if err != nil {
// unable to parse the json
return false, err
}
err = resource.UnmarshalJSON(objByte)
if err != nil {
// unable to parse the json
return false, err
}
// marshall and unmarshall json to verify if its right format
configByte, err := json.Marshal(config)
if err != nil {
// unable to marshall the config
return false, err
}
var configData interface{}
err = json.Unmarshal(configByte, &configData)
if err != nil {
// unable to unmarshall
return false, err
}
var objData interface{}
err = json.Unmarshal(objByte, &objData)
if err != nil {
// unable to unmarshall
return false, err
}
// Check if the config is a subset of resource
return utils.JSONsubsetValue(configData, objData), nil
}

View file

@ -0,0 +1,157 @@
package gencontroller
import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/golang/glog"
policyLister "github.com/nirmata/kyverno/pkg/client/listers/policy/v1alpha1"
client "github.com/nirmata/kyverno/pkg/dclient"
"github.com/nirmata/kyverno/pkg/event"
policySharedInformer "github.com/nirmata/kyverno/pkg/sharedinformer"
"github.com/nirmata/kyverno/pkg/violation"
"k8s.io/apimachinery/pkg/api/errors"
v1Informer "k8s.io/client-go/informers/core/v1"
v1CoreLister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
//Controller watches the 'Namespace' resource creation/update and applied the generation rules on them
type Controller struct {
client *client.Client
namespaceLister v1CoreLister.NamespaceLister
namespaceSynced cache.InformerSynced
policyLister policyLister.PolicyLister
workqueue workqueue.RateLimitingInterface
}
//NewGenController returns a new Controller to manage generation rules
func NewGenController(client *client.Client,
eventController event.Generator,
policyInformer policySharedInformer.PolicyInformer,
violationBuilder violation.Generator,
namespaceInformer v1Informer.NamespaceInformer) *Controller {
// create the controller
controller := &Controller{
client: client,
namespaceLister: namespaceInformer.Lister(),
namespaceSynced: namespaceInformer.Informer().HasSynced,
policyLister: policyInformer.GetLister(),
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), wqNamespace),
}
namespaceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.createNamespaceHandler,
UpdateFunc: controller.updateNamespaceHandler,
})
return controller
}
func (c *Controller) createNamespaceHandler(resource interface{}) {
c.enqueueNamespace(resource)
}
func (c *Controller) updateNamespaceHandler(oldResoruce, newResource interface{}) {
// DO we need to anything if the namespace is modified ?
}
func (c *Controller) enqueueNamespace(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
glog.Error(err)
return
}
c.workqueue.Add(key)
}
//Run to run the controller
func (c *Controller) Run(stopCh <-chan struct{}) error {
if ok := cache.WaitForCacheSync(stopCh, c.namespaceSynced); !ok {
return fmt.Errorf("faield to wait for caches to sync")
}
for i := 0; i < workerCount; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
glog.Info("started namespace controller workers")
return nil
}
//Stop to stop the controller
func (c *Controller) Stop() {
defer c.workqueue.ShutDown()
glog.Info("shutting down namespace controller workers")
}
func (c *Controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (c *Controller) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.workqueue.Done(obj)
err := c.syncHandler(obj)
c.handleErr(err, obj)
return nil
}(obj)
if err != nil {
glog.Error(err)
return true
}
return true
}
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
c.workqueue.Forget(key)
return
}
if c.workqueue.NumRequeues(key) < wqRetryLimit {
glog.Warningf("Error syncing events %v: %v", key, err)
c.workqueue.AddRateLimited(key)
return
}
c.workqueue.Forget(key)
glog.Error(err)
glog.Warningf("Dropping the key %q out of the queue: %v", key, err)
}
func (c *Controller) syncHandler(obj interface{}) error {
var key string
var ok bool
if key, ok = obj.(string); !ok {
return fmt.Errorf("expected string in workqueue but got %v", obj)
}
// Namespace is cluster wide resource
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
glog.Errorf("invalid namespace key: %s", key)
return err
}
// Get Namespace
ns, err := c.namespaceLister.Get(name)
if err != nil {
if errors.IsNotFound(err) {
glog.Errorf("namespace '%s' in work queue no longer exists", key)
return nil
}
}
glog.Info("apply generation policy to resources :)")
//TODO: need to find a way to store the policy such that we can directly queury the
// policies with generation policies
// PolicyListerExpansion
c.processNamespace(ns)
return nil
}

View file

@ -0,0 +1,68 @@
package gencontroller
import (
"github.com/golang/glog"
v1alpha1 "github.com/nirmata/kyverno/pkg/apis/policy/v1alpha1"
"github.com/nirmata/kyverno/pkg/engine"
"github.com/nirmata/kyverno/pkg/info"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
)
func (c *Controller) processNamespace(ns *corev1.Namespace) error {
//Get all policies and then verify if the namespace matches any of the defined selectors
policies, err := c.listPolicies(ns)
if err != nil {
return err
}
// process policy on namespace
for _, p := range policies {
c.processPolicy(ns, p)
}
return nil
}
func (c *Controller) listPolicies(ns *corev1.Namespace) ([]*v1alpha1.Policy, error) {
var fpolicies []*v1alpha1.Policy
policies, err := c.policyLister.List(labels.NewSelector())
if err != nil {
glog.Error("Unable to connect to policy controller. Unable to access policies not applying GENERATION rules")
return nil, err
}
for _, p := range policies {
// Check if the policy contains a generatoin rule
for _, r := range p.Spec.Rules {
if r.Generation != nil {
// Check if the resource meets the description
if namespaceMeetsRuleDescription(ns, r.ResourceDescription) {
fpolicies = append(fpolicies, p)
break
}
}
}
}
return fpolicies, nil
}
func (c *Controller) processPolicy(ns *corev1.Namespace, p *v1alpha1.Policy) error {
policyInfo := info.NewPolicyInfo(p.Name,
ns.Kind,
ns.Name,
"") // Namespace has no namespace..WOW
ruleInfos := engine.GenerateNew(c.client, p, ns)
policyInfo.AddRuleInfos(ruleInfos)
if !policyInfo.IsSuccessful() {
glog.Infof("Failed to apply policy %s on resource %s %s", p.Name, ns.Kind, ns.Name)
for _, r := range ruleInfos {
glog.Warning(r.Msgs)
}
} else {
glog.Infof("Generation from policy %s has succesfully applied to %s %s", p.Name, ns.Kind, ns.Name)
}
//TODO Generate policy Violations and corresponding events based on policyInfo
return nil
}

View file

@ -0,0 +1,62 @@
package gencontroller
import (
"github.com/minio/minio/pkg/wildcard"
v1alpha1 "github.com/nirmata/kyverno/pkg/apis/policy/v1alpha1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
)
const (
wqNamespace string = "namespace"
workerCount int = 1
wqRetryLimit int = 5
)
func namespaceMeetsRuleDescription(ns *corev1.Namespace, resourceDescription v1alpha1.ResourceDescription) bool {
//REWORK Not needed but verify the 'Namespace' is defined in the list of supported kinds
if !findKind(resourceDescription.Kinds, "Namespace") {
return false
}
if resourceDescription.Name != nil {
if !wildcard.Match(*resourceDescription.Name, ns.Name) {
return false
}
}
if resourceDescription.Selector != nil {
selector, err := metav1.LabelSelectorAsSelector(resourceDescription.Selector)
if err != nil {
return false
}
labelSet := convertLabelsToLabelSet(ns.Labels)
// labels
if !selector.Matches(labelSet) {
return false
}
}
return true
}
func convertLabelsToLabelSet(labelMap map[string]string) labels.Set {
labelSet := make(labels.Set, len(labelMap))
// REWORK: check if the below works
// if x, ok := labelMap.(labels.Set); !ok {
// }
for k, v := range labelMap {
labelSet[k] = v
}
return labelSet
}
func findKind(kinds []string, kindGVK string) bool {
for _, kind := range kinds {
if kind == kindGVK {
return true
}
}
return false
}

20
pkg/utils/informer.go Normal file
View file

@ -0,0 +1,20 @@
package utils
import (
"github.com/golang/glog"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
//NewKubeInformerFactory returns a kubeinformer
func NewKubeInformerFactory(cfg *rest.Config) kubeinformers.SharedInformerFactory {
// kubernetes client
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Errorf("error building kubernetes client: %s", err)
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, 0)
return kubeInformerFactory
}

189
pkg/utils/json.go Normal file
View file

@ -0,0 +1,189 @@
package utils
import (
"github.com/golang/glog"
)
//JSONsubsetValue checks if JSON a is contained in JSON b
func JSONsubsetValue(a interface{}, b interface{}) bool {
switch typed := a.(type) {
case bool:
bv, ok := b.(bool)
if !ok {
glog.Errorf("expected bool found %T", b)
return false
}
av, _ := a.(bool)
if av == bv {
return true
}
case int:
bv, ok := b.(int)
if !ok {
glog.Errorf("expected int found %T", b)
return false
}
av, _ := a.(int)
if av == bv {
return true
}
case float64:
bv, ok := b.(float64)
if !ok {
glog.Errorf("expected float64 found %T", b)
return false
}
av, _ := a.(float64)
if av == bv {
return true
}
case string:
bv, ok := b.(string)
if !ok {
glog.Errorf("expected string found %T", b)
return false
}
av, _ := a.(string)
if av == bv {
return true
}
case map[string]interface{}:
bv, ok := b.(map[string]interface{})
if !ok {
glog.Errorf("expected map[string]interface{} found %T", b)
return false
}
av, _ := a.(map[string]interface{})
return subsetMap(av, bv)
case []interface{}:
// TODO: verify the logic
bv, ok := b.([]interface{})
if !ok {
glog.Errorf("expected []interface{} found %T", b)
return false
}
av, _ := a.([]interface{})
return subsetSlice(av, bv)
default:
glog.Errorf("Unspported type %s", typed)
}
return false
}
func subsetMap(a, b map[string]interface{}) bool {
// check if keys are present
for k := range a {
if _, ok := b[k]; !ok {
glog.Errorf("key %s, not present in resource", k)
return false
}
}
// check if values for the keys match
for ak, av := range a {
bv := b[ak]
if !JSONsubsetValue(av, bv) {
return false
}
}
return true
}
func contains(a interface{}, b []interface{}) bool {
switch typed := a.(type) {
case bool:
for _, bv := range b {
bv, ok := bv.(bool)
if !ok {
return false
}
av, _ := a.(bool)
if bv == av {
return true
}
}
case int:
for _, bv := range b {
bv, ok := bv.(int)
if !ok {
return false
}
av, _ := a.(int)
if bv == av {
return true
}
}
case float64:
for _, bv := range b {
bv, ok := bv.(float64)
if !ok {
return false
}
av, _ := a.(float64)
if bv == av {
return true
}
}
case string:
for _, bv := range b {
bv, ok := bv.(string)
if !ok {
return false
}
av, _ := a.(string)
if bv == av {
return true
}
}
case map[string]interface{}:
for _, bv := range b {
bv, ok := bv.(map[string]interface{})
if !ok {
return false
}
av, _ := a.(map[string]interface{})
if subsetMap(av, bv) {
return true
}
}
case []interface{}:
for _, bv := range b {
bv, ok := bv.([]interface{})
if !ok {
return false
}
av, _ := a.([]interface{})
if JSONsubsetValue(av, bv) {
return true
}
}
default:
glog.Errorf("Unspported type %s", typed)
}
return false
}
func subsetSlice(a, b []interface{}) bool {
// if empty
if len(a) == 0 {
return true
}
// check if len is not greater
if len(a) > len(b) {
return false
}
for _, av := range a {
if !contains(av, b) {
return false
}
}
return true
}

View file

@ -291,70 +291,11 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest) *v1
},
}
}
// Process Generation
return ws.HandleGeneration(request)
}
//HandleGeneration handles application of generation rules
func (ws *WebhookServer) HandleGeneration(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse {
if request.Kind.Kind != "Namespace" {
return &v1beta1.AdmissionResponse{
Allowed: true,
}
}
policyInfos := []*info.PolicyInfo{}
policies, err := ws.policyLister.List(labels.NewSelector())
if err != nil {
// Unable to connect to policy Lister to access policies
glog.Error("Unable to connect to policy controller to access policies. Generation Rules are NOT being applied")
glog.Warning(err)
return &v1beta1.AdmissionResponse{
Allowed: true,
}
}
for _, policy := range policies {
if !StringInSlice(request.Kind.Kind, getApplicableKindsForPolicy(policy)) {
continue
}
rname := engine.ParseNameFromObject(request.Object.Raw)
rns := engine.ParseNamespaceFromObject(request.Object.Raw)
rkind := engine.ParseKindFromObject(request.Object.Raw)
policyInfo := info.NewPolicyInfo(policy.Name,
rkind,
rname,
rns)
glog.V(3).Infof("Handling generation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
request.Kind.Kind, rns, rname, request.UID, request.Operation)
glog.Infof("Applying policy %s with generation %d rules", policy.ObjectMeta.Name, len(policy.Spec.Rules))
ruleInfos := engine.Generate(ws.client, *policy, request.Object.Raw, request.Kind, false)
policyInfo.AddRuleInfos(ruleInfos)
if !policyInfo.IsSuccessful() {
glog.Infof("Failed to apply policy %s on resource %s/%s", policy.Name, rname, rns)
for _, r := range ruleInfos {
glog.Warning(r.Msgs)
}
} else {
glog.Infof("Generation from policy %s has succesfully applied to %s %s/%s", policy.Name, request.Kind.Kind, rns, rname)
}
policyInfos = append(policyInfos, policyInfo)
}
ok, msg := isAdmSuccesful(policyInfos)
if ok {
glog.V(3).Info("Generation is successful")
return &v1beta1.AdmissionResponse{
Allowed: true,
}
}
glog.V(3).Info("Validation is successful")
return &v1beta1.AdmissionResponse{
Allowed: false,
Result: &metav1.Status{
Message: msg,
},
Allowed: true,
}
// Generation rules applied via generation controller
}
// bodyToAdmissionReview creates AdmissionReview object from request body