kyverno/pkg/policyreport/clusterpr.go

package policyreport

import (
	"errors"
	"fmt"
	"reflect"

	"github.com/go-logr/logr"
	kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
	policyreportv1alpha12 "github.com/nirmata/kyverno/pkg/api/policyreport/v1alpha1"
	policyreportv1alpha1 "github.com/nirmata/kyverno/pkg/client/clientset/versioned/typed/policyreport/v1alpha1"
	policyreportlister "github.com/nirmata/kyverno/pkg/client/listers/policyreport/v1alpha1"
	"github.com/nirmata/kyverno/pkg/constant"
	client "github.com/nirmata/kyverno/pkg/dclient"
	"github.com/nirmata/kyverno/pkg/policystatus"
	corev1 "k8s.io/api/core/v1"
	k8serror "k8s.io/apimachinery/pkg/api/errors"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

const clusterWorkQueueName = "policy-report-cluster"
const clusterWorkQueueRetryLimit = 3

// clusterPR generates cluster-scoped policy reports from policy violation info
type clusterPR struct {
	// dynamic client
	dclient *client.Client
	// get/list cluster policy report
	cprLister policyreportlister.ClusterPolicyReportLister
	// policy report client interface
	policyreportInterface policyreportv1alpha1.PolicyV1alpha1Interface
	// logger
	log logr.Logger
	// update policy stats with violationCount
	policyStatusListener policystatus.Listener
	// in-memory store of violation info, keyed by hash
	dataStore *dataStore
	// rate-limited queue of key hashes
	queue workqueue.RateLimitingInterface
}

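// newClusterPR returns a clusterPR backed by an in-memory data store and a
// named rate-limited work queue. A minimal usage sketch (the caller-side
// names here are illustrative, not from this file):
//
//	cpr := newClusterPR(log, dclient, cprLister, prInterface, statusListener)
//	go cpr.Run(2, stopCh)
//	cpr.Add(infos...)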
func newClusterPR(log logr.Logger, dclient *client.Client,
	cprLister policyreportlister.ClusterPolicyReportLister,
	policyreportInterface policyreportv1alpha1.PolicyV1alpha1Interface,
	policyStatus policystatus.Listener,
) *clusterPR {
	cpv := clusterPR{
		dclient:               dclient,
		cprLister:             cprLister,
		policyreportInterface: policyreportInterface,
		log:                   log,
		policyStatusListener:  policyStatus,
		dataStore:             newDataStore(),
		queue:                 workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), clusterWorkQueueName),
	}
	return &cpv
}

func (cpr *clusterPR) enqueue(info Info) {
	// store the info in the data map, then queue its key hash
	keyHash := info.toKey()
	cpr.dataStore.add(keyHash, info)
	cpr.queue.Add(keyHash)
}

// Add queues one or more policy violation create requests
func (cpr *clusterPR) Add(infos ...Info) {
	for _, info := range infos {
		cpr.enqueue(info)
	}
}

// Run starts the workers
func (cpr *clusterPR) Run(workers int, stopCh <-chan struct{}) {
	logger := cpr.log
	defer utilruntime.HandleCrash()
	logger.Info("start")
	defer logger.Info("shutting down")
	for i := 0; i < workers; i++ {
		go wait.Until(cpr.runWorker, constant.PolicyViolationControllerResync, stopCh)
	}
	<-stopCh
}

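// runWorker processes queue items until the queue is shut down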
func (cpr *clusterPR) runWorker() {
	for cpr.processNextWorkItem() {
	}
}

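// handleErr requeues a failed key until the retry limit is reached, then
// drops the key and its data store entry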
func (cpr *clusterPR) handleErr(err error, key interface{}) {
	logger := cpr.log
	if err == nil {
		cpr.queue.Forget(key)
		return
	}
	// retry the request if the retry limit has not been reached
	if cpr.queue.NumRequeues(key) < clusterWorkQueueRetryLimit {
		logger.Error(err, "failed to sync policy violation", "key", key)
		// Re-enqueue the key rate limited. Based on the rate limiter on the
		// queue and the re-enqueue history, the key will be processed later again.
		cpr.queue.AddRateLimited(key)
		return
	}
	cpr.queue.Forget(key)
	// remove from data store
	if keyHash, ok := key.(string); ok {
		cpr.dataStore.delete(keyHash)
	}
	logger.Error(err, "dropping key out of the queue", "key", key)
}

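// processNextWorkItem takes one key hash off the queue, resolves it in the
// data store, and passes the info to syncHandler; it returns false only when
// the queue is shutting down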
func (cpr *clusterPR) processNextWorkItem() bool {
	logger := cpr.log
	obj, shutdown := cpr.queue.Get()
	if shutdown {
		return false
	}
	err := func(obj interface{}) error {
		defer cpr.queue.Done(obj)
		var keyHash string
		var ok bool
		if keyHash, ok = obj.(string); !ok {
			cpr.queue.Forget(obj)
			logger.Info("incorrect type; expecting type 'string'", "obj", obj)
			return nil
		}
		// lookup data store
		info := cpr.dataStore.lookup(keyHash)
		if reflect.DeepEqual(info, Info{}) {
			// empty key
			cpr.queue.Forget(obj)
			logger.Info("empty key")
			return nil
		}
		err := cpr.syncHandler(info)
		cpr.handleErr(err, obj)
		return nil
	}(obj)
	if err != nil {
		logger.Error(err, "failed to process item")
		return true
	}
	return true
}

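// syncHandler builds a policy violation from the info and records it in the
// cluster policy report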
func (cpr *clusterPR) syncHandler(info Info) error {
	logger := cpr.log
	failure := false
	builder := newPrBuilder()
	pv := builder.generate(info)
	if info.FromSync {
		pv.Annotations = map[string]string{
			"fromSync": "true",
		}
	}
	// create policy violations
	logger.V(4).Info("creating policy violation", "key", info.toKey())
	if err := cpr.create(pv, ""); err != nil {
		failure = true
		logger.Error(err, "failed to create policy violation")
	}
	if failure {
		// even if there is a single failure we requeue the request
		return errors.New("failed to process some policy violations, re-queuing")
	}
	return nil
}

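// create fetches the per-policy cluster policy report, creating it if it does
// not exist, and folds the violation into it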
func (cpr *clusterPR) create(pv kyverno.PolicyViolationTemplate, appName string) error {
	reportName := fmt.Sprintf("kyverno-clusterpolicyreport-%s", pv.Spec.Policy)
	clusterpr, err := cpr.policyreportInterface.ClusterPolicyReports().Get(reportName, v1.GetOptions{})
	if err != nil {
		if !k8serror.IsNotFound(err) {
			return err
		}
		// the report does not exist yet: build a fresh cluster-scoped report,
		// fold the violation into it, and create it
		clusterpr = &policyreportv1alpha12.ClusterPolicyReport{
			Scope: &corev1.ObjectReference{
				Kind: "Cluster",
			},
			Summary: policyreportv1alpha12.PolicyReportSummary{},
			Results: []*policyreportv1alpha12.PolicyReportResult{},
		}
		labelMap := map[string]string{
			"policy-scope": "cluster",
		}
		clusterpr.SetLabels(labelMap)
		clusterpr.ObjectMeta.Name = reportName
		prObj := NewPolicyReport(nil, clusterpr, &pv, cpr.dclient)
		clusterpr = prObj.CreateClusterPolicyViolationsToClusterPolicyReport()
		_, err = cpr.policyreportInterface.ClusterPolicyReports().Create(clusterpr)
		if err != nil {
			return err
		}
		return nil
	}
	// the report exists: merge the new violation and update it in place
	prObj := NewPolicyReport(nil, clusterpr, &pv, cpr.dclient)
	clusterpr = prObj.CreateClusterPolicyViolationsToClusterPolicyReport()
	_, err = cpr.policyreportInterface.ClusterPolicyReports().Update(clusterpr)
	if err != nil {
		return err
	}
	return nil
}