1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 10:28:36 +00:00

Merge pull request #292 from nirmata/policy_status

Policy status
This commit is contained in:
Shivkumar Dudhani 2019-08-21 00:23:53 -07:00 committed by GitHub
commit b180284003
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 463 additions and 72 deletions

86
main.go
View file

@ -34,6 +34,7 @@ const defaultReSyncTime = 10 * time.Second
func main() {
defer glog.Flush()
printVersionInfo()
// profile cpu and memory consuption
prof = enableProfiling(cpu, memory)
// CLIENT CONFIG
@ -58,55 +59,40 @@ func main() {
glog.Fatalf("Error creating client: %v\n", err)
}
// KYVERNO CRD INFORMER
// watches CRD resources:
// - Policy
// - PolicyVolation
// - cache resync time: 10 seconds
pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions(pclient, defaultReSyncTime)
// EVENT GENERATOR
// - generate event with retry
egen := event.NewEventGenerator(client, pInformer.Kyverno().V1alpha1().Policies())
// KUBERNETES CLIENT
kubeClient, err := utils.NewKubeClient(clientConfig)
if err != nil {
glog.Fatalf("Error creating kubernetes client: %v\n", err)
}
// KYVERNO CRD INFORMER
// watches CRD resources:
// - Policy
// - PolicyVolation
// - cache resync time: 10 seconds
kubeInformer := kubeinformer.NewSharedInformerFactoryWithOptions(kubeClient, defaultReSyncTime)
pInformer := kyvernoinformer.NewSharedInformerFactoryWithOptions(pclient, 10*time.Second)
// MutatingWebhookConfiguration Informer
mutatingWebhookConfigurationInformer := kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations()
// KUBERNETES RESOURCES INFORMER
// watches namespace resource
// - cache resync time: 10 seconds
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 10*time.Second)
tlsPair, err := initTLSPemPair(clientConfig, client)
if err != nil {
glog.Fatalf("Failed to initialize TLS key/certificate pair: %v\n", err)
}
// WEBHOOK REGISTRATION
// -- validationwebhookconfiguration (Policy)
// -- mutatingwebhookconfiguration (All resources)
webhookRegistrationClient, err := webhooks.NewWebhookRegistrationClient(clientConfig, client, serverIP, int32(webhookTimeout))
if err != nil {
glog.Fatalf("Unable to register admission webhooks on cluster: %v\n", err)
}
if err = webhookRegistrationClient.Register(); err != nil {
glog.Fatalf("Failed registering Admission Webhooks: %v\n", err)
}
// EVENT GENERATOR
// - generate event with retry mechanism
egen := event.NewEventGenerator(client, pInformer.Kyverno().V1alpha1().Policies())
// POLICY CONTROLLER
// - reconciliation policy and policy violation
// - process policy on existing resources
// - status: violation count
pc, err := policy.NewPolicyController(pclient, client, pInformer.Kyverno().V1alpha1().Policies(), pInformer.Kyverno().V1alpha1().PolicyViolations(), egen, mutatingWebhookConfigurationInformer, webhookRegistrationClient)
// - status aggregator: recieves stats when a policy is applied
// & updates the policy status
pc, err := policy.NewPolicyController(pclient, client, pInformer.Kyverno().V1alpha1().Policies(), pInformer.Kyverno().V1alpha1().PolicyViolations(), egen, kubeInformer.Admissionregistration().V1beta1().MutatingWebhookConfigurations(), webhookRegistrationClient)
if err != nil {
glog.Fatalf("error creating policy controller: %v\n", err)
}
// POLICY VIOLATION CONTROLLER
// policy violation cleanup if the corresponding resource is deleted
// status: lastUpdatTime
pvc, err := policyviolation.NewPolicyViolationController(client, pclient, pInformer.Kyverno().V1alpha1().Policies(), pInformer.Kyverno().V1alpha1().PolicyViolations())
if err != nil {
@ -115,19 +101,43 @@ func main() {
// GENERATE CONTROLLER
// - watches for Namespace resource and generates resource based on the policy generate rule
nsc := namespace.NewNamespaceController(pclient, client, kubeInformer.Core().V1().Namespaces(), pInformer.Kyverno().V1alpha1().Policies(), pInformer.Kyverno().V1alpha1().PolicyViolations(), egen)
nsc := namespace.NewNamespaceController(pclient, client, kubeInformer.Core().V1().Namespaces(), pInformer.Kyverno().V1alpha1().Policies(), pInformer.Kyverno().V1alpha1().PolicyViolations(), pc.GetPolicyStatusAggregator(), egen)
server, err := webhooks.NewWebhookServer(pclient, client, tlsPair, pInformer.Kyverno().V1alpha1().Policies(), pInformer.Kyverno().V1alpha1().PolicyViolations(), egen, webhookRegistrationClient, filterK8Resources)
// CONFIGURE CERTIFICATES
tlsPair, err := initTLSPemPair(clientConfig, client)
if err != nil {
glog.Fatalf("Failed to initialize TLS key/certificate pair: %v\n", err)
}
// WERBHOOK REGISTRATION CLIENT
webhookRegistrationClient, err := webhooks.NewWebhookRegistrationClient(clientConfig, client, serverIP, int32(webhookTimeout))
if err != nil {
glog.Fatalf("Unable to register admission webhooks on cluster: %v\n", err)
}
// WEBHOOK REGISTRATION
// - validationwebhookconfiguration (Policy)
// - mutatingwebhookconfiguration (All resources)
// webhook confgiuration is also generated dynamically in the policy controller
// based on the policy resources created
if err = webhookRegistrationClient.Register(); err != nil {
glog.Fatalf("Failed registering Admission Webhooks: %v\n", err)
}
// WEBHOOOK
// - https server to provide endpoints called based on rules defined in Mutating & Validation webhook configuration
// - reports the results based on the response from the policy engine:
// -- annotations on resources with update details on mutation JSON patches
// -- generate policy violation resource
// -- generate events on policy and resource
server, err := webhooks.NewWebhookServer(pclient, client, tlsPair, pInformer.Kyverno().V1alpha1().Policies(), pInformer.Kyverno().V1alpha1().PolicyViolations(), egen, webhookRegistrationClient, pc.GetPolicyStatusAggregator(), filterK8Resources)
if err != nil {
glog.Fatalf("Unable to create webhook server: %v\n", err)
}
stopCh := signals.SetupSignalHandler()
if err = webhookRegistrationClient.Register(); err != nil {
glog.Fatalf("Failed registering Admission Webhooks: %v\n", err)
}
// Start the components
pInformer.Start(stopCh)
kubeInformer.Start(stopCh)
go pc.Run(1, stopCh)

View file

@ -29,6 +29,8 @@ var (
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&Policy{},
&PolicyList{},
&PolicyViolation{},
&PolicyViolationList{},
)

View file

@ -89,7 +89,17 @@ type CloneFrom struct {
//PolicyStatus provides status for violations
type PolicyStatus struct {
Violations int `json:"violations"`
ViolationCount int `json:"violationCount"`
// Count of rules that were applied
RulesAppliedCount int `json:"rulesAppliedCount"`
// Count of resources for whom update/create api requests were blocked as the resoruce did not satisfy the policy rules
ResourcesBlockedCount int `json:"resourcesBlockedCount"`
// average time required to process the policy Mutation rules on a resource
AvgExecutionTimeMutation string `json:"averageMutationRulesExecutionTime"`
// average time required to process the policy Validation rules on a resource
AvgExecutionTimeValidation string `json:"averageValidationRulesExecutionTime"`
// average time required to process the policy Validation rules on a resource
AvgExecutionTimeGeneration string `json:"averageGenerationRulesExecutionTime"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

View file

@ -3,6 +3,8 @@ package engine
import (
"encoding/json"
"errors"
"time"
"fmt"
"github.com/golang/glog"
@ -16,7 +18,19 @@ import (
)
//Generate apply generation rules on a resource
func Generate(client *client.Client, policy kyverno.Policy, ns unstructured.Unstructured) []info.RuleInfo {
func Generate(client *client.Client, policy kyverno.Policy, ns unstructured.Unstructured) (response EngineResponse) {
startTime := time.Now()
glog.V(4).Infof("started applying generation rules of policy %q (%v)", policy.Name, startTime)
defer func() {
response.ExecutionTime = time.Since(startTime)
glog.V(4).Infof("Finished applying generation rules policy %q (%v)", policy.Name, response.ExecutionTime)
glog.V(4).Infof("Generation Rules appplied count %q for policy %q", response.RulesAppliedCount, policy.Name)
}()
incrementAppliedRuleCount := func() {
// rules applied succesfully count
response.RulesAppliedCount++
}
ris := []info.RuleInfo{}
for _, rule := range policy.Spec.Rules {
if rule.Generation == (kyverno.Generation{}) {
@ -34,8 +48,10 @@ func Generate(client *client.Client, policy kyverno.Policy, ns unstructured.Unst
glog.Infof("succesfully applied policy %s rule %s on resource %s/%s/%s", policy.Name, rule.Name, ns.GetKind(), ns.GetNamespace(), ns.GetName())
}
ris = append(ris, ri)
incrementAppliedRuleCount()
}
return ris
response.RuleInfos = ris
return response
}
func applyRuleGenerator(client *client.Client, ns unstructured.Unstructured, gen kyverno.Generation, policyCreationTime metav1.Time) error {

View file

@ -2,6 +2,7 @@ package engine
import (
"reflect"
"time"
"github.com/golang/glog"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1alpha1"
@ -10,11 +11,24 @@ import (
)
// Mutate performs mutation. Overlay first and then mutation patches
func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) EngineResponse {
func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) (response EngineResponse) {
// var response EngineResponse
var allPatches, rulePatches [][]byte
var err error
var errs []error
ris := []info.RuleInfo{}
startTime := time.Now()
glog.V(4).Infof("started applying mutation rules of policy %q (%v)", policy.Name, startTime)
defer func() {
response.ExecutionTime = time.Since(startTime)
glog.V(4).Infof("finished applying mutation rules policy %v (%v)", policy.Name, response.ExecutionTime)
glog.V(4).Infof("Mutation Rules appplied succesfully count %v for policy %q", response.RulesAppliedCount, policy.Name)
}()
incrementAppliedRuleCount := func() {
// rules applied succesfully count
response.RulesAppliedCount++
}
patchedDocument, err := resource.MarshalJSON()
if err != nil {
@ -23,7 +37,8 @@ func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) EngineRes
if err != nil {
glog.V(4).Infof("unable to marshal resource : %v", err)
return EngineResponse{PatchedResource: resource}
response.PatchedResource = resource
return response
}
for _, rule := range policy.Spec.Rules {
@ -66,6 +81,7 @@ func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) EngineRes
ruleInfo.Fail()
ruleInfo.Addf("failed to apply overlay: %v", err)
}
incrementAppliedRuleCount()
}
// Process Patches
@ -84,6 +100,7 @@ func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) EngineRes
ruleInfo.Patches = rulePatches
allPatches = append(allPatches, rulePatches...)
}
incrementAppliedRuleCount()
}
patchedDocument, err = ApplyPatches(patchedDocument, rulePatches)
@ -97,12 +114,12 @@ func Mutate(policy kyverno.Policy, resource unstructured.Unstructured) EngineRes
patchedResource, err := ConvertToUnstructured(patchedDocument)
if err != nil {
glog.Errorf("Failed to convert patched resource to unstructuredtype, err%v\n:", err)
return EngineResponse{PatchedResource: resource}
response.PatchedResource = resource
return response
}
return EngineResponse{
Patches: allPatches,
PatchedResource: *patchedResource,
RuleInfos: ris,
}
response.Patches = allPatches
response.PatchedResource = *patchedResource
response.RuleInfos = ris
return response
}

View file

@ -5,6 +5,7 @@ import (
"fmt"
"strconv"
"strings"
"time"
"github.com/golang/glog"
@ -18,10 +19,20 @@ import (
"k8s.io/apimachinery/pkg/labels"
)
//EngineResponse provides the response to the application of a policy rule set on a resource
type EngineResponse struct {
Patches [][]byte
PatchedResource unstructured.Unstructured
RuleInfos []info.RuleInfo
EngineStats
}
//EngineStats stores in the statistics for a single application of resource
type EngineStats struct {
// average time required to process the policy rules on a resource
ExecutionTime time.Duration
// Count of rules that were applied succesfully
RulesAppliedCount int
}
// //ListResourcesThatApplyToPolicy returns list of resources that are filtered by policy rules

View file

@ -8,6 +8,7 @@ import (
"reflect"
"strconv"
"strings"
"time"
"github.com/golang/glog"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1alpha1"
@ -17,17 +18,31 @@ import (
// Validate handles validating admission request
// Checks the target resources for rules defined in the policy
func Validate(policy kyverno.Policy, resource unstructured.Unstructured) EngineResponse {
func Validate(policy kyverno.Policy, resource unstructured.Unstructured) (response EngineResponse) {
// var response EngineResponse
startTime := time.Now()
glog.V(4).Infof("started applying validation rules of policy %q (%v)", policy.Name, startTime)
defer func() {
response.ExecutionTime = time.Since(startTime)
glog.V(4).Infof("Finished applying validation rules policy %v (%v)", policy.Name, response.ExecutionTime)
glog.V(4).Infof("Validation Rules appplied succesfully count %v for policy %q", response.RulesAppliedCount, policy.Name)
}()
incrementAppliedRuleCount := func() {
// rules applied succesfully count
response.RulesAppliedCount++
}
resourceRaw, err := resource.MarshalJSON()
if err != nil {
glog.V(4).Infof("Skip processing validating rule, unable to marshal resource : %v\n", err)
return EngineResponse{PatchedResource: resource}
response.PatchedResource = resource
return response
}
var resourceInt interface{}
if err := json.Unmarshal(resourceRaw, &resourceInt); err != nil {
glog.V(4).Infof("unable to unmarshal resource : %v\n", err)
return EngineResponse{PatchedResource: resource}
response.PatchedResource = resource
return response
}
var ruleInfos []info.RuleInfo
@ -56,10 +71,11 @@ func Validate(policy kyverno.Policy, resource unstructured.Unstructured) EngineR
ruleInfo.Add("Pattern succesfully validated")
glog.V(4).Infof("pattern validated succesfully on resource %s/%s", resource.GetNamespace(), resource.GetName())
}
incrementAppliedRuleCount()
ruleInfos = append(ruleInfos, ruleInfo)
}
return EngineResponse{RuleInfos: ruleInfos}
response.RuleInfos = ruleInfos
return response
}
// validateResourceWithPattern is a start of element-by-element validation process

View file

@ -8,6 +8,7 @@ import (
"github.com/golang/glog"
client "github.com/nirmata/kyverno/pkg/dclient"
"github.com/nirmata/kyverno/pkg/event"
"github.com/nirmata/kyverno/pkg/policy"
"k8s.io/apimachinery/pkg/api/errors"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
@ -44,7 +45,8 @@ type NamespaceController struct {
pvListerSynced cache.InformerSynced
// pvLister can list/get policy violation from the shared informer's store
pvLister kyvernolister.PolicyViolationLister
// API to send policy stats for aggregation
policyStatus policy.PolicyStatusInterface
// eventGen provides interface to generate evenets
eventGen event.Interface
// Namespaces that need to be synced
@ -59,6 +61,7 @@ func NewNamespaceController(kyvernoClient *kyvernoclient.Clientset,
nsInformer v1Informer.NamespaceInformer,
pInformer kyvernoinformer.PolicyInformer,
pvInformer kyvernoinformer.PolicyViolationInformer,
policyStatus policy.PolicyStatusInterface,
eventGen event.Interface) *NamespaceController {
//TODO: do we need to event recorder for this controller?
// create the controller
@ -83,6 +86,7 @@ func NewNamespaceController(kyvernoClient *kyvernoclient.Clientset,
nsc.pLister = pInformer.Lister()
nsc.pvListerSynced = pInformer.Informer().HasSynced
nsc.pvLister = pvInformer.Lister()
nsc.policyStatus = policyStatus
// resource manager
// rebuild after 300 seconds/ 5 mins

View file

@ -6,6 +6,7 @@ import (
client "github.com/nirmata/kyverno/pkg/dclient"
"github.com/nirmata/kyverno/pkg/engine"
"github.com/nirmata/kyverno/pkg/policy"
"github.com/golang/glog"
@ -13,7 +14,9 @@ import (
kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1alpha1"
"github.com/nirmata/kyverno/pkg/info"
policyctr "github.com/nirmata/kyverno/pkg/policy"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@ -108,7 +111,7 @@ func (nsc *NamespaceController) processNamespace(namespace corev1.Namespace) []i
glog.V(4).Infof("policy %s with resource version %s already processed on resource %s/%s/%s with resource version %s", policy.Name, policy.ResourceVersion, ns.GetKind(), ns.GetNamespace(), ns.GetName(), ns.GetResourceVersion())
continue
}
policyInfo := applyPolicy(nsc.client, ns, *policy)
policyInfo := applyPolicy(nsc.client, ns, *policy, nsc.policyStatus)
policyInfos = append(policyInfos, policyInfo)
// post-processing, register the resource as processed
nsc.rm.RegisterResource(policy.GetName(), policy.GetResourceVersion(), ns.GetKind(), ns.GetNamespace(), ns.GetName(), ns.GetResourceVersion())
@ -141,15 +144,32 @@ func listpolicies(ns unstructured.Unstructured, pLister kyvernolister.PolicyList
return filteredpolicies
}
func applyPolicy(client *client.Client, resource unstructured.Unstructured, policy kyverno.Policy) info.PolicyInfo {
func applyPolicy(client *client.Client, resource unstructured.Unstructured, policy kyverno.Policy, policyStatus policy.PolicyStatusInterface) info.PolicyInfo {
var ps policyctr.PolicyStat
gatherStat := func(policyName string, er engine.EngineResponse) {
// ps := policyctr.PolicyStat{}
ps.PolicyName = policyName
ps.Stats.GenerationExecutionTime = er.ExecutionTime
ps.Stats.RulesAppliedCount = er.RulesAppliedCount
}
// send stats for aggregation
sendStat := func(blocked bool) {
//SEND
policyStatus.SendStat(ps)
}
startTime := time.Now()
glog.V(4).Infof("Started apply policy %s on resource %s/%s/%s (%v)", policy.Name, resource.GetKind(), resource.GetNamespace(), resource.GetName(), startTime)
defer func() {
glog.V(4).Infof("Finished applying %s on resource %s/%s/%s (%v)", policy.Name, resource.GetKind(), resource.GetNamespace(), resource.GetName(), time.Since(startTime))
}()
policyInfo := info.NewPolicyInfo(policy.Name, resource.GetKind(), resource.GetName(), resource.GetNamespace(), policy.Spec.ValidationFailureAction)
ruleInfos := engine.Generate(client, policy, resource)
policyInfo.AddRuleInfos(ruleInfos)
engineResponse := engine.Generate(client, policy, resource)
policyInfo.AddRuleInfos(engineResponse.RuleInfos)
// gather stats
gatherStat(policy.Name, engineResponse)
//send stats
sendStat(false)
return policyInfo
}

View file

@ -14,7 +14,20 @@ import (
// applyPolicy applies policy on a resource
//TODO: generation rules
func applyPolicy(policy kyverno.Policy, resource unstructured.Unstructured) (info.PolicyInfo, error) {
func applyPolicy(policy kyverno.Policy, resource unstructured.Unstructured, policyStatus PolicyStatusInterface) (info.PolicyInfo, error) {
var ps PolicyStat
gatherStat := func(policyName string, er engine.EngineResponse) {
// ps := policyctr.PolicyStat{}
ps.PolicyName = policyName
ps.Stats.ValidationExecutionTime = er.ExecutionTime
ps.Stats.RulesAppliedCount = er.RulesAppliedCount
}
// send stats for aggregation
sendStat := func(blocked bool) {
//SEND
policyStatus.SendStat(ps)
}
startTime := time.Now()
glog.V(4).Infof("Started apply policy %s on resource %s/%s/%s (%v)", policy.Name, resource.GetKind(), resource.GetNamespace(), resource.GetName(), startTime)
defer func() {
@ -24,7 +37,7 @@ func applyPolicy(policy kyverno.Policy, resource unstructured.Unstructured) (inf
policyInfo := info.NewPolicyInfo(policy.Name, resource.GetKind(), resource.GetName(), resource.GetNamespace(), policy.Spec.ValidationFailureAction)
//MUTATION
mruleInfos, err := mutation(policy, resource)
mruleInfos, err := mutation(policy, resource, policyStatus)
policyInfo.AddRuleInfos(mruleInfos)
if err != nil {
return policyInfo, err
@ -35,13 +48,36 @@ func applyPolicy(policy kyverno.Policy, resource unstructured.Unstructured) (inf
if len(engineResponse.RuleInfos) != 0 {
policyInfo.AddRuleInfos(engineResponse.RuleInfos)
}
// gather stats
gatherStat(policy.Name, engineResponse)
//send stats
sendStat(false)
//TODO: GENERATION
return policyInfo, nil
}
func mutation(policy kyverno.Policy, resource unstructured.Unstructured) ([]info.RuleInfo, error) {
func mutation(policy kyverno.Policy, resource unstructured.Unstructured, policyStatus PolicyStatusInterface) ([]info.RuleInfo, error) {
var ps PolicyStat
// gather stats from the engine response
gatherStat := func(policyName string, er engine.EngineResponse) {
// ps := policyctr.PolicyStat{}
ps.PolicyName = policyName
ps.Stats.MutationExecutionTime = er.ExecutionTime
ps.Stats.RulesAppliedCount = er.RulesAppliedCount
}
// send stats for aggregation
sendStat := func(blocked bool) {
//SEND
policyStatus.SendStat(ps)
}
engineResponse := engine.Mutate(policy, resource)
// gather stats
gatherStat(policy.Name, engineResponse)
//send stats
sendStat(false)
patches := engineResponse.Patches
ruleInfos := engineResponse.RuleInfos
if len(ruleInfos) == 0 {

View file

@ -76,6 +76,8 @@ type PolicyController struct {
rm resourceManager
// filter the resources defined in the list
filterK8Resources []utils.K8Resource
// recieves stats and aggregates details
statusAggregator *PolicyStatusAggregator
}
// NewPolicyController create a new PolicyController
@ -128,6 +130,10 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset, client *client.
//TODO: pass the time in seconds instead of converting it internally
pc.rm = NewResourceManager(30)
// aggregator
// pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient, pInformer)
pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient)
return &pc, nil
}
@ -347,6 +353,9 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) {
for i := 0; i < workers; i++ {
go wait.Until(pc.worker, time.Second, stopCh)
}
// policy status aggregator
//TODO: workers required for aggergation
pc.statusAggregator.Run(1, stopCh)
<-stopCh
}
@ -396,10 +405,14 @@ func (pc *PolicyController) syncPolicy(key string) error {
policy, err := pc.pLister.Get(key)
if errors.IsNotFound(err) {
glog.V(2).Infof("Policy %v has been deleted", key)
if err := pc.handleWebhookRegistration(true, nil); err != nil {
// remove the recorded stats for the policy
pc.statusAggregator.RemovePolicyStats(key)
// remove webhook configurations if there are not policies
if err := pc.handleWebhookRegistration(true, nil); err != nil {
glog.Errorln(err)
}
return err
return nil
}
if err != nil {
@ -422,6 +435,8 @@ func (pc *PolicyController) syncPolicy(key string) error {
policyInfos := pc.processExistingResources(*p)
// report errors
pc.report(policyInfos)
// fetch the policy again via the aggreagator to remain consistent
// return pc.statusAggregator.UpdateViolationCount(p.Name, pvList)
return pc.syncStatusOnly(p, pvList)
}
@ -470,7 +485,7 @@ func (pc *PolicyController) handleWebhookRegistration(delete bool, policy *kyver
// status:
// - violations : (count of the resources that violate this policy )
func (pc *PolicyController) syncStatusOnly(p *kyverno.Policy, pvList []*kyverno.PolicyViolation) error {
newStatus := calculateStatus(pvList)
newStatus := pc.calculateStatus(p.Name, pvList)
if reflect.DeepEqual(newStatus, p.Status) {
// no update to status
return nil
@ -482,10 +497,19 @@ func (pc *PolicyController) syncStatusOnly(p *kyverno.Policy, pvList []*kyverno.
return err
}
func calculateStatus(pvList []*kyverno.PolicyViolation) kyverno.PolicyStatus {
func (pc *PolicyController) calculateStatus(policyName string, pvList []*kyverno.PolicyViolation) kyverno.PolicyStatus {
violationCount := len(pvList)
status := kyverno.PolicyStatus{
Violations: violationCount,
ViolationCount: violationCount,
}
// get stats
stats := pc.statusAggregator.GetPolicyStats(policyName)
if stats != (PolicyStatInfo{}) {
status.RulesAppliedCount = stats.RulesAppliedCount
status.ResourcesBlockedCount = stats.ResourceBlocked
status.AvgExecutionTimeMutation = stats.MutationExecutionTime.String()
status.AvgExecutionTimeValidation = stats.ValidationExecutionTime.String()
status.AvgExecutionTimeGeneration = stats.GenerationExecutionTime.String()
}
return status
}

View file

@ -29,7 +29,7 @@ func (pc *PolicyController) processExistingResources(policy kyverno.Policy) []in
}
// apply the policy on each
glog.V(4).Infof("apply policy %s with resource version %s on resource %s/%s/%s with resource version %s", policy.Name, policy.ResourceVersion, resource.GetKind(), resource.GetNamespace(), resource.GetName(), resource.GetResourceVersion())
policyInfo := applyPolicyOnResource(policy, resource)
policyInfo := applyPolicyOnResource(policy, resource, pc.statusAggregator)
policyInfos = append(policyInfos, *policyInfo)
// post-processing, register the resource as processed
pc.rm.RegisterResource(policy.GetName(), policy.GetResourceVersion(), resource.GetKind(), resource.GetNamespace(), resource.GetName(), resource.GetResourceVersion())
@ -37,8 +37,8 @@ func (pc *PolicyController) processExistingResources(policy kyverno.Policy) []in
return policyInfos
}
func applyPolicyOnResource(policy kyverno.Policy, resource unstructured.Unstructured) *info.PolicyInfo {
policyInfo, err := applyPolicy(policy, resource)
func applyPolicyOnResource(policy kyverno.Policy, resource unstructured.Unstructured, policyStatus PolicyStatusInterface) *info.PolicyInfo {
policyInfo, err := applyPolicy(policy, resource, policyStatus)
if err != nil {
glog.V(4).Infof("failed to process policy %s on resource %s/%s/%s: %v", policy.GetName(), resource.GetKind(), resource.GetNamespace(), resource.GetName(), err)
return nil

165
pkg/policy/status.go Normal file
View file

@ -0,0 +1,165 @@
package policy
import (
"sync"
"time"
"github.com/golang/glog"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
)
//PolicyStatusAggregator stores information abt aggregation
type PolicyStatusAggregator struct {
// time since we start aggregating the stats
startTime time.Time
// channel to recieve stats
ch chan PolicyStat
//TODO: lock based on key, possibly sync.Map ?
//sync RW for policyData
mux sync.RWMutex
// stores aggregated stats for policy
policyData map[string]PolicyStatInfo
}
//NewPolicyStatAggregator returns a new policy status
func NewPolicyStatAggregator(client *kyvernoclient.Clientset,
// pInformer kyvernoinformer.PolicyInformer
) *PolicyStatusAggregator {
psa := PolicyStatusAggregator{
startTime: time.Now(),
ch: make(chan PolicyStat),
policyData: map[string]PolicyStatInfo{},
}
return &psa
}
//Run begins aggregator
func (psa *PolicyStatusAggregator) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.V(4).Info("Started aggregator for policy status stats")
defer func() {
glog.V(4).Info("Shutting down aggregator for policy status stats")
}()
for i := 0; i < workers; i++ {
go wait.Until(psa.process, time.Second, stopCh)
}
}
func (psa *PolicyStatusAggregator) process() {
// As mutation and validation are handled seperately
// ideally we need to combine the exection time from both for a policy
// but its tricky to detect here the type of rules policy contains
// so we dont combine the results, but instead compute the execution time for
// mutation & validation rules seperately
for r := range psa.ch {
glog.V(4).Infof("recieved policy stats %v", r)
psa.aggregate(r)
}
}
func (psa *PolicyStatusAggregator) aggregate(ps PolicyStat) {
func() {
glog.V(4).Infof("write lock update policy %s", ps.PolicyName)
psa.mux.Lock()
}()
defer func() {
glog.V(4).Infof("write Unlock update policy %s", ps.PolicyName)
psa.mux.Unlock()
}()
info, ok := psa.policyData[ps.PolicyName]
if !ok {
psa.policyData[ps.PolicyName] = ps.Stats
glog.V(4).Infof("added stats for policy %s", ps.PolicyName)
return
}
// aggregate
info.RulesAppliedCount = info.RulesAppliedCount + ps.Stats.RulesAppliedCount
if ps.Stats.ResourceBlocked == 1 {
info.ResourceBlocked++
}
var zeroDuration time.Duration
if info.MutationExecutionTime != zeroDuration {
info.MutationExecutionTime = (info.MutationExecutionTime + ps.Stats.MutationExecutionTime) / 2
glog.V(4).Infof("updated avg mutation time %v", info.MutationExecutionTime)
} else {
info.MutationExecutionTime = ps.Stats.MutationExecutionTime
}
if info.ValidationExecutionTime != zeroDuration {
info.ValidationExecutionTime = (info.ValidationExecutionTime + ps.Stats.ValidationExecutionTime) / 2
glog.V(4).Infof("updated avg validation time %v", info.ValidationExecutionTime)
} else {
info.ValidationExecutionTime = ps.Stats.ValidationExecutionTime
}
if info.GenerationExecutionTime != zeroDuration {
info.GenerationExecutionTime = (info.GenerationExecutionTime + ps.Stats.GenerationExecutionTime) / 2
glog.V(4).Infof("updated avg generation time %v", info.GenerationExecutionTime)
} else {
info.GenerationExecutionTime = ps.Stats.GenerationExecutionTime
}
// update
psa.policyData[ps.PolicyName] = info
glog.V(4).Infof("updated stats for policy %s", ps.PolicyName)
}
//GetPolicyStats returns the policy stats
func (psa *PolicyStatusAggregator) GetPolicyStats(policyName string) PolicyStatInfo {
func() {
glog.V(4).Infof("read lock update policy %s", policyName)
psa.mux.RLock()
}()
defer func() {
glog.V(4).Infof("read Unlock update policy %s", policyName)
psa.mux.RUnlock()
}()
glog.V(4).Infof("read stats for policy %s", policyName)
return psa.policyData[policyName]
}
//RemovePolicyStats rmves policy stats records
func (psa *PolicyStatusAggregator) RemovePolicyStats(policyName string) {
func() {
glog.V(4).Infof("write lock update policy %s", policyName)
psa.mux.Lock()
}()
defer func() {
glog.V(4).Infof("write Unlock update policy %s", policyName)
psa.mux.Unlock()
}()
glog.V(4).Infof("removing stats for policy %s", policyName)
delete(psa.policyData, policyName)
}
//PolicyStatusInterface provides methods to modify policyStatus
type PolicyStatusInterface interface {
SendStat(stat PolicyStat)
// UpdateViolationCount(policyName string, pvList []*kyverno.PolicyViolation) error
}
//PolicyStat stored stats for policy
type PolicyStat struct {
PolicyName string
Stats PolicyStatInfo
}
type PolicyStatInfo struct {
MutationExecutionTime time.Duration
ValidationExecutionTime time.Duration
GenerationExecutionTime time.Duration
RulesAppliedCount int
ResourceBlocked int
}
//SendStat sends the stat information for aggregation
func (psa *PolicyStatusAggregator) SendStat(stat PolicyStat) {
glog.V(4).Infof("sending policy stats: %v", stat)
// Send over channel
psa.ch <- stat
}
//GetPolicyStatusAggregator returns interface to send policy status stats
func (pc *PolicyController) GetPolicyStatusAggregator() PolicyStatusInterface {
return pc.statusAggregator
}

View file

@ -238,13 +238,13 @@ func (pvc *PolicyViolationController) syncActiveResource(curPv *kyverno.PolicyVi
return err
}
glog.V(4).Infof("removing policy violation %s as the corresponding resource %s/%s/%s does not exist anymore", curPv.Name, rspec.Kind, rspec.Namespace, rspec.Name)
return nil
}
if err != nil {
glog.V(4).Infof("error while retrieved resource %s/%s/%s: %v", rspec.Kind, rspec.Namespace, rspec.Name, err)
return err
}
//TODO- if the policy is not present, remove the policy violation
return nil
}

View file

@ -86,3 +86,11 @@ func NewKubeClient(config *rest.Config) (kubernetes.Interface, error) {
}
return kclient, nil
}
//Btoi converts boolean to int
func Btoi(b bool) int {
if b {
return 1
}
return 0
}

View file

@ -4,6 +4,7 @@ import (
"github.com/golang/glog"
engine "github.com/nirmata/kyverno/pkg/engine"
"github.com/nirmata/kyverno/pkg/info"
policyctr "github.com/nirmata/kyverno/pkg/policy"
"github.com/nirmata/kyverno/pkg/utils"
v1beta1 "k8s.io/api/admission/v1beta1"
"k8s.io/apimachinery/pkg/labels"
@ -14,6 +15,23 @@ import (
func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) (bool, engine.EngineResponse) {
var patches [][]byte
var policyInfos []info.PolicyInfo
var policyStats []policyctr.PolicyStat
// gather stats from the engine response
gatherStat := func(policyName string, er engine.EngineResponse) {
ps := policyctr.PolicyStat{}
ps.PolicyName = policyName
ps.Stats.MutationExecutionTime = er.ExecutionTime
ps.Stats.RulesAppliedCount = er.RulesAppliedCount
policyStats = append(policyStats, ps)
}
// send stats for aggregation
sendStat := func(blocked bool) {
for _, stat := range policyStats {
stat.Stats.ResourceBlocked = utils.Btoi(blocked)
//SEND
ws.policyStatus.SendStat(stat)
}
}
glog.V(5).Infof("Receive request in mutating webhook: Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation)
@ -54,6 +72,10 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) (bool
engineResponse = engine.Mutate(*policy, *resource)
policyInfo.AddRuleInfos(engineResponse.RuleInfos)
// Gather policy application statistics
gatherStat(policy.Name, engineResponse)
// ps := policyctr.NewPolicyStat(policy.Name, engineResponse.ExecutionTime, nil, engineResponse.RulesAppliedCount)
if !policyInfo.IsSuccessful() {
glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName())
@ -67,6 +89,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) (bool
patches = append(patches, engineResponse.Patches...)
policyInfos = append(policyInfos, policyInfo)
glog.V(4).Infof("Mutation from policy %s has applied succesfully to %s %s/%s", policy.Name, request.Kind.Kind, resource.GetNamespace(), resource.GetName())
}
// ADD ANNOTATIONS
@ -80,11 +103,14 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) (bool
}
ok, msg := isAdmSuccesful(policyInfos)
// Send policy engine Stats
if ok {
sendStat(false)
engineResponse.Patches = patches
return true, engineResponse
}
sendStat(true)
glog.Errorf("Failed to mutate the resource: %s\n", msg)
return false, engineResponse
}

View file

@ -19,6 +19,7 @@ import (
"github.com/nirmata/kyverno/pkg/config"
client "github.com/nirmata/kyverno/pkg/dclient"
"github.com/nirmata/kyverno/pkg/event"
"github.com/nirmata/kyverno/pkg/policy"
tlsutils "github.com/nirmata/kyverno/pkg/tls"
"github.com/nirmata/kyverno/pkg/utils"
v1beta1 "k8s.io/api/admission/v1beta1"
@ -37,7 +38,9 @@ type WebhookServer struct {
pvListerSynced cache.InformerSynced
eventGen event.Interface
webhookRegistrationClient *WebhookRegistrationClient
filterK8Resources []utils.K8Resource
// API to send policy stats for aggregation
policyStatus policy.PolicyStatusInterface
filterK8Resources []utils.K8Resource
}
// NewWebhookServer creates new instance of WebhookServer accordingly to given configuration
@ -50,6 +53,7 @@ func NewWebhookServer(
pvInormer kyvernoinformer.PolicyViolationInformer,
eventGen event.Interface,
webhookRegistrationClient *WebhookRegistrationClient,
policyStatus policy.PolicyStatusInterface,
filterK8Resources string) (*WebhookServer, error) {
if tlsPair == nil {
@ -73,6 +77,7 @@ func NewWebhookServer(
pvListerSynced: pInformer.Informer().HasSynced,
eventGen: eventGen,
webhookRegistrationClient: webhookRegistrationClient,
policyStatus: policyStatus,
filterK8Resources: utils.ParseKinds(filterK8Resources),
}
mux := http.NewServeMux()

View file

@ -4,6 +4,7 @@ import (
"github.com/golang/glog"
engine "github.com/nirmata/kyverno/pkg/engine"
"github.com/nirmata/kyverno/pkg/info"
policyctr "github.com/nirmata/kyverno/pkg/policy"
"github.com/nirmata/kyverno/pkg/policyviolation"
"github.com/nirmata/kyverno/pkg/utils"
v1beta1 "k8s.io/api/admission/v1beta1"
@ -17,6 +18,23 @@ import (
// If there are no errors in validating rule we apply generation rules
func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, resource unstructured.Unstructured) *v1beta1.AdmissionResponse {
var policyInfos []info.PolicyInfo
var policyStats []policyctr.PolicyStat
// gather stats from the engine response
gatherStat := func(policyName string, er engine.EngineResponse) {
ps := policyctr.PolicyStat{}
ps.PolicyName = policyName
ps.Stats.ValidationExecutionTime = er.ExecutionTime
ps.Stats.RulesAppliedCount = er.RulesAppliedCount
policyStats = append(policyStats, ps)
}
// send stats for aggregation
sendStat := func(blocked bool) {
for _, stat := range policyStats {
stat.Stats.ResourceBlocked = utils.Btoi(blocked)
//SEND
ws.policyStatus.SendStat(stat)
}
}
glog.V(5).Infof("Receive request in validating webhook: Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation)
@ -55,6 +73,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, res
if len(engineResponse.RuleInfos) == 0 {
continue
}
gatherStat(policy.Name, engineResponse)
if len(engineResponse.RuleInfos) > 0 {
glog.V(4).Infof("Validation from policy %s has applied succesfully to %s %s/%s", policy.Name, request.Kind.Kind, resource.GetNamespace(), resource.GetName())
@ -87,6 +106,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, res
// Even if one the policy being applied
ok, msg := isAdmSuccesful(policyInfos)
if !ok && toBlock(policyInfos) {
sendStat(true)
return &v1beta1.AdmissionResponse{
Allowed: false,
Result: &metav1.Status{
@ -98,6 +118,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, res
// ADD POLICY VIOLATIONS
policyviolation.GeneratePolicyViolations(ws.pvListerSynced, ws.pvLister, ws.kyvernoClient, policyInfos)
sendStat(false)
return &v1beta1.AdmissionResponse{
Allowed: true,
}