mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-07 08:26:53 +00:00
223 lines
5.9 KiB
Go
Executable file
223 lines
5.9 KiB
Go
Executable file
package policyreport
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
policyreportv1alpha12 "github.com/nirmata/kyverno/pkg/api/policyreport/v1alpha1"
|
|
"github.com/nirmata/kyverno/pkg/constant"
|
|
corev1 "k8s.io/api/core/v1"
|
|
k8serror "k8s.io/apimachinery/pkg/api/errors"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/util/workqueue"
|
|
"reflect"
|
|
|
|
"github.com/go-logr/logr"
|
|
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
|
|
policyreportv1alpha1 "github.com/nirmata/kyverno/pkg/client/clientset/versioned/typed/policyreport/v1alpha1"
|
|
policyreportlister "github.com/nirmata/kyverno/pkg/client/listers/policyreport/v1alpha1"
|
|
client "github.com/nirmata/kyverno/pkg/dclient"
|
|
"github.com/nirmata/kyverno/pkg/policystatus"
|
|
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
)
|
|
|
|
const clusterWorkQueueName = "policy-report-cluster"
|
|
const clusterWorkQueueRetryLimit = 3
|
|
|
|
//clusterPR ...
|
|
type clusterPR struct {
|
|
// dynamic client
|
|
dclient *client.Client
|
|
// get/list cluster policy report
|
|
cprLister policyreportlister.ClusterPolicyReportLister
|
|
// policy violation interface
|
|
policyreportInterface policyreportv1alpha1.PolicyV1alpha1Interface
|
|
// logger
|
|
log logr.Logger
|
|
// update policy stats with violationCount
|
|
policyStatusListener policystatus.Listener
|
|
|
|
dataStore *dataStore
|
|
|
|
queue workqueue.RateLimitingInterface
|
|
}
|
|
|
|
func newClusterPR(log logr.Logger, dclient *client.Client,
|
|
cprLister policyreportlister.ClusterPolicyReportLister,
|
|
policyreportInterface policyreportv1alpha1.PolicyV1alpha1Interface,
|
|
policyStatus policystatus.Listener,
|
|
) *clusterPR {
|
|
cpv := clusterPR{
|
|
dclient: dclient,
|
|
cprLister: cprLister,
|
|
policyreportInterface: policyreportInterface,
|
|
log: log,
|
|
policyStatusListener: policyStatus,
|
|
|
|
dataStore: newDataStore(),
|
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), clusterWorkQueueName),
|
|
}
|
|
return &cpv
|
|
}
|
|
|
|
func (cpr *clusterPR) enqueue(info Info) {
|
|
// add to data map
|
|
keyHash := info.toKey()
|
|
// add to
|
|
// queue the key hash
|
|
cpr.dataStore.add(keyHash, info)
|
|
cpr.queue.Add(keyHash)
|
|
}
|
|
|
|
//Add queues a policy violation create request
|
|
func (cpr *clusterPR) Add(infos ...Info) {
|
|
for _, info := range infos {
|
|
cpr.enqueue(info)
|
|
}
|
|
}
|
|
|
|
// Run starts the workers
|
|
func (cpr *clusterPR) Run(workers int, stopCh <-chan struct{}) {
|
|
logger := cpr.log
|
|
defer utilruntime.HandleCrash()
|
|
logger.Info("start")
|
|
defer logger.Info("shutting down")
|
|
|
|
for i := 0; i < workers; i++ {
|
|
go wait.Until(cpr.runWorker, constant.PolicyViolationControllerResync, stopCh)
|
|
}
|
|
<-stopCh
|
|
}
|
|
|
|
func (cpr *clusterPR) runWorker() {
|
|
for cpr.processNextWorkItem() {
|
|
}
|
|
}
|
|
|
|
func (cpr *clusterPR) handleErr(err error, key interface{}) {
|
|
logger := cpr.log
|
|
if err == nil {
|
|
cpr.queue.Forget(key)
|
|
return
|
|
}
|
|
|
|
// retires requests if there is error
|
|
if cpr.queue.NumRequeues(key) < clusterWorkQueueRetryLimit {
|
|
logger.Error(err, "failed to sync policy violation", "key", key)
|
|
// Re-enqueue the key rate limited. Based on the rate limiter on the
|
|
// queue and the re-enqueue history, the key will be processed later again.
|
|
cpr.queue.AddRateLimited(key)
|
|
return
|
|
}
|
|
cpr.queue.Forget(key)
|
|
// remove from data store
|
|
if keyHash, ok := key.(string); ok {
|
|
cpr.dataStore.delete(keyHash)
|
|
}
|
|
logger.Error(err, "dropping key out of the queue", "key", key)
|
|
}
|
|
|
|
func (cpr *clusterPR) processNextWorkItem() bool {
|
|
logger := cpr.log
|
|
obj, shutdown := cpr.queue.Get()
|
|
if shutdown {
|
|
return false
|
|
}
|
|
|
|
err := func(obj interface{}) error {
|
|
defer cpr.queue.Done(obj)
|
|
var keyHash string
|
|
var ok bool
|
|
|
|
if keyHash, ok = obj.(string); !ok {
|
|
cpr.queue.Forget(obj)
|
|
logger.Info("incorrect type; expecting type 'string'", "obj", obj)
|
|
return nil
|
|
}
|
|
|
|
// lookup data store
|
|
info := cpr.dataStore.lookup(keyHash)
|
|
if reflect.DeepEqual(info, Info{}) {
|
|
// empty key
|
|
cpr.queue.Forget(obj)
|
|
logger.Info("empty key")
|
|
return nil
|
|
}
|
|
|
|
err := cpr.syncHandler(info)
|
|
cpr.handleErr(err, obj)
|
|
return nil
|
|
}(obj)
|
|
|
|
if err != nil {
|
|
logger.Error(err, "failed to process item")
|
|
return true
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (cpr *clusterPR) syncHandler(info Info) error {
|
|
logger := cpr.log
|
|
failure := false
|
|
builder := NewPrBuilder()
|
|
|
|
pv := builder.Generate(info)
|
|
|
|
if info.FromSync {
|
|
pv.Annotations = map[string]string{
|
|
"fromSync": "true",
|
|
}
|
|
}
|
|
|
|
// Create Policy Violations
|
|
logger.V(4).Info("creating policy violation", "key", info.toKey())
|
|
if err := cpr.create(pv, ""); err != nil {
|
|
failure = true
|
|
logger.Error(err, "failed to create policy violation")
|
|
}
|
|
|
|
if failure {
|
|
// even if there is a single failure we requeue the request
|
|
return errors.New("Failed to process some policy violations, re-queuing")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (cpr *clusterPR) create(pv kyverno.PolicyViolationTemplate, appName string) error {
|
|
reportName := fmt.Sprintf("kyverno-clusterpolicyreport-%s", pv.Spec.Policy)
|
|
clusterpr, err := cpr.policyreportInterface.ClusterPolicyReports().Get(reportName, v1.GetOptions{})
|
|
if err != nil {
|
|
if !k8serror.IsNotFound(err) {
|
|
return err
|
|
}
|
|
clusterpr = &policyreportv1alpha12.ClusterPolicyReport{
|
|
Scope: &corev1.ObjectReference{
|
|
Kind: "Cluster",
|
|
},
|
|
Summary: policyreportv1alpha12.PolicyReportSummary{},
|
|
Results: []*policyreportv1alpha12.PolicyReportResult{},
|
|
}
|
|
labelMap := map[string]string{
|
|
"policy-scope": "cluster",
|
|
}
|
|
clusterpr.SetLabels(labelMap)
|
|
clusterpr.ObjectMeta.Name = reportName
|
|
prObj := NewPolicyReport(nil, clusterpr, &pv, cpr.dclient)
|
|
clusterpr := prObj.CreateClusterPolicyViolationsToClusterPolicyReport()
|
|
|
|
_, err = cpr.policyreportInterface.ClusterPolicyReports().Create(clusterpr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
prObj := NewPolicyReport(nil, clusterpr, &pv, cpr.dclient)
|
|
clusterpr = prObj.CreateClusterPolicyViolationsToClusterPolicyReport()
|
|
|
|
_, err = cpr.policyreportInterface.ClusterPolicyReports().Update(clusterpr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|