1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-09 09:26:54 +00:00
kyverno/pkg/policyreport/reportrequest.go

324 lines
9.7 KiB
Go
Raw Normal View History

2020-08-26 18:50:38 +05:30
package policyreport
import (
2020-10-23 18:27:55 -07:00
"fmt"
2020-08-26 18:50:38 +05:30
"reflect"
"strconv"
"strings"
"sync"
"github.com/go-logr/logr"
kyverno "github.com/kyverno/kyverno/pkg/api/kyverno/v1"
2020-10-23 18:27:55 -07:00
report "github.com/kyverno/kyverno/pkg/api/policyreport/v1alpha1"
policyreportclient "github.com/kyverno/kyverno/pkg/client/clientset/versioned"
reportchangerequest "github.com/kyverno/kyverno/pkg/client/clientset/versioned/typed/policyreport/v1alpha1"
policyreportinformer "github.com/kyverno/kyverno/pkg/client/informers/externalversions/policyreport/v1alpha1"
2020-10-22 19:57:28 -07:00
policyreport "github.com/kyverno/kyverno/pkg/client/listers/policyreport/v1alpha1"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/constant"
2020-10-23 18:27:55 -07:00
client "github.com/kyverno/kyverno/pkg/dclient"
dclient "github.com/kyverno/kyverno/pkg/dclient"
"github.com/kyverno/kyverno/pkg/policystatus"
2020-10-23 18:27:55 -07:00
apierrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2020-08-26 18:50:38 +05:30
unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2020-10-23 18:27:55 -07:00
"k8s.io/apimachinery/pkg/runtime"
2020-08-26 18:50:38 +05:30
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
const workQueueName = "report-request-controller"
2020-08-26 18:50:38 +05:30
const workQueueRetryLimit = 3
2020-10-22 19:57:28 -07:00
// Generator creates report request
2020-08-26 18:50:38 +05:30
type Generator struct {
dclient *dclient.Client
reportChangeRequestInterface reportchangerequest.PolicyV1alpha1Interface
2020-08-26 18:50:38 +05:30
reportChangeRequestLister policyreport.ReportChangeRequestLister
clusterReportChangeRequestLister policyreport.ClusterReportChangeRequestLister
2020-10-22 19:57:28 -07:00
// returns true if the cluster report request store has been synced at least once
reportReqSynced cache.InformerSynced
// returns true if the namespaced report request store has been synced at at least once
clusterReportReqSynced cache.InformerSynced
queue workqueue.RateLimitingInterface
dataStore *dataStore
2020-10-23 18:27:55 -07:00
// update policy status with violationCount
policyStatusListener policystatus.Listener
2020-10-22 19:57:28 -07:00
log logr.Logger
}
// NewReportChangeRequestGenerator returns a new instance of report request generator
func NewReportChangeRequestGenerator(client *policyreportclient.Clientset,
2020-10-22 19:57:28 -07:00
dclient *dclient.Client,
reportReqInformer policyreportinformer.ReportChangeRequestInformer,
clusterReportReqInformer policyreportinformer.ClusterReportChangeRequestInformer,
2020-10-22 19:57:28 -07:00
policyStatus policystatus.Listener,
log logr.Logger) *Generator {
gen := Generator{
reportChangeRequestInterface: client.PolicyV1alpha1(),
dclient: dclient,
clusterReportChangeRequestLister: clusterReportReqInformer.Lister(),
clusterReportReqSynced: clusterReportReqInformer.Informer().HasSynced,
reportChangeRequestLister: reportReqInformer.Lister(),
reportReqSynced: reportReqInformer.Informer().HasSynced,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
dataStore: newDataStore(),
policyStatusListener: policyStatus,
log: log,
2020-10-22 19:57:28 -07:00
}
return &gen
2020-08-26 18:50:38 +05:30
}
// NewDataStore returns an instance of data store
2020-08-26 18:50:38 +05:30
func newDataStore() *dataStore {
ds := dataStore{
data: make(map[string]Info),
}
return &ds
}
type dataStore struct {
data map[string]Info
mu sync.RWMutex
}
func (ds *dataStore) add(keyHash string, info Info) {
ds.mu.Lock()
defer ds.mu.Unlock()
// queue the key hash
ds.data[keyHash] = info
}
func (ds *dataStore) lookup(keyHash string) Info {
ds.mu.RLock()
defer ds.mu.RUnlock()
return ds.data[keyHash]
}
func (ds *dataStore) delete(keyHash string) {
ds.mu.Lock()
defer ds.mu.Unlock()
delete(ds.data, keyHash)
}
//Info is a request to create PV
type Info struct {
PolicyName string
Resource unstructured.Unstructured
Rules []kyverno.ViolatedRule
FromSync bool
}
func (i Info) toKey() string {
keys := []string{
i.PolicyName,
i.Resource.GetKind(),
i.Resource.GetNamespace(),
i.Resource.GetName(),
strconv.Itoa(len(i.Rules)),
}
return strings.Join(keys, "/")
}
// GeneratorInterface provides API to create PVs
2020-09-15 06:59:05 -07:00
type GeneratorInterface interface {
Add(infos ...Info)
}
2020-08-26 18:50:38 +05:30
func (gen *Generator) enqueue(info Info) {
keyHash := info.toKey()
gen.dataStore.add(keyHash, info)
gen.queue.Add(keyHash)
}
// Add queues a policy violation create request
2020-08-26 18:50:38 +05:30
func (gen *Generator) Add(infos ...Info) {
for _, info := range infos {
gen.enqueue(info)
}
}
// Run starts the workers
func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
logger := gen.log
defer utilruntime.HandleCrash()
logger.Info("start")
defer logger.Info("shutting down")
2020-10-22 19:57:28 -07:00
if !cache.WaitForCacheSync(stopCh, gen.reportReqSynced, gen.clusterReportReqSynced) {
2020-08-26 18:50:38 +05:30
logger.Info("failed to sync informer cache")
}
2020-09-15 06:59:05 -07:00
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, constant.PolicyViolationControllerResync, stopCh)
}
2020-09-10 05:10:29 -07:00
<-stopCh
2020-08-26 18:50:38 +05:30
}
func (gen *Generator) runWorker() {
for gen.processNextWorkItem() {
}
}
func (gen *Generator) handleErr(err error, key interface{}) {
logger := gen.log
if err == nil {
gen.queue.Forget(key)
return
}
// retires requests if there is error
if gen.queue.NumRequeues(key) < workQueueRetryLimit {
2020-10-22 19:57:28 -07:00
logger.Error(err, "failed to sync report request", "key", key)
2020-08-26 18:50:38 +05:30
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
gen.queue.AddRateLimited(key)
return
}
gen.queue.Forget(key)
// remove from data store
if keyHash, ok := key.(string); ok {
gen.dataStore.delete(keyHash)
}
logger.Error(err, "dropping key out of the queue", "key", key)
}
func (gen *Generator) processNextWorkItem() bool {
logger := gen.log
obj, shutdown := gen.queue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer gen.queue.Done(obj)
var keyHash string
var ok bool
if keyHash, ok = obj.(string); !ok {
gen.queue.Forget(obj)
logger.Info("incorrect type; expecting type 'string'", "obj", obj)
return nil
}
// lookup data store
info := gen.dataStore.lookup(keyHash)
if reflect.DeepEqual(info, Info{}) {
gen.queue.Forget(obj)
2020-10-23 18:27:55 -07:00
logger.V(3).Info("empty key")
2020-08-26 18:50:38 +05:30
return nil
}
err := gen.syncHandler(info)
gen.handleErr(err, obj)
return nil
}(obj)
if err != nil {
logger.Error(err, "failed to process item")
}
return true
}
func (gen *Generator) syncHandler(info Info) error {
gen.log.V(3).Info("generating report change request")
reportChangeRequestUnstructured, err := NewBuilder().build(info)
2020-10-23 18:27:55 -07:00
if err != nil {
return fmt.Errorf("unable to build reportChangeRequest: %v", err)
2020-10-23 18:27:55 -07:00
}
return gen.sync(reportChangeRequestUnstructured, info)
2020-10-23 18:27:55 -07:00
}
func (gen *Generator) sync(reportReq *unstructured.Unstructured, info Info) error {
defer func() {
if val := reportReq.GetAnnotations()["fromSync"]; val == "true" {
gen.policyStatusListener.Send(violationCount{
policyName: info.PolicyName,
violatedRules: info.Rules,
})
}
}()
logger := gen.log.WithName("sync")
reportReq.SetCreationTimestamp(v1.Now())
2020-10-23 18:27:55 -07:00
if reportReq.GetNamespace() == "" {
old, err := gen.clusterReportChangeRequestLister.Get(reportReq.GetName())
2020-10-23 18:27:55 -07:00
if err != nil {
if apierrors.IsNotFound(err) {
if _, err = gen.dclient.CreateResource(reportReq.GetAPIVersion(), reportReq.GetKind(), "", reportReq, false); err != nil {
return fmt.Errorf("failed to create clusterReportChangeRequest: %v", err)
}
logger.V(3).Info("successfully created clusterReportChangeRequest", "name", reportReq.GetName())
return nil
2020-10-23 18:27:55 -07:00
}
return fmt.Errorf("unable to get %s: %v", reportReq.GetKind(), err)
2020-10-23 18:27:55 -07:00
}
return updateReportChangeRequest(gen.dclient, old, reportReq, logger)
2020-10-23 18:27:55 -07:00
}
old, err := gen.reportChangeRequestLister.ReportChangeRequests(config.KubePolicyNamespace).Get(reportReq.GetName())
2020-10-23 18:27:55 -07:00
if err != nil {
if apierrors.IsNotFound(err) {
if _, err = gen.dclient.CreateResource(reportReq.GetAPIVersion(), reportReq.GetKind(), config.KubePolicyNamespace, reportReq, false); err != nil {
return fmt.Errorf("failed to create %s: %v", reportReq.GetKind(), err)
}
logger.V(3).Info("successfully created reportChangeRequest", "name", reportReq.GetName())
return nil
2020-10-23 18:27:55 -07:00
}
return fmt.Errorf("unable to get existing reportChangeRequest %v", err)
2020-10-23 18:27:55 -07:00
}
return updateReportChangeRequest(gen.dclient, old, reportReq, logger)
2020-10-23 18:27:55 -07:00
}
func updateReportChangeRequest(dClient *client.Client, old interface{}, new *unstructured.Unstructured, log logr.Logger) (err error) {
2020-10-23 18:27:55 -07:00
oldUnstructed := make(map[string]interface{})
if oldTyped, ok := old.(*report.ReportChangeRequest); ok {
2020-10-23 18:27:55 -07:00
if oldUnstructed, err = runtime.DefaultUnstructuredConverter.ToUnstructured(oldTyped); err != nil {
return fmt.Errorf("unable to convert reportChangeRequest: %v", err)
2020-10-23 18:27:55 -07:00
}
new.SetResourceVersion(oldTyped.GetResourceVersion())
new.SetUID(oldTyped.GetUID())
2020-10-23 18:27:55 -07:00
} else {
oldTyped := old.(*report.ClusterReportChangeRequest)
2020-10-23 18:27:55 -07:00
if oldUnstructed, err = runtime.DefaultUnstructuredConverter.ToUnstructured(oldTyped); err != nil {
return fmt.Errorf("unable to convert clusterReportChangeRequest: %v", err)
2020-10-23 18:27:55 -07:00
}
new.SetUID(oldTyped.GetUID())
2020-10-23 18:27:55 -07:00
new.SetResourceVersion(oldTyped.GetResourceVersion())
}
if !hasResultsChanged(oldUnstructed, new.UnstructuredContent()) {
log.V(4).Info("unchanged report request", "name", new.GetName())
2020-10-23 18:27:55 -07:00
return nil
}
// TODO(shuting): set annotation / label
if _, err = dClient.UpdateResource(new.GetAPIVersion(), new.GetKind(), config.KubePolicyNamespace, new, false); err != nil {
return fmt.Errorf("failed to update report request: %v", err)
}
log.V(4).Info("successfully updated report request", "kind", new.GetKind(), "name", new.GetName())
return
2020-10-23 18:27:55 -07:00
}
func hasResultsChanged(old, new map[string]interface{}) bool {
2020-10-27 20:46:41 -07:00
oldRes := old["results"]
newRes := new["results"]
2020-10-23 18:27:55 -07:00
return !reflect.DeepEqual(oldRes, newRes)
2020-08-26 18:50:38 +05:30
}