1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-06 16:06:56 +00:00
kyverno/pkg/policyreport/reportcontroller.go
shuting 104cd310e8
Cleanup Report Change Requests (#2134)
* clean up RCRs if retry fails

Signed-off-by: Shuting Zhao <shutting06@gmail.com>

* cleanup report change request when background scan starts

Signed-off-by: Shuting Zhao <shutting06@gmail.com>

* add verb deletecollection to ClusterRole kyverno:customresources

Signed-off-by: Shuting Zhao <shutting06@gmail.com>
2021-07-14 09:57:16 -07:00

659 lines
21 KiB
Go

package policyreport
import (
"fmt"
"reflect"
"strings"
"time"
"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
labels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
changerequest "github.com/kyverno/kyverno/pkg/api/kyverno/v1alpha1"
report "github.com/kyverno/kyverno/pkg/api/policyreport/v1alpha1"
kyvernoclient "github.com/kyverno/kyverno/pkg/client/clientset/versioned"
requestinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha1"
policyreportinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions/policyreport/v1alpha1"
requestlister "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1alpha1"
policyreport "github.com/kyverno/kyverno/pkg/client/listers/policyreport/v1alpha1"
"github.com/kyverno/kyverno/pkg/config"
dclient "github.com/kyverno/kyverno/pkg/dclient"
)
// Names used by the policy report controller.
const (
// prWorkQueueName is the name given to the controller's rate-limited work queue
// when it is created in NewReportGenerator.
prWorkQueueName = "policy-report-controller"
// clusterpolicyreport is not referenced in the visible part of this file;
// presumably it names the cluster-wide report elsewhere in the package — TODO confirm.
clusterpolicyreport = "clusterpolicyreport"
)
// ReportGenerator aggregates ReportChangeRequests and ClusterReportChangeRequests
// into PolicyReports and ClusterPolicyReports. Work items are queued by key (see
// generateCacheKey) and processed by the workers started in Run.
type ReportGenerator struct {
// pclient is the Kyverno typed clientset; it is stored but not used in the
// visible part of this file.
pclient *kyvernoclient.Clientset
// dclient is the dynamic client used to create, update and delete reports and
// change requests.
dclient *dclient.Client
// Informers are kept so that Run can register event handlers on them.
clusterReportInformer policyreportinformer.ClusterPolicyReportInformer
reportInformer policyreportinformer.PolicyReportInformer
reportReqInformer requestinformer.ReportChangeRequestInformer
clusterReportReqInformer requestinformer.ClusterReportChangeRequestInformer
// Listers and their matching HasSynced functions, all derived from the
// informers above in NewReportGenerator.
reportLister policyreport.PolicyReportLister
reportSynced cache.InformerSynced
clusterReportLister policyreport.ClusterPolicyReportLister
clusterReportSynced cache.InformerSynced
reportChangeRequestLister requestlister.ReportChangeRequestLister
reportReqSynced cache.InformerSynced
clusterReportChangeRequestLister requestlister.ClusterReportChangeRequestLister
clusterReportReqSynced cache.InformerSynced
// nsLister is used to check that a namespace exists and is not being deleted.
nsLister listerv1.NamespaceLister
nsListerSynced cache.InformerSynced
// queue holds the string keys produced by generateCacheKey.
queue workqueue.RateLimitingInterface
// ReconcileCh sends a signal to the policy controller to force reconciliation of
// the policy reports. Sending true erases the reports' results, which is used to
// recover from invalid records. The handlers in this file only ever send false.
ReconcileCh chan bool
log logr.Logger
}
// NewReportGenerator builds a ReportGenerator from the given clients and
// informers. Listers and HasSynced functions are taken from the informers, the
// work queue is created with the default controller rate limiter, and
// ReconcileCh is buffered with capacity 10.
//
// kubeClient is accepted but not used here. The returned error is always nil.
func NewReportGenerator(
	kubeClient kubernetes.Interface,
	pclient *kyvernoclient.Clientset,
	dclient *dclient.Client,
	clusterReportInformer policyreportinformer.ClusterPolicyReportInformer,
	reportInformer policyreportinformer.PolicyReportInformer,
	reportReqInformer requestinformer.ReportChangeRequestInformer,
	clusterReportReqInformer requestinformer.ClusterReportChangeRequestInformer,
	namespace informers.NamespaceInformer,
	log logr.Logger) (*ReportGenerator, error) {

	generator := &ReportGenerator{
		pclient: pclient,
		dclient: dclient,

		clusterReportInformer:    clusterReportInformer,
		reportInformer:           reportInformer,
		reportReqInformer:        reportReqInformer,
		clusterReportReqInformer: clusterReportReqInformer,

		queue:       workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), prWorkQueueName),
		ReconcileCh: make(chan bool, 10),
		log:         log,

		clusterReportLister: clusterReportInformer.Lister(),
		clusterReportSynced: clusterReportInformer.Informer().HasSynced,

		reportLister: reportInformer.Lister(),
		reportSynced: reportInformer.Informer().HasSynced,

		clusterReportChangeRequestLister: clusterReportReqInformer.Lister(),
		clusterReportReqSynced:           clusterReportReqInformer.Informer().HasSynced,

		reportChangeRequestLister: reportReqInformer.Lister(),
		reportReqSynced:           reportReqInformer.Informer().HasSynced,

		nsLister:       namespace.Lister(),
		nsListerSynced: namespace.Informer().HasSynced,
	}

	return generator, nil
}
// deletedPolicyKey is the first segment of the work-queue key that
// generateCacheKey builds for requests labelled with a deleted policy or rule.
const deletedPolicyKey string = "deletedpolicy"
// generateCacheKey maps a change request to the work-queue key it is processed
// under:
//   - "deletedpolicy/<policy>/<rule>" when the request carries a non-empty
//     deleted-policy or deleted-rule label (either segment may be empty)
//   - for a ReportChangeRequest, the value of its resource-namespace label, or
//     "default" when that label is empty
//   - "" for a ClusterReportChangeRequest, and for any other type
func generateCacheKey(changeRequest interface{}) string {
	var (
		requestLabels map[string]string
		namespaced    bool
	)

	switch typed := changeRequest.(type) {
	case *changerequest.ReportChangeRequest:
		requestLabels, namespaced = typed.GetLabels(), true
	case *changerequest.ClusterReportChangeRequest:
		requestLabels = typed.GetLabels()
	default:
		return ""
	}

	policyName, ruleName := requestLabels[deletedLabelPolicy], requestLabels[deletedLabelRule]
	if policyName != "" || ruleName != "" {
		return strings.Join([]string{deletedPolicyKey, policyName, ruleName}, "/")
	}

	if !namespaced {
		return ""
	}
	if resourceNamespace := requestLabels[resourceLabelNamespace]; resourceNamespace != "" {
		return resourceNamespace
	}
	return "default"
}
// addReportChangeRequest is the add handler registered on the
// ReportChangeRequest informer; it enqueues the key derived from the new object.
func (g *ReportGenerator) addReportChangeRequest(obj interface{}) {
	g.queue.Add(generateCacheKey(obj))
}
// updateReportChangeRequest is the update handler registered on the
// ReportChangeRequest informer. The key of the current object is enqueued only
// when the Results of the old and current objects differ.
func (g *ReportGenerator) updateReportChangeRequest(old interface{}, cur interface{}) {
	previous, current := old.(*changerequest.ReportChangeRequest), cur.(*changerequest.ReportChangeRequest)

	if !reflect.DeepEqual(previous.Results, current.Results) {
		g.queue.Add(generateCacheKey(cur))
	}
}
// addClusterReportChangeRequest is the add handler registered on the
// ClusterReportChangeRequest informer; it enqueues the key derived from the new
// object.
func (g *ReportGenerator) addClusterReportChangeRequest(obj interface{}) {
	g.queue.Add(generateCacheKey(obj))
}
// updateClusterReportChangeRequest is the update handler registered on the
// ClusterReportChangeRequest informer. When the Results of the old and current
// objects differ it enqueues the cluster-wide key "" (it does not consult the
// object's labels).
func (g *ReportGenerator) updateClusterReportChangeRequest(old interface{}, cur interface{}) {
	previous, current := old.(*changerequest.ClusterReportChangeRequest), cur.(*changerequest.ClusterReportChangeRequest)

	if !reflect.DeepEqual(previous.Results, current.Results) {
		g.queue.Add("")
	}
}
// deletePolicyReport is the delete handler registered on the PolicyReport
// informer. It logs the deletion and sends false on ReconcileCh to request a
// reconciliation.
//
// A delete notification may carry a cache.DeletedFinalStateUnknown tombstone
// instead of the object itself, so the payload is unwrapped and type-checked
// rather than asserted directly (a bare assertion would panic on a tombstone).
// The reconcile signal is sent even when the deleted object cannot be
// identified, because a report was removed either way.
func (g *ReportGenerator) deletePolicyReport(obj interface{}) {
	if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
		obj = tombstone.Obj
	}

	if polr, ok := obj.(*report.PolicyReport); ok {
		g.log.V(2).Info("PolicyReport deleted", "name", polr.GetName())
	} else {
		g.log.Info("PolicyReport deleted, unexpected object in delete event", "type", fmt.Sprintf("%T", obj))
	}

	g.ReconcileCh <- false
}
// deleteClusterPolicyReport is the delete handler registered on the
// ClusterPolicyReport informer. It logs the event and sends false on
// ReconcileCh; the deleted object itself is not inspected.
func (g *ReportGenerator) deleteClusterPolicyReport(obj interface{}) {
	verbose := g.log.V(2)
	verbose.Info("ClusterPolicyReport deleted")

	g.ReconcileCh <- false
}
// Run waits for the informer caches to sync, registers the event handlers that
// feed the work queue and ReconcileCh, starts the requested number of workers
// and then blocks until stopCh is closed. The queue is shut down on return.
//
// workers is the number of goroutines processing the queue; stopCh stops the
// cache wait, the workers and Run itself.
func (g *ReportGenerator) Run(workers int, stopCh <-chan struct{}) {
	logger := g.log

	defer utilruntime.HandleCrash()
	defer g.queue.ShutDown()

	logger.Info("start")
	defer logger.Info("shutting down")

	if !cache.WaitForCacheSync(stopCh, g.reportReqSynced, g.clusterReportReqSynced, g.reportSynced, g.clusterReportSynced, g.nsListerSynced) {
		logger.Info("failed to sync informer cache")
		// Without synced caches the listers are unreliable, so do not register
		// handlers or start workers.
		return
	}

	g.reportReqInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    g.addReportChangeRequest,
			UpdateFunc: g.updateReportChangeRequest,
		})

	g.clusterReportReqInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    g.addClusterReportChangeRequest,
			UpdateFunc: g.updateClusterReportChangeRequest,
		})

	g.reportInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			DeleteFunc: g.deletePolicyReport,
		})

	g.clusterReportInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			DeleteFunc: g.deleteClusterPolicyReport,
		})

	for i := 0; i < workers; i++ {
		go wait.Until(g.runWorker, time.Second, stopCh)
	}

	<-stopCh
}
// runWorker processes queue items one after another until
// processNextWorkItem reports that the queue has shut down.
func (g *ReportGenerator) runWorker() {
	for {
		if !g.processNextWorkItem() {
			return
		}
	}
}
// processNextWorkItem takes one item off the queue and syncs it. It returns
// false only when the queue has been shut down; a non-string item is forgotten
// and logged, and the worker carries on.
func (g *ReportGenerator) processNextWorkItem() bool {
	item, quit := g.queue.Get()
	if quit {
		return false
	}
	// Always mark the item as done so the queue can hand it out again later.
	defer g.queue.Done(item)

	if keyStr, isString := item.(string); isString {
		aggregatedRequests, err := g.syncHandler(keyStr)
		g.handleErr(err, item, aggregatedRequests)
	} else {
		g.queue.Forget(item)
		g.log.Info("incorrect type; expecting type 'string'", "obj", item)
	}

	return true
}
// handleErr decides what happens to a key after a sync attempt:
//   - no error: the key's retry history is forgotten
//   - error and fewer than workQueueRetryLimit requeues: the key is re-added
//     with rate limiting
//   - error and retries exhausted: the failure is logged, the key is forgotten
//     and any aggregated change requests are cleaned up
func (g *ReportGenerator) handleErr(err error, key interface{}, aggregatedRequests interface{}) {
	switch {
	case err == nil:
		g.queue.Forget(key)

	case g.queue.NumRequeues(key) < workQueueRetryLimit:
		// retries the request while there is retry budget left
		g.log.V(3).Info("retrying policy report", "key", key, "error", err.Error())
		g.queue.AddRateLimited(key)

	default:
		g.log.Error(err, "failed to process policy report", "key", key)
		g.queue.Forget(key)
		if aggregatedRequests != nil {
			g.cleanupReportRequests(aggregatedRequests)
		}
	}
}
// syncHandler processes one queue key. A deleted-policy key is delegated to
// removePolicyEntryFromReport. Any other key is treated as a namespace: ""
// reconciles the ClusterPolicyReport, anything else the PolicyReport of that
// namespace.
//
// On failure the change requests aggregated so far are returned with the error
// so the caller can clean them up once retries are exhausted. On success the
// requests are cleaned up here and (nil, nil) is returned.
func (g *ReportGenerator) syncHandler(key string) (aggregatedRequests interface{}, err error) {
	g.log.V(4).Info("syncing policy report", "key", key)

	if policy, rule, ok := isDeletedPolicyKey(key); ok {
		return g.removePolicyEntryFromReport(policy, rule)
	}

	var desired *unstructured.Unstructured
	desired, aggregatedRequests, err = g.aggregateReports(key)
	if err != nil {
		return aggregatedRequests, fmt.Errorf("failed to aggregate reportChangeRequest results %v", err)
	}

	existing, err := g.createReportIfNotPresent(key, desired, aggregatedRequests)
	if err != nil {
		return aggregatedRequests, err
	}

	// A nil existing report means there is nothing to update.
	if existing != nil {
		if err := g.updateReport(existing, desired, aggregatedRequests); err != nil {
			return aggregatedRequests, err
		}
	}

	g.cleanupReportRequests(aggregatedRequests)
	return nil, nil
}
// createReportIfNotPresent returns the existing PolicyReport (namespace != "")
// or ClusterPolicyReport (namespace == "") if the lister has one. Otherwise it
// creates the report from new, cleans up aggregatedRequests and returns
// (nil, nil).
//
// (nil, nil) is also returned when the namespace does not exist or is being
// deleted, and when the report is missing and new is nil. Any other lookup or
// creation failure is returned as an error.
func (g *ReportGenerator) createReportIfNotPresent(namespace string, new *unstructured.Unstructured, aggregatedRequests interface{}) (report interface{}, err error) {
	log := g.log.WithName("createReportIfNotPresent")

	if new != nil {
		// updateResults is run with the report as both old and new input to remove
		// duplicate results. On any error the report is left untouched rather
		// than being overwritten with the failed call's result.
		obj, _, dedupErr := updateResults(new.UnstructuredContent(), new.UnstructuredContent(), nil)
		if dedupErr != nil {
			g.log.Error(dedupErr, "failed to remove duplicate results", "policy report", new.GetName())
		} else {
			new.Object = obj
		}
	}

	if namespace == "" {
		existing, err := g.clusterReportLister.Get(generatePolicyReportName(namespace))
		if err == nil {
			return existing, nil
		}
		if !apierrors.IsNotFound(err) {
			return nil, fmt.Errorf("unable to get ClusterPolicyReport: %v", err)
		}
		if new == nil {
			return nil, nil
		}

		if _, err := g.dclient.CreateResource(new.GetAPIVersion(), new.GetKind(), new.GetNamespace(), new, false); err != nil {
			return nil, fmt.Errorf("failed to create ClusterPolicyReport: %v", err)
		}

		log.V(2).Info("successfully created ClusterPolicyReport")
		g.cleanupReportRequests(aggregatedRequests)
		return nil, nil
	}

	ns, err := g.nsLister.Get(namespace)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil, nil
		}
		return nil, fmt.Errorf("failed to fetch namespace: %v", err)
	}

	// Do not create or hand back reports for a namespace that is terminating.
	if ns.GetDeletionTimestamp() != nil {
		return nil, nil
	}

	existing, err := g.reportLister.PolicyReports(namespace).Get(generatePolicyReportName(namespace))
	if err == nil {
		return existing, nil
	}
	if !apierrors.IsNotFound(err) {
		return nil, fmt.Errorf("unable to get policyReport: %v", err)
	}
	if new == nil {
		return nil, nil
	}

	if _, err := g.dclient.CreateResource(new.GetAPIVersion(), new.GetKind(), new.GetNamespace(), new, false); err != nil {
		return nil, fmt.Errorf("failed to create policyReport: %v", err)
	}

	log.V(2).Info("successfully created policyReport", "namespace", new.GetNamespace(), "name", new.GetName())
	g.cleanupReportRequests(aggregatedRequests)
	return nil, nil
}
// removePolicyEntryFromReport strips the results of a deleted policy (or of
// one rule of it, when ruleName is non-empty) from all ClusterPolicyReports and
// PolicyReports. It then deletes the ReportChangeRequests in the Kyverno
// namespace that carry the matching deleted-policy / deleted-rule labels.
//
// It returns (nil, nil) on success. If listing the change requests fails, the
// list result is returned together with the error.
func (g *ReportGenerator) removePolicyEntryFromReport(policyName, ruleName string) (aggregatedRequests interface{}, err error) {
	if err = g.removeFromClusterPolicyReport(policyName, ruleName); err != nil {
		return nil, err
	}
	if err = g.removeFromPolicyReport(policyName, ruleName); err != nil {
		return nil, err
	}

	selectorLabels := labels.Set{deletedLabelPolicy: policyName}
	if ruleName != "" {
		selectorLabels[deletedLabelRule] = ruleName
	}

	requests, err := g.reportChangeRequestLister.
		ReportChangeRequests(config.KyvernoNamespace).
		List(labels.SelectorFromSet(selectorLabels))
	if err != nil {
		return requests, err
	}

	g.cleanupReportRequests(requests)
	return nil, nil
}
// removeFromClusterPolicyReport drops, from every ClusterPolicyReport known to
// the lister, the results produced by policyName. When ruleName is non-empty
// only the results of that rule are dropped. The summary is recalculated and
// each report is written back through the dynamic client.
//
// It returns an error if the reports cannot be listed or an update fails; the
// first failing update stops the loop.
func (g *ReportGenerator) removeFromClusterPolicyReport(policyName, ruleName string) error {
	cpolrs, err := g.clusterReportLister.List(labels.Everything())
	if err != nil {
		return fmt.Errorf("failed to list clusterPolicyReport %v", err)
	}

	gv := report.SchemeGroupVersion
	gvk := schema.GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: "ClusterPolicyReport"}

	for _, cached := range cpolrs {
		// Lister results are shared with the informer cache and must be treated
		// as read-only, so edit a private copy.
		cpolr := cached.DeepCopy()

		newRes := []*report.PolicyReportResult{}
		for _, result := range cpolr.Results {
			if result.Policy == policyName && (ruleName == "" || result.Rule == ruleName) {
				continue
			}
			newRes = append(newRes, result)
		}

		cpolr.Results = newRes
		cpolr.Summary = calculateSummary(newRes)
		cpolr.SetGroupVersionKind(gvk)

		if _, err := g.dclient.UpdateResource("", "ClusterPolicyReport", "", cpolr, false); err != nil {
			return fmt.Errorf("failed to update clusterPolicyReport %s %v", cpolr.Name, err)
		}
	}

	return nil
}
// removeFromPolicyReport drops, from the PolicyReports of every namespace, the
// results produced by policyName. When ruleName is non-empty only the results
// of that rule are dropped. Namespaces are listed through the dynamic client
// and the reports through the lister. The summary is recalculated and each
// report is written back through the dynamic client.
//
// It returns an error if namespaces or reports cannot be listed or an update
// fails; the first failing update stops the loop.
func (g *ReportGenerator) removeFromPolicyReport(policyName, ruleName string) error {
	namespaces, err := g.dclient.ListResource("", "Namespace", "", nil)
	if err != nil {
		return fmt.Errorf("unable to list namespace %v", err)
	}

	policyReports := []*report.PolicyReport{}
	for _, ns := range namespaces.Items {
		reports, err := g.reportLister.PolicyReports(ns.GetName()).List(labels.Everything())
		if err != nil {
			return fmt.Errorf("unable to list policyReport for namespace %s %v", ns.GetName(), err)
		}
		policyReports = append(policyReports, reports...)
	}

	gv := report.SchemeGroupVersion
	gvk := schema.GroupVersionKind{Group: gv.Group, Version: gv.Version, Kind: "PolicyReport"}

	for _, cached := range policyReports {
		// Lister results are shared with the informer cache and must be treated
		// as read-only, so edit a private copy.
		r := cached.DeepCopy()

		newRes := []*report.PolicyReportResult{}
		for _, result := range r.Results {
			if result.Policy == policyName && (ruleName == "" || result.Rule == ruleName) {
				continue
			}
			newRes = append(newRes, result)
		}

		r.Results = newRes
		r.Summary = calculateSummary(newRes)
		r.SetGroupVersionKind(gvk)

		if _, err := g.dclient.UpdateResource("", "PolicyReport", r.GetNamespace(), r, false); err != nil {
			return fmt.Errorf("failed to update PolicyReport %s %v", r.GetName(), err)
		}
	}

	return nil
}
// aggregateReports merges the pending change requests into a single report
// object. With namespace == "" every ClusterReportChangeRequest is merged into
// a ClusterPolicyReport. Otherwise the ReportChangeRequests in the Kyverno
// namespace labelled with that resource namespace are merged into a
// PolicyReport.
//
// It returns the merged report, the change requests that went into it (for
// later cleanup) and an error if listing or merging failed.
func (g *ReportGenerator) aggregateReports(namespace string) (
	report *unstructured.Unstructured, aggregatedRequests interface{}, err error) {

	if namespace == "" {
		requests, err := g.clusterReportChangeRequestLister.List(labels.Everything())
		if err != nil {
			return nil, nil, fmt.Errorf("unable to list ClusterReportChangeRequests: %v", err)
		}

		if report, aggregatedRequests, err = mergeRequests(nil, requests); err != nil {
			return nil, nil, fmt.Errorf("unable to merge ClusterReportChangeRequests results: %v", err)
		}
		return report, aggregatedRequests, nil
	}

	ns, err := g.nsLister.Get(namespace)
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return nil, nil, fmt.Errorf("unable to get namespace %s: %v", namespace, err)
		}

		// Namespace is deleted, create a fake ns to clean up RCRs
		ns = new(v1.Namespace)
		ns.SetName(namespace)
		now := metav1.Now()
		ns.SetDeletionTimestamp(&now)
	}

	selector := labels.SelectorFromSet(labels.Set(map[string]string{resourceLabelNamespace: namespace}))
	requests, err := g.reportChangeRequestLister.ReportChangeRequests(config.KyvernoNamespace).List(selector)
	if err != nil {
		// Report the namespace name; formatting the Namespace object itself with
		// %s dumps the whole struct into the message.
		return nil, nil, fmt.Errorf("unable to list reportChangeRequests within namespace %s: %v", namespace, err)
	}

	if report, aggregatedRequests, err = mergeRequests(ns, requests); err != nil {
		return nil, nil, fmt.Errorf("unable to merge results: %v", err)
	}

	return report, aggregatedRequests, nil
}
// mergeRequests folds the results of a slice of change requests into one
// unstructured report and returns it together with the requests that were
// merged. Requests that already carry a deletion timestamp are skipped.
//
// requestsGeneral must be a []*ClusterReportChangeRequest (producing a
// ClusterPolicyReport; ns is ignored) or a []*ReportChangeRequest (producing a
// PolicyReport for ns). Any other type yields (nil, nil, nil). If the
// conversion to unstructured fails, the merged requests are returned with the
// error.
func mergeRequests(ns *v1.Namespace, requestsGeneral interface{}) (*unstructured.Unstructured, interface{}, error) {
	merged := []*report.PolicyReportResult{}

	switch requests := requestsGeneral.(type) {
	case []*changerequest.ClusterReportChangeRequest:
		live := []*changerequest.ClusterReportChangeRequest{}
		for _, req := range requests {
			if req.GetDeletionTimestamp() == nil {
				merged = append(merged, req.Results...)
				live = append(live, req)
			}
		}

		content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&report.ClusterPolicyReport{
			Results: merged,
			Summary: calculateSummary(merged),
		})
		if err != nil {
			return nil, live, err
		}

		out := &unstructured.Unstructured{Object: content}
		setReport(out, nil)
		return out, live, nil

	case []*changerequest.ReportChangeRequest:
		live := []*changerequest.ReportChangeRequest{}
		for _, req := range requests {
			if req.GetDeletionTimestamp() == nil {
				merged = append(merged, req.Results...)
				live = append(live, req)
			}
		}

		content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&report.PolicyReport{
			Results: merged,
			Summary: calculateSummary(merged),
		})
		if err != nil {
			return nil, live, err
		}

		out := &unstructured.Unstructured{Object: content}
		setReport(out, ns)
		return out, live, nil

	default:
		return nil, nil, nil
	}
}
// setReport fills in the identifying metadata of a freshly merged report. With
// a nil ns the object becomes the ClusterPolicyReport. Otherwise it becomes the
// PolicyReport of that namespace, with the namespace set as its controlling
// owner (BlockOwnerDeletion enabled).
func setReport(reportUnstructured *unstructured.Unstructured, ns *v1.Namespace) {
	reportUnstructured.SetAPIVersion(report.SchemeGroupVersion.String())

	if ns != nil {
		nsName := ns.GetName()
		isController, blockDeletion := true, true

		reportUnstructured.SetName(generatePolicyReportName(nsName))
		reportUnstructured.SetNamespace(nsName)
		reportUnstructured.SetKind("PolicyReport")
		reportUnstructured.SetOwnerReferences([]metav1.OwnerReference{{
			APIVersion:         "v1",
			Kind:               "Namespace",
			Name:               nsName,
			UID:                ns.GetUID(),
			Controller:         &isController,
			BlockOwnerDeletion: &blockDeletion,
		}})
		return
	}

	reportUnstructured.SetName(generatePolicyReportName(""))
	reportUnstructured.SetKind("ClusterPolicyReport")
}
// updateReport reconciles an existing report (old, a *ClusterPolicyReport or
// *PolicyReport) with the freshly aggregated one (new).
//
// A nil new report is a no-op. An existing report that is already marked for
// deletion is deleted instead of updated. Otherwise new inherits the UID and
// resource version of old, its results are merged via updateResults, and the
// report is written back only when hasResultsChanged reports a difference.
func (g *ReportGenerator) updateReport(old interface{}, new *unstructured.Unstructured, aggregatedRequests interface{}) (err error) {
	if new == nil {
		g.log.V(4).Info("empty report to update")
		return nil
	}

	current := make(map[string]interface{})

	switch existing := old.(type) {
	case *report.ClusterPolicyReport:
		if existing.GetDeletionTimestamp() != nil {
			return g.dclient.DeleteResource(existing.APIVersion, "ClusterPolicyReport", existing.Namespace, existing.Name, false)
		}

		if current, err = runtime.DefaultUnstructuredConverter.ToUnstructured(existing); err != nil {
			return fmt.Errorf("unable to convert clusterPolicyReport: %v", err)
		}
		new.SetUID(existing.GetUID())
		new.SetResourceVersion(existing.GetResourceVersion())

	case *report.PolicyReport:
		if existing.GetDeletionTimestamp() != nil {
			return g.dclient.DeleteResource(existing.APIVersion, "PolicyReport", existing.Namespace, existing.Name, false)
		}

		if current, err = runtime.DefaultUnstructuredConverter.ToUnstructured(existing); err != nil {
			return fmt.Errorf("unable to convert policyReport: %v", err)
		}
		new.SetUID(existing.GetUID())
		new.SetResourceVersion(existing.GetResourceVersion())
	}

	merged, _, err := updateResults(current, new.UnstructuredContent(), aggregatedRequests)
	if err != nil {
		return fmt.Errorf("failed to update results entry: %v", err)
	}
	new.Object = merged

	if !hasResultsChanged(current, new.UnstructuredContent()) {
		g.log.V(4).Info("unchanged policy report", "kind", new.GetKind(), "namespace", new.GetNamespace(), "name", new.GetName())
		return nil
	}

	if _, err = g.dclient.UpdateResource(new.GetAPIVersion(), new.GetKind(), new.GetNamespace(), new, false); err != nil {
		return fmt.Errorf("failed to update policy report: %v", err)
	}

	g.log.V(3).Info("successfully updated policy report", "kind", new.GetKind(), "namespace", new.GetNamespace(), "name", new.GetName())
	return nil
}
// cleanupReportRequests deletes every change request in requestsGeneral, which
// may be a []*ReportChangeRequest (deleted from the Kyverno namespace) or a
// []*ClusterReportChangeRequest. Other types are ignored. Deletion is
// best-effort: not-found errors are skipped silently, other errors are logged
// and the loop continues.
func (g *ReportGenerator) cleanupReportRequests(requestsGeneral interface{}) {
	defer g.log.V(5).Info("successfully cleaned up report requests")

	switch requests := requestsGeneral.(type) {
	case []*changerequest.ReportChangeRequest:
		for _, rcr := range requests {
			err := g.dclient.DeleteResource(rcr.APIVersion, "ReportChangeRequest", config.KyvernoNamespace, rcr.Name, false)
			if err != nil && !apierrors.IsNotFound(err) {
				g.log.Error(err, "failed to delete report request")
			}
		}

	case []*changerequest.ClusterReportChangeRequest:
		for _, crcr := range requests {
			err := g.dclient.DeleteResource(crcr.APIVersion, "ClusterReportChangeRequest", "", crcr.Name, false)
			if err != nil && !apierrors.IsNotFound(err) {
				g.log.Error(err, "failed to delete clusterReportChangeRequest")
			}
		}
	}
}