1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-15 17:51:20 +00:00

Add policy cache based on policyType (#960)

* add policy cache based on policyType

* fetch policy from cache in webhook

* add unit test for policy cache

* update log for exclude resources filter

* skip webhook mutation on DELETE operation

* remove duplicate k8s version check

* add description
This commit is contained in:
shuting 2020-07-02 12:49:10 -07:00 committed by GitHub
parent 8997dc6627
commit ed52bd3d9f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 452 additions and 32 deletions

View file

@ -10,6 +10,7 @@ import (
"time"
"github.com/nirmata/kyverno/pkg/openapi"
"github.com/nirmata/kyverno/pkg/policycache"
"github.com/nirmata/kyverno/pkg/checker"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
@ -225,6 +226,11 @@ func main() {
log.Log.WithName("GenerateCleanUpController"),
)
pCacheController := policycache.NewPolicyCacheController(
pInformer.Kyverno().V1().ClusterPolicies(),
log.Log.WithName("PolicyCacheController"),
)
// CONFIGURE CERTIFICATES
tlsPair, err := client.InitTLSPemPair(clientConfig, fqdncn)
if err != nil {
@ -251,6 +257,8 @@ func main() {
// Sync openAPI definitions of resources
openAPISync := openapi.NewCRDSync(client, openAPIController)
supportMudateValidate := utils.HigherThanKubernetesVersion(client, log.Log, 1, 14, 0)
// WEBHOOK
// - https server to provide endpoints called based on rules defined in Mutating & Validation webhook configuration
// - reports the results based on the response from the policy engine:
@ -265,12 +273,14 @@ func main() {
kubeInformer.Rbac().V1().RoleBindings(),
kubeInformer.Rbac().V1().ClusterRoleBindings(),
eventGenerator,
pCacheController.Cache,
webhookRegistrationClient,
statusSync.Listener,
configData,
pvgen,
grgen,
rWebhookWatcher,
supportMudateValidate,
cleanUp,
log.Log.WithName("WebhookServer"),
openAPIController,
@ -294,6 +304,7 @@ func main() {
go grcc.Run(1, stopCh)
go pvgen.Run(1, stopCh)
go statusSync.Run(1, stopCh)
go pCacheController.Run(1, stopCh)
openAPISync.Run(1, stopCh)
// verifys if the admission control is enabled and active

View file

@ -153,7 +153,7 @@ func (cd *ConfigData) load(cm v1.ConfigMap) {
logger.V(4).Info("resourceFilters did not change")
return
}
logger.V(4).Info(" Updated resource filters", "oldFilters", cd.filters, "newFilters", newFilters)
logger.V(2).Info("Updated resource filters", "oldFilters", cd.filters, "newFilters", newFilters)
// update filters
cd.filters = newFilters
}
@ -167,7 +167,7 @@ func (cd *ConfigData) initFilters(filters string) {
defer cd.mux.Unlock()
newFilters := parseKinds(filters)
logger.Info("Init resource filters", "filters", newFilters)
logger.V(2).Info("Init resource filters", "filters", newFilters)
// update filters
cd.filters = newFilters
}

139
pkg/policycache/cache.go Normal file
View file

@ -0,0 +1,139 @@
package policycache
// package main
import (
"sync"
"github.com/go-logr/logr"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
)
type pMap struct {
sync.RWMutex
dataMap map[PolicyType][]*kyverno.ClusterPolicy
}
// policyCache ...
type policyCache struct {
pMap
logr.Logger
}
// Interface ...
type Interface interface {
Add(policy *kyverno.ClusterPolicy)
Remove(policy *kyverno.ClusterPolicy)
Get(pkey PolicyType) []*kyverno.ClusterPolicy
}
// newPolicyCache ...
func newPolicyCache(log logr.Logger) Interface {
return &policyCache{
pMap{
dataMap: make(map[PolicyType][]*kyverno.ClusterPolicy),
},
log,
}
}
// Add a policy to cache
func (pc *policyCache) Add(policy *kyverno.ClusterPolicy) {
pc.pMap.add(policy)
pc.Logger.V(4).Info("policy is added to cache", "name", policy.GetName())
}
// Get the list of matched policies
func (pc *policyCache) Get(pkey PolicyType) []*kyverno.ClusterPolicy {
return pc.pMap.get(pkey)
}
// Remove a policy from cache
func (pc *policyCache) Remove(policy *kyverno.ClusterPolicy) {
pc.pMap.remove(policy)
pc.Logger.V(4).Info("policy is removed from cache", "name", policy.GetName())
}
// buildCacheMap builds the map to store the names of all existing
// policies in the cache, it is used to aviod adding duplicate policies
func buildCacheMap(policies []*kyverno.ClusterPolicy) map[string]bool {
cacheMap := make(map[string]bool)
for _, p := range policies {
name := p.GetName()
if !cacheMap[name] {
cacheMap[p.GetName()] = true
}
}
return cacheMap
}
func (m *pMap) add(policy *kyverno.ClusterPolicy) {
m.Lock()
defer m.Unlock()
enforcePolicy := policy.Spec.ValidationFailureAction == "enforce"
mutateMap := buildCacheMap(m.dataMap[Mutate])
validateMap := buildCacheMap(m.dataMap[ValidateEnforce])
generateMap := buildCacheMap(m.dataMap[Generate])
pName := policy.GetName()
for _, rule := range policy.Spec.Rules {
if rule.HasMutate() {
if !mutateMap[pName] {
mutateMap[pName] = true
mutatePolicy := m.dataMap[Mutate]
m.dataMap[Mutate] = append(mutatePolicy, policy)
}
continue
}
if rule.HasValidate() && enforcePolicy {
if !validateMap[pName] {
validateMap[pName] = true
validatePolicy := m.dataMap[ValidateEnforce]
m.dataMap[ValidateEnforce] = append(validatePolicy, policy)
}
continue
}
if rule.HasGenerate() {
if !generateMap[pName] {
generateMap[pName] = true
generatePolicy := m.dataMap[Generate]
m.dataMap[Generate] = append(generatePolicy, policy)
}
continue
}
}
}
func (m *pMap) get(key PolicyType) []*kyverno.ClusterPolicy {
m.RLock()
defer m.RUnlock()
return m.dataMap[key]
}
func (m *pMap) remove(policy *kyverno.ClusterPolicy) {
m.Lock()
defer m.Unlock()
dataMap := m.dataMap
for k, policies := range dataMap {
var newPolicies []*kyverno.ClusterPolicy
for _, p := range policies {
if p.GetName() == policy.GetName() {
continue
}
newPolicies = append(newPolicies, p)
}
m.dataMap[k] = newPolicies
}
}

View file

@ -0,0 +1,164 @@
package policycache
import (
"encoding/json"
"testing"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"gotest.tools/assert"
"sigs.k8s.io/controller-runtime/pkg/log"
)
func Test_All(t *testing.T) {
pCache := newPolicyCache(log.Log)
policy := newPolicy(t)
// add
pCache.Add(policy)
// get
if len(pCache.Get(Mutate)) != 1 {
t.Errorf("expected 1 mutate policy, found %v", len(pCache.Get(Mutate)))
}
if len(pCache.Get(ValidateEnforce)) != 1 {
t.Errorf("expected 1 validate enforce policy, found %v", len(pCache.Get(ValidateEnforce)))
}
if len(pCache.Get(Generate)) != 1 {
t.Errorf("expected 1 generate policy, found %v", len(pCache.Get(Generate)))
}
// remove
pCache.Remove(policy)
assert.Assert(t, len(pCache.Get(ValidateEnforce)) == 0)
}
func Test_Add_Duplicate_Policy(t *testing.T) {
pCache := newPolicyCache(log.Log)
policy := newPolicy(t)
pCache.Add(policy)
pCache.Add(policy)
pCache.Add(policy)
if len(pCache.Get(Mutate)) != 1 {
t.Errorf("expected 1 mutate policy, found %v", len(pCache.Get(Mutate)))
}
if len(pCache.Get(ValidateEnforce)) != 1 {
t.Errorf("expected 1 validate enforce policy, found %v", len(pCache.Get(ValidateEnforce)))
}
if len(pCache.Get(Generate)) != 1 {
t.Errorf("expected 1 generate policy, found %v", len(pCache.Get(Generate)))
}
}
func Test_Remove_From_Empty_Cache(t *testing.T) {
pCache := newPolicyCache(log.Log)
policy := newPolicy(t)
pCache.Remove(policy)
}
func newPolicy(t *testing.T) *kyverno.ClusterPolicy {
rawPolicy := []byte(`{
"spec": {
"validationFailureAction": "enforce",
"rules": [
{
"name": "deny-privileged-disallowpriviligedescalation",
"match": {
"resources": {
"kinds": [
"Pod"
]
}
},
"validate": {
"deny": {
"conditions": [
{
"key": "a",
"operator": "Equals",
"value": "a"
}
]
}
}
},
{
"name": "deny-privileged-disallowpriviligedescalation",
"match": {
"resources": {
"kinds": [
"Pod"
]
}
},
"validate": {
"pattern": {
"spec": {
"containers": [
{
"image": "!*:latest"
}
]
}
}
}
},
{
"name": "annotate-host-path",
"match": {
"resources": {
"kinds": [
"Pod"
]
}
},
"mutate": {
"overlay": {
"metadata": {
"annotations": {
"+(cluster-autoscaler.kubernetes.io/safe-to-evict)": true
}
}
}
}
},
{
"name": "default-deny-ingress",
"match": {
"resources": {
"kinds": [
"Namespace"
]
}
},
"generate": {
"kind": "NetworkPolicy",
"name": "default-deny-ingress",
"namespace": "{{request.object.metadata.name}}",
"data": {
"spec": {
"podSelector": {
},
"policyTypes": [
"Ingress"
]
}
}
}
}
]
}
}`)
var policy *kyverno.ClusterPolicy
err := json.Unmarshal(rawPolicy, &policy)
assert.NilError(t, err)
return policy
}

View file

@ -0,0 +1,78 @@
package policycache
import (
"reflect"
"github.com/go-logr/logr"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1"
"k8s.io/client-go/tools/cache"
)
// Controller is responsible for synchronizing Policy Cache,
// it embeds a policy informer to handle policy events.
// The cache is synced when a policy is add/update/delete.
// This cache is only used in the admission webhook to fast retrieve
// policies based on types (Mutate/ValidateEnforce/Generate).
type Controller struct {
pSynched cache.InformerSynced
Cache Interface
log logr.Logger
}
// NewPolicyCacheController create a new PolicyController
func NewPolicyCacheController(
pInformer kyvernoinformer.ClusterPolicyInformer,
log logr.Logger) *Controller {
pc := Controller{
Cache: newPolicyCache(log),
log: log,
}
pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pc.addPolicy,
UpdateFunc: pc.updatePolicy,
DeleteFunc: pc.deletePolicy,
})
pc.pSynched = pInformer.Informer().HasSynced
return &pc
}
func (c *Controller) addPolicy(obj interface{}) {
p := obj.(*kyverno.ClusterPolicy)
c.Cache.Add(p)
}
func (c *Controller) updatePolicy(old, cur interface{}) {
pOld := old.(*kyverno.ClusterPolicy)
pNew := cur.(*kyverno.ClusterPolicy)
if reflect.DeepEqual(pOld.Spec, pNew.Spec) {
return
}
c.Cache.Remove(pOld)
c.Cache.Add(pNew)
}
func (c *Controller) deletePolicy(obj interface{}) {
p := obj.(*kyverno.ClusterPolicy)
c.Cache.Remove(p)
}
// Run waits until policy informer to be synced
func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
logger := c.log
logger.Info("starting")
defer logger.Info("shutting down")
if !cache.WaitForCacheSync(stopCh, c.pSynched) {
logger.Info("failed to sync informer cache")
return
}
<-stopCh
}

9
pkg/policycache/type.go Normal file
View file

@ -0,0 +1,9 @@
package policycache
type PolicyType uint8
const (
Mutate PolicyType = 1 << iota
ValidateEnforce
Generate
)

View file

@ -115,11 +115,13 @@ func processResourceWithPatches(patch []byte, resource []byte, log logr.Logger)
return resource
}
func containRBACinfo(policies []*kyverno.ClusterPolicy) bool {
for _, policy := range policies {
for _, rule := range policy.Spec.Rules {
if len(rule.MatchResources.Roles) > 0 || len(rule.MatchResources.ClusterRoles) > 0 || len(rule.ExcludeResources.Roles) > 0 || len(rule.ExcludeResources.ClusterRoles) > 0 {
return true
func containRBACinfo(policies ...[]*kyverno.ClusterPolicy) bool {
for _, policySlice := range policies {
for _, policy := range policySlice {
for _, rule := range policy.Spec.Rules {
if len(rule.MatchResources.Roles) > 0 || len(rule.MatchResources.ClusterRoles) > 0 || len(rule.ExcludeResources.Roles) > 0 || len(rule.ExcludeResources.ClusterRoles) > 0 {
return true
}
}
}
}

View file

@ -21,6 +21,9 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic
logger.V(4).Info("incoming request")
var engineResponses []response.EngineResponse
if len(policies) == 0 {
return true, ""
}
// convert RAW to unstructured
resource, err := utils.ConvertToUnstructured(request.Object.Raw)
if err != nil {
@ -50,12 +53,11 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic
}
}
// Adds Generate Request to a channel(queue size 1000) to generators
if err := applyGenerateRequest(ws.grGenerator, userRequestInfo,request.Operation, engineResponses...); err != nil {
if err := applyGenerateRequest(ws.grGenerator, userRequestInfo, request.Operation, engineResponses...); err != nil {
//TODO: send appropriate error
return false, "Kyverno blocked: failed to create Generate Requests"
}
// Generate Stats wont be used here, as we delegate the generate rule
// - Filter policies that apply on this resource
// - - build CR context(userInfo+roles+clusterRoles)
@ -67,9 +69,9 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic
return true, ""
}
func applyGenerateRequest(gnGenerator generate.GenerateRequests, userRequestInfo kyverno.RequestInfo,action v1beta1.Operation, engineResponses ...response.EngineResponse) error {
func applyGenerateRequest(gnGenerator generate.GenerateRequests, userRequestInfo kyverno.RequestInfo, action v1beta1.Operation, engineResponses ...response.EngineResponse) error {
for _, er := range engineResponses {
if err := gnGenerator.Apply(transform(userRequestInfo, er),action); err != nil {
if err := gnGenerator.Apply(transform(userRequestInfo, er), action); err != nil {
return err
}
}

View file

@ -25,6 +25,10 @@ func (ws *WebhookServer) HandleMutation(
ctx *context.Context,
userRequestInfo kyverno.RequestInfo) []byte {
if len(policies) == 0 {
return nil
}
resourceName := request.Kind.Kind + "/" + request.Name
if request.Namespace != "" {
resourceName = request.Namespace + "/" + resourceName

View file

@ -10,8 +10,6 @@ import (
"net/http"
"time"
"k8s.io/apimachinery/pkg/labels"
"github.com/go-logr/logr"
"github.com/julienschmidt/httprouter"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
@ -24,6 +22,7 @@ import (
context2 "github.com/nirmata/kyverno/pkg/engine/context"
"github.com/nirmata/kyverno/pkg/event"
"github.com/nirmata/kyverno/pkg/openapi"
"github.com/nirmata/kyverno/pkg/policycache"
"github.com/nirmata/kyverno/pkg/policystatus"
"github.com/nirmata/kyverno/pkg/policyviolation"
tlsutils "github.com/nirmata/kyverno/pkg/tls"
@ -65,6 +64,9 @@ type WebhookServer struct {
// generate events
eventGen event.Interface
// policy cache
pCache policycache.Interface
// webhook registration client
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient
@ -89,6 +91,8 @@ type WebhookServer struct {
resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister
log logr.Logger
openAPIController *openapi.Controller
supportMudateValidate bool
}
// NewWebhookServer creates new instance of WebhookServer accordingly to given configuration
@ -101,12 +105,14 @@ func NewWebhookServer(
rbInformer rbacinformer.RoleBindingInformer,
crbInformer rbacinformer.ClusterRoleBindingInformer,
eventGen event.Interface,
pCache policycache.Interface,
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient,
statusSync policystatus.Listener,
configHandler config.Interface,
pvGenerator policyviolation.GeneratorInterface,
grGenerator *generate.Generator,
resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister,
supportMudateValidate bool,
cleanUp chan<- struct{},
log logr.Logger,
openAPIController *openapi.Controller,
@ -133,6 +139,7 @@ func NewWebhookServer(
crbLister: crbInformer.Lister(),
crbSynced: crbInformer.Informer().HasSynced,
eventGen: eventGen,
pCache: pCache,
webhookRegistrationClient: webhookRegistrationClient,
statusListener: statusSync,
configHandler: configHandler,
@ -143,6 +150,7 @@ func NewWebhookServer(
resourceWebhookWatcher: resourceWebhookWatcher,
log: log,
openAPIController: openAPIController,
supportMudateValidate: supportMudateValidate,
}
mux := httprouter.New()
@ -236,16 +244,15 @@ func (ws *WebhookServer) resourceMutation(request *v1beta1.AdmissionRequest) *v1
}
logger := ws.log.WithName("resourceMutation").WithValues("uid", request.UID, "kind", request.Kind.Kind, "namespace", request.Namespace, "name", request.Name, "operation", request.Operation)
policies, err := ws.pLister.ListResources(labels.NewSelector())
if err != nil {
// Unable to connect to policy Lister to access policies
logger.Error(err, "failed to list policies. Policies are NOT being applied")
return &v1beta1.AdmissionResponse{Allowed: true}
}
mutatePolicies := ws.pCache.Get(policycache.Mutate)
validatePolicies := ws.pCache.Get(policycache.ValidateEnforce)
generatePolicies := ws.pCache.Get(policycache.Generate)
// getRoleRef only if policy has roles/clusterroles defined
var roles, clusterRoles []string
if containRBACinfo(policies) {
var err error
if containRBACinfo(mutatePolicies, validatePolicies, generatePolicies) {
roles, clusterRoles, err = userinfo.GetRoleRef(ws.rbLister, ws.crbLister, request)
if err != nil {
// TODO(shuting): continue apply policy if error getting roleRef?
@ -291,13 +298,14 @@ func (ws *WebhookServer) resourceMutation(request *v1beta1.AdmissionRequest) *v1
var patches []byte
patchedResource := request.Object.Raw
higherVersion := utils.HigherThanKubernetesVersion(ws.client, ws.log, 1, 14, 0)
if higherVersion {
if ws.supportMudateValidate {
// MUTATION
// mutation failure should not block the resource creation
// any mutation failure is reported as the violation
patches = ws.HandleMutation(request, resource, policies, ctx, userRequestInfo)
logger.V(6).Info("", "generated patches", string(patches))
if request.Operation != v1beta1.Delete {
patches = ws.HandleMutation(request, resource, mutatePolicies, ctx, userRequestInfo)
logger.V(6).Info("", "generated patches", string(patches))
}
// patch the resource with patches before handling validation rules
patchedResource = processResourceWithPatches(patches, request.Object.Raw, logger)
@ -305,7 +313,7 @@ func (ws *WebhookServer) resourceMutation(request *v1beta1.AdmissionRequest) *v1
if ws.resourceWebhookWatcher != nil && ws.resourceWebhookWatcher.RunValidationInMutatingWebhook == "true" {
// VALIDATION
ok, msg := ws.HandleValidation(request, policies, patchedResource, ctx, userRequestInfo)
ok, msg := ws.HandleValidation(request, validatePolicies, patchedResource, ctx, userRequestInfo)
if !ok {
logger.Info("admission request denied")
return &v1beta1.AdmissionResponse{
@ -326,7 +334,7 @@ func (ws *WebhookServer) resourceMutation(request *v1beta1.AdmissionRequest) *v1
// Success -> Generate Request CR created successsfully
// Failed -> Failed to create Generate Request CR
if request.Operation == v1beta1.Create || request.Operation == v1beta1.Update {
ok, msg := ws.HandleGenerate(request, policies, ctx, userRequestInfo)
ok, msg := ws.HandleGenerate(request, generatePolicies, ctx, userRequestInfo)
if !ok {
logger.Info("admission request denied")
return &v1beta1.AdmissionResponse{
@ -355,7 +363,7 @@ func (ws *WebhookServer) resourceMutation(request *v1beta1.AdmissionRequest) *v1
func (ws *WebhookServer) resourceValidation(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse {
logger := ws.log.WithName("resourceValidation").WithValues("uid", request.UID, "kind", request.Kind.Kind, "namespace", request.Namespace, "name", request.Name, "operation", request.Operation)
if ok := utils.HigherThanKubernetesVersion(ws.client, ws.log, 1, 14, 0); !ok {
if !ws.supportMudateValidate {
logger.Info("mutate and validate rules are not supported prior to Kubernetes 1.14.0")
return &v1beta1.AdmissionResponse{
Allowed: true,
@ -374,15 +382,14 @@ func (ws *WebhookServer) resourceValidation(request *v1beta1.AdmissionRequest) *
}
}
policies, err := ws.pLister.ListResources(labels.NewSelector())
if err != nil {
// Unable to connect to policy Lister to access policies
logger.Error(err, "failed to list policies. Policies are NOT being applied")
policies := ws.pCache.Get(policycache.ValidateEnforce)
if len(policies) == 0 {
logger.V(4).Info("No enforce Validation policy found, returning")
return &v1beta1.AdmissionResponse{Allowed: true}
}
var roles, clusterRoles []string
var err error
// getRoleRef only if policy has roles/clusterroles defined
if containRBACinfo(policies) {
roles, clusterRoles, err = userinfo.GetRoleRef(ws.rbLister, ws.crbLister, request)

View file

@ -28,6 +28,10 @@ func (ws *WebhookServer) HandleValidation(
ctx *context.Context,
userRequestInfo kyverno.RequestInfo) (bool, string) {
if len(policies) == 0 {
return true, ""
}
resourceName := request.Kind.Kind + "/" + request.Name
if request.Namespace != "" {
resourceName = request.Namespace + "/" + resourceName