From 085856baa102f4ae52ef1975bb9ef45859466c03 Mon Sep 17 00:00:00 2001
From: Shivkumar Dudhani <shivkumar@nirmata.com>
Date: Thu, 26 Dec 2019 11:50:41 -0800
Subject: [PATCH] add event source and format event messages (#565)

---
 pkg/engine/response.go  |  5 +++
 pkg/event/controller.go | 71 ++++++++++++++++++++++-------------------
 pkg/event/source.go     | 21 ++++++++++++
 pkg/event/util.go       |  5 ++-
 pkg/namespace/report.go |  2 ++
 pkg/policy/apply.go     | 27 ++++++++++++++--
 pkg/policy/report.go    |  2 ++
 pkg/webhooks/report.go  | 18 +++++++----
 8 files changed, 106 insertions(+), 45 deletions(-)
 create mode 100644 pkg/event/source.go

diff --git a/pkg/engine/response.go b/pkg/engine/response.go
index ae488ca1c5..bb2e9bb7f9 100644
--- a/pkg/engine/response.go
+++ b/pkg/engine/response.go
@@ -38,6 +38,11 @@ type ResourceSpec struct {
 	Name       string `json:"name"`
 }
 
+//GetKey returns the key
+func (rs ResourceSpec) GetKey() string {
+	return rs.Kind + "/" + rs.Namespace + "/" + rs.Name
+}
+
 //PolicyStats stores statistics for the single policy application
 type PolicyStats struct {
 	// time required to process the policy rules on a resource
diff --git a/pkg/event/controller.go b/pkg/event/controller.go
index 144a6f11d7..6a36346c3f 100644
--- a/pkg/event/controller.go
+++ b/pkg/event/controller.go
@@ -26,8 +26,14 @@ type Generator struct {
 	pLister kyvernolister.ClusterPolicyLister
 	// returns true if the cluster policy store has been synced at least once
 	pSynced  cache.InformerSynced
+  // queue to store event generation requests
 	queue    workqueue.RateLimitingInterface
-	recorder record.EventRecorder
+	// events generated at policy controller
+	policyCtrRecorder record.EventRecorder
+	// events generated at admission control
+	admissionCtrRecorder record.EventRecorder
+	// events generated at namespaced policy controller to process 'generate' rule
+	genPolicyRecorder record.EventRecorder
 }
 
 //Interface to generate event
@@ -39,17 +45,19 @@ type Interface interface {
 func NewEventGenerator(client *client.Client, pInformer kyvernoinformer.ClusterPolicyInformer) *Generator {
 
 	gen := Generator{
-		client:   client,
-		pLister:  pInformer.Lister(),
-		queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName),
-		pSynced:  pInformer.Informer().HasSynced,
-		recorder: initRecorder(client),
-	}
+		client:               client,
+		pLister:              pInformer.Lister(),
+		queue:                workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName),
+		pSynced:              pInformer.Informer().HasSynced,
+		policyCtrRecorder:    initRecorder(client, PolicyController),
+		admissionCtrRecorder: initRecorder(client, AdmissionController),
+		genPolicyRecorder:    initRecorder(client, GeneratePolicyController),
 
+	}
 	return &gen
 }
 
-func initRecorder(client *client.Client) record.EventRecorder {
+func initRecorder(client *client.Client, eventSource Source) record.EventRecorder {
 	// Initliaze Event Broadcaster
 	err := scheme.AddToScheme(scheme.Scheme)
 	if err != nil {
@@ -68,7 +76,7 @@ func initRecorder(client *client.Client) record.EventRecorder {
 			Interface: eventInterface})
 	recorder := eventBroadcaster.NewRecorder(
 		scheme.Scheme,
-		v1.EventSource{Component: eventSource})
+		v1.EventSource{Component: eventSource.String()})
 	return recorder
 }
 
@@ -113,7 +121,7 @@ func (gen *Generator) handleErr(err error, key interface{}) {
 	}
 	// This controller retries if something goes wrong. After that, it stops trying.
 	if gen.queue.NumRequeues(key) < workQueueRetryLimit {
-		glog.Warningf("Error syncing events %v: %v", key, err)
+		glog.Warningf("Error syncing events %v(re-queuing request, the resource might not have been created yet): %v", key, err)
 		// Re-enqueue the key rate limited. Based on the rate limiter on the
 		// queue and the re-enqueue history, the key will be processed later again.
 		gen.queue.AddRateLimited(key)
@@ -159,47 +167,45 @@ func (gen *Generator) syncHandler(key Info) error {
 		//TODO: policy is clustered resource so wont need namespace
 		robj, err = gen.pLister.Get(key.Name)
 		if err != nil {
-			glog.Errorf("Error creating event: unable to get policy %s, will retry ", key.Name)
+			glog.V(4).Infof("Error creating event: unable to get policy %s, will retry ", key.Name)
 			return err
 		}
 	default:
 		robj, err = gen.client.GetResource(key.Kind, key.Namespace, key.Name)
 		if err != nil {
-			glog.Errorf("Error creating event: unable to get resource %s, %s, will retry ", key.Kind, key.Namespace+"/"+key.Name)
+			glog.V(4).Infof("Error creating event: unable to get resource %s, %s, will retry ", key.Kind, key.Namespace+"/"+key.Name)
 			return err
 		}
 	}
 
+	// set the event type based on reason
+	eventType := v1.EventTypeWarning
 	if key.Reason == PolicyApplied.String() {
-		gen.recorder.Event(robj, v1.EventTypeNormal, key.Reason, key.Message)
-	} else {
-		gen.recorder.Event(robj, v1.EventTypeWarning, key.Reason, key.Message)
+		eventType = v1.EventTypeNormal
+	}
+
+	// based on the source of event generation, use different event recorders
+	switch key.Source {
+	case AdmissionController:
+		gen.admissionCtrRecorder.Event(robj, eventType, key.Reason, key.Message)
+	case PolicyController:
+		gen.policyCtrRecorder.Event(robj, eventType, key.Reason, key.Message)
+	case GeneratePolicyController:
+		gen.genPolicyRecorder.Event(robj, eventType, key.Reason, key.Message)
+	default:
+		glog.Info("info.source not defined for the event generator request")
 	}
 	return nil
 }
 
-//TODO: check if we need this ?
-//NewEvent returns a new event
-func NewEvent(rkind string, rnamespace string, rname string, reason Reason, message MsgKey, args ...interface{}) *Info {
-	msgText, err := getEventMsg(message, args...)
-	if err != nil {
-		glog.Error(err)
-	}
-	return &Info{
-		Kind:      rkind,
-		Name:      rname,
-		Namespace: rnamespace,
-		Reason:    reason.String(),
-		Message:   msgText,
-	}
-}
-
-func NewEventNew(
+//NewEvent builds a event creation request
+func NewEvent(
 	rkind,
 	rapiVersion,
 	rnamespace,
 	rname,
 	reason string,
+	source Source,
 	message MsgKey,
 	args ...interface{}) Info {
 	msgText, err := getEventMsg(message, args...)
@@ -211,6 +217,7 @@ func NewEventNew(
 		Name:      rname,
 		Namespace: rnamespace,
 		Reason:    reason,
+		Source:    source,
 		Message:   msgText,
 	}
 }
diff --git a/pkg/event/source.go b/pkg/event/source.go
new file mode 100644
index 0000000000..7ee1bac38b
--- /dev/null
+++ b/pkg/event/source.go
@@ -0,0 +1,21 @@
+package event
+
+//Source of event generation
+type Source int
+
+const (
+	// AdmissionController : event generated in admission-control webhook
+	AdmissionController Source = iota
+	// PolicyController : event generated in policy-controller
+	PolicyController
+	// GeneratePolicyController : event generated in generate policyController
+	GeneratePolicyController
+)
+
+func (s Source) String() string {
+	return [...]string{
+		"admission-controller",
+		"policy-controller",
+		"generate-policy-controller",
+	}[s]
+}
diff --git a/pkg/event/util.go b/pkg/event/util.go
index a2cab76e7c..677e79d986 100644
--- a/pkg/event/util.go
+++ b/pkg/event/util.go
@@ -1,8 +1,6 @@
 package event
 
-const eventSource = "policy-controller"
-
-const eventWorkQueueName = "policy-controller-events"
+const eventWorkQueueName = "kyverno-events"
 
 const eventWorkerThreadCount = 1
 
@@ -15,4 +13,5 @@ type Info struct {
 	Namespace string
 	Reason    string
 	Message   string
+	Source    Source
 }
diff --git a/pkg/namespace/report.go b/pkg/namespace/report.go
index 592d2e42cd..c416263944 100644
--- a/pkg/namespace/report.go
+++ b/pkg/namespace/report.go
@@ -89,6 +89,7 @@ func generateEventsPerEr(er engine.EngineResponse) []event.Info {
 		e.Namespace = "" // event generate on namespace resource
 		e.Name = er.PolicyResponse.Resource.Name
 		e.Reason = "Failure"
+		e.Source = event.GeneratePolicyController
 		e.Message = fmt.Sprintf("policy '%s' (%s) rule '%s' not satisfied. %v", er.PolicyResponse.Policy, rule.Type, rule.Name, rule.Message)
 		eventInfos = append(eventInfos, e)
 	}
@@ -102,6 +103,7 @@ func generateEventsPerEr(er engine.EngineResponse) []event.Info {
 	e.Namespace = ""
 	e.Name = er.PolicyResponse.Policy
 	e.Reason = "Failure"
+	e.Source = event.GeneratePolicyController
 	e.Message = fmt.Sprintf("policy '%s' rules '%v' on resource '%s/%s/%s' not stasified", er.PolicyResponse.Policy, er.GetFailedRules(), er.PolicyResponse.Resource.Kind, er.PolicyResponse.Resource.Namespace, er.PolicyResponse.Resource.Name)
 	return eventInfos
 }
diff --git a/pkg/policy/apply.go b/pkg/policy/apply.go
index 58794aacd9..5d0882f962 100644
--- a/pkg/policy/apply.go
+++ b/pkg/policy/apply.go
@@ -1,8 +1,10 @@
 package policy
 
 import (
+	"encoding/json"
 	"fmt"
 	"reflect"
+	"strings"
 	"time"
 
 	jsonpatch "github.com/evanphx/json-patch"
@@ -108,7 +110,6 @@ func getFailedOverallRuleInfo(resource unstructured.Unstructured, engineResponse
 		if len(rule.Patches) == 0 {
 			continue
 		}
-
 		patch, err := jsonpatch.DecodePatch(utils.JoinPatches(rule.Patches))
 		if err != nil {
 			glog.V(4).Infof("unable to decode patch %s: %v", rule.Patches, err)
@@ -121,12 +122,32 @@ func getFailedOverallRuleInfo(resource unstructured.Unstructured, engineResponse
 			glog.V(4).Infof("unable to apply patch %s: %v", rule.Patches, err)
 			return engine.EngineResponse{}, err
 		}
-
 		if !jsonpatch.Equal(patchedResource, rawResource) {
 			glog.V(4).Infof("policy %s rule %s condition not satisifed by existing resource", engineResponse.PolicyResponse.Policy, rule.Name)
 			engineResponse.PolicyResponse.Rules[index].Success = false
-			engineResponse.PolicyResponse.Rules[index].Message = fmt.Sprintf("rule not satisfied by existing resource.")
+			engineResponse.PolicyResponse.Rules[index].Message = fmt.Sprintf("mutation json patches not found at resource path %s", extractPatchPath(rule.Patches))
 		}
 	}
 	return engineResponse, nil
 }
+
+type jsonPatch struct {
+	Op    string      `json:"op"`
+	Path  string      `json:"path"`
+	Value interface{} `json:"value"`
+}
+
+func extractPatchPath(patches [][]byte) string {
+	var resultPath []string
+	// extract the patch path and value
+	for _, patch := range patches {
+		glog.V(4).Infof("expected json patch not found in resource: %s", string(patch))
+		var data jsonPatch
+		if err := json.Unmarshal(patch, &data); err != nil {
+			glog.V(4).Infof("Failed to decode the generated patch %v: Error %v", string(patch), err)
+			continue
+		}
+		resultPath = append(resultPath, data.Path)
+	}
+	return strings.Join(resultPath, ";")
+}
diff --git a/pkg/policy/report.go b/pkg/policy/report.go
index 3a562aac01..5ac4e22792 100644
--- a/pkg/policy/report.go
+++ b/pkg/policy/report.go
@@ -109,6 +109,7 @@ func generateEventsPerEr(er engine.EngineResponse) []event.Info {
 		e.Namespace = er.PolicyResponse.Resource.Namespace
 		e.Name = er.PolicyResponse.Resource.Name
 		e.Reason = event.PolicyViolation.String()
+		e.Source = event.PolicyController
 		e.Message = fmt.Sprintf("policy '%s' (%s) rule '%s' not satisfied. %v", er.PolicyResponse.Policy, rule.Type, rule.Name, rule.Message)
 		eventInfos = append(eventInfos, e)
 	}
@@ -123,6 +124,7 @@ func generateEventsPerEr(er engine.EngineResponse) []event.Info {
 	e.Namespace = ""
 	e.Name = er.PolicyResponse.Policy
 	e.Reason = event.PolicyViolation.String()
+	e.Source = event.PolicyController
 	e.Message = fmt.Sprintf("policy '%s' rules '%v' not satisfied on resource '%s/%s/%s'", er.PolicyResponse.Policy, er.GetFailedRules(), er.PolicyResponse.Resource.Kind, er.PolicyResponse.Resource.Namespace, er.PolicyResponse.Resource.Name)
 	eventInfos = append(eventInfos, e)
 	return eventInfos
diff --git a/pkg/webhooks/report.go b/pkg/webhooks/report.go
index febead68da..54ab9683a1 100644
--- a/pkg/webhooks/report.go
+++ b/pkg/webhooks/report.go
@@ -32,12 +32,13 @@ func generateEvents(engineResponses []engine.EngineResponse, onUpdate bool) []ev
 				var e event.Info
 				// UPDATE
 				// event on resource
-				e = event.NewEventNew(
+				e = event.NewEvent(
 					er.PolicyResponse.Resource.Kind,
 					er.PolicyResponse.Resource.APIVersion,
 					er.PolicyResponse.Resource.Namespace,
 					er.PolicyResponse.Resource.Name,
 					reason.String(),
+					event.AdmissionController,
 					event.FPolicyApplyBlockUpdate,
 					filedRulesStr,
 					er.PolicyResponse.Policy,
@@ -46,14 +47,15 @@ func generateEvents(engineResponses []engine.EngineResponse, onUpdate bool) []ev
 				events = append(events, e)
 
 				// event on policy
-				e = event.NewEventNew(
+				e = event.NewEvent(
 					"ClusterPolicy",
 					kyverno.SchemeGroupVersion.String(),
 					"",
 					er.PolicyResponse.Policy,
 					reason.String(),
+					event.AdmissionController,
 					event.FPolicyBlockResourceUpdate,
-					er.PolicyResponse.Resource.Namespace+"/"+er.PolicyResponse.Resource.Name,
+					er.PolicyResponse.Resource.GetKey(),
 					filedRulesStr,
 				)
 				glog.V(4).Infof("UPDATE event on policy %s", er.PolicyResponse.Policy)
@@ -62,14 +64,15 @@ func generateEvents(engineResponses []engine.EngineResponse, onUpdate bool) []ev
 			} else {
 				// CREATE
 				// event on policy
-				e := event.NewEventNew(
+				e := event.NewEvent(
 					"ClusterPolicy",
 					kyverno.SchemeGroupVersion.String(),
 					"",
 					er.PolicyResponse.Policy,
-					event.RequestBlocked.String(),
+					reason.String(),
+					event.AdmissionController,
 					event.FPolicyApplyBlockCreate,
-					er.PolicyResponse.Resource.Namespace+"/"+er.PolicyResponse.Resource.Name,
+					er.PolicyResponse.Resource.GetKey(),
 					filedRulesStr,
 				)
 				glog.V(4).Infof("CREATE event on policy %s", er.PolicyResponse.Policy)
@@ -85,12 +88,13 @@ func generateEvents(engineResponses []engine.EngineResponse, onUpdate bool) []ev
 			successRules := er.GetSuccessRules()
 			successRulesStr := strings.Join(successRules, ";")
 			// event on resource
-			e := event.NewEventNew(
+			e := event.NewEvent(
 				er.PolicyResponse.Resource.Kind,
 				er.PolicyResponse.Resource.APIVersion,
 				er.PolicyResponse.Resource.Namespace,
 				er.PolicyResponse.Resource.Name,
 				event.PolicyApplied.String(),
+				event.AdmissionController,
 				event.SRulesApply,
 				successRulesStr,
 				er.PolicyResponse.Policy,