mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-06 16:06:56 +00:00
- fix split policyreport name with background scan - fix the label selector initialising - refactor the generatePolicyName func Signed-off-by: prateekpandey14 <prateek.pandey@nirmata.com>
344 lines
9.1 KiB
Go
344 lines
9.1 KiB
Go
package policyreport
|
|
|
|
import (
|
|
"fmt"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
kyvernov1 "github.com/kyverno/kyverno/api/kyverno/v1"
|
|
kyvernoclient "github.com/kyverno/kyverno/pkg/client/clientset/versioned"
|
|
kyvernov1informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1"
|
|
kyvernov1alpha2informers "github.com/kyverno/kyverno/pkg/client/informers/externalversions/kyverno/v1alpha2"
|
|
kyvernov1listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1"
|
|
kyvernov1alpha2listers "github.com/kyverno/kyverno/pkg/client/listers/kyverno/v1alpha2"
|
|
"github.com/kyverno/kyverno/pkg/dclient"
|
|
"github.com/kyverno/kyverno/pkg/engine/response"
|
|
cmap "github.com/orcaman/concurrent-map"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/util/workqueue"
|
|
)
|
|
|
|
const (
|
|
workQueueName = "report-request-controller"
|
|
workQueueRetryLimit = 10
|
|
)
|
|
|
|
// Generator creates report request
|
|
type Generator struct {
|
|
dclient dclient.Interface
|
|
|
|
reportChangeRequestLister kyvernov1alpha2listers.ReportChangeRequestLister
|
|
|
|
clusterReportChangeRequestLister kyvernov1alpha2listers.ClusterReportChangeRequestLister
|
|
|
|
// changeRequestMapper stores the change requests' count per namespace
|
|
changeRequestMapper concurrentMap
|
|
|
|
// cpolLister can list/get policy from the shared informer's store
|
|
cpolLister kyvernov1listers.ClusterPolicyLister
|
|
|
|
// polLister can list/get namespace policy from the shared informer's store
|
|
polLister kyvernov1listers.PolicyLister
|
|
|
|
informersSynced []cache.InformerSynced
|
|
|
|
queue workqueue.RateLimitingInterface
|
|
dataStore *dataStore
|
|
|
|
requestCreator creator
|
|
|
|
// changeRequestLimit defines the max count for change requests (per namespace for RCR / cluster-wide for CRCR)
|
|
changeRequestLimit int
|
|
|
|
// CleanupChangeRequest signals the policy report controller to cleanup change requests
|
|
CleanupChangeRequest chan ReconcileInfo
|
|
|
|
log logr.Logger
|
|
}
|
|
|
|
// NewReportChangeRequestGenerator returns a new instance of report request generator
|
|
func NewReportChangeRequestGenerator(client kyvernoclient.Interface,
|
|
dclient dclient.Interface,
|
|
reportReqInformer kyvernov1alpha2informers.ReportChangeRequestInformer,
|
|
clusterReportReqInformer kyvernov1alpha2informers.ClusterReportChangeRequestInformer,
|
|
cpolInformer kyvernov1informers.ClusterPolicyInformer,
|
|
polInformer kyvernov1informers.PolicyInformer,
|
|
changeRequestLimit int,
|
|
log logr.Logger,
|
|
) *Generator {
|
|
gen := Generator{
|
|
dclient: dclient,
|
|
clusterReportChangeRequestLister: clusterReportReqInformer.Lister(),
|
|
reportChangeRequestLister: reportReqInformer.Lister(),
|
|
changeRequestMapper: newChangeRequestMapper(),
|
|
cpolLister: cpolInformer.Lister(),
|
|
polLister: polInformer.Lister(),
|
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
|
|
dataStore: newDataStore(),
|
|
changeRequestLimit: changeRequestLimit,
|
|
CleanupChangeRequest: make(chan ReconcileInfo, 10),
|
|
requestCreator: newChangeRequestCreator(client, 3*time.Second, log.WithName("requestCreator")),
|
|
log: log,
|
|
}
|
|
|
|
gen.informersSynced = []cache.InformerSynced{clusterReportReqInformer.Informer().HasSynced, reportReqInformer.Informer().HasSynced, cpolInformer.Informer().HasSynced, polInformer.Informer().HasSynced}
|
|
return &gen
|
|
}
|
|
|
|
type ReconcileInfo struct {
|
|
Namespace *string
|
|
MapperInactive bool
|
|
}
|
|
|
|
// NewDataStore returns an instance of data store
|
|
func newDataStore() *dataStore {
|
|
ds := dataStore{
|
|
data: make(map[string]Info),
|
|
}
|
|
return &ds
|
|
}
|
|
|
|
type dataStore struct {
|
|
data map[string]Info
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func (ds *dataStore) add(keyHash string, info Info) {
|
|
ds.mu.Lock()
|
|
defer ds.mu.Unlock()
|
|
// queue the key hash
|
|
ds.data[keyHash] = info
|
|
}
|
|
|
|
func (ds *dataStore) lookup(keyHash string) Info {
|
|
ds.mu.RLock()
|
|
defer ds.mu.RUnlock()
|
|
return ds.data[keyHash]
|
|
}
|
|
|
|
func (ds *dataStore) delete(keyHash string) {
|
|
ds.mu.Lock()
|
|
defer ds.mu.Unlock()
|
|
delete(ds.data, keyHash)
|
|
}
|
|
|
|
// Info stores the policy application results for all matched resources
|
|
// Namespace is set to empty "" if resource is cluster wide resource
|
|
type Info struct {
|
|
PolicyName string
|
|
Namespace string
|
|
Results []EngineResponseResult
|
|
}
|
|
|
|
type EngineResponseResult struct {
|
|
Resource response.ResourceSpec
|
|
Rules []kyvernov1.ViolatedRule
|
|
}
|
|
|
|
func (i Info) ToKey() string {
|
|
keys := []string{
|
|
i.PolicyName,
|
|
i.Namespace,
|
|
strconv.Itoa(len(i.Results)),
|
|
}
|
|
|
|
for _, result := range i.Results {
|
|
keys = append(keys, result.Resource.GetKey())
|
|
}
|
|
return strings.Join(keys, "/")
|
|
}
|
|
|
|
func (i Info) GetRuleLength() int {
|
|
l := 0
|
|
for _, res := range i.Results {
|
|
l += len(res.Rules)
|
|
}
|
|
return l
|
|
}
|
|
|
|
func parseKeyHash(keyHash string) (policyName, ns string) {
|
|
keys := strings.Split(keyHash, "/")
|
|
return keys[0], keys[1]
|
|
}
|
|
|
|
// GeneratorInterface provides API to create PVs
|
|
type GeneratorInterface interface {
|
|
Add(infos ...Info)
|
|
MapperReset(string)
|
|
MapperInactive(string)
|
|
MapperInvalidate()
|
|
}
|
|
|
|
func (gen *Generator) enqueue(info Info) {
|
|
keyHash := info.ToKey()
|
|
gen.dataStore.add(keyHash, info)
|
|
gen.queue.Add(keyHash)
|
|
}
|
|
|
|
// Add queues a policy violation create request
|
|
func (gen *Generator) Add(infos ...Info) {
|
|
for _, info := range infos {
|
|
count, ok := gen.changeRequestMapper.ConcurrentMap.Get(info.Namespace)
|
|
if ok && count == -1 {
|
|
gen.log.V(6).Info("inactive policy report, skip creating report change request", "namespace", info.Namespace)
|
|
continue
|
|
}
|
|
|
|
gen.changeRequestMapper.increase(info.Namespace)
|
|
gen.enqueue(info)
|
|
}
|
|
}
|
|
|
|
// MapperReset resets the change request mapper for the given namespace
|
|
func (gen Generator) MapperReset(ns string) {
|
|
gen.changeRequestMapper.ConcurrentMap.Set(ns, 0)
|
|
}
|
|
|
|
// MapperInactive sets the change request mapper for the given namespace to -1
|
|
// which indicates the report is inactive
|
|
func (gen Generator) MapperInactive(ns string) {
|
|
gen.changeRequestMapper.ConcurrentMap.Set(ns, -1)
|
|
}
|
|
|
|
// MapperInvalidate reset map entries
|
|
func (gen Generator) MapperInvalidate() {
|
|
for ns := range gen.changeRequestMapper.ConcurrentMap.Items() {
|
|
gen.changeRequestMapper.ConcurrentMap.Remove(ns)
|
|
}
|
|
}
|
|
|
|
// Run starts the workers
|
|
func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
|
|
logger := gen.log
|
|
defer utilruntime.HandleCrash()
|
|
logger.Info("start")
|
|
defer logger.Info("shutting down")
|
|
|
|
if !cache.WaitForNamedCacheSync("requestCreator", stopCh, gen.informersSynced...) {
|
|
return
|
|
}
|
|
|
|
for i := 0; i < workers; i++ {
|
|
go wait.Until(gen.runWorker, time.Second, stopCh)
|
|
}
|
|
|
|
go gen.requestCreator.run(stopCh)
|
|
|
|
<-stopCh
|
|
}
|
|
|
|
func (gen *Generator) runWorker() {
|
|
for gen.processNextWorkItem() {
|
|
}
|
|
}
|
|
|
|
func (gen *Generator) handleErr(err error, key interface{}) {
|
|
logger := gen.log
|
|
keyHash, ok := key.(string)
|
|
if !ok {
|
|
keyHash = ""
|
|
}
|
|
|
|
if err == nil {
|
|
gen.queue.Forget(key)
|
|
gen.dataStore.delete(keyHash)
|
|
gen.changeRequestMapper.decrease(keyHash)
|
|
return
|
|
}
|
|
|
|
// retires requests if there is error
|
|
if gen.queue.NumRequeues(key) < workQueueRetryLimit {
|
|
logger.V(3).Info("retrying report request", "key", key, "error", err)
|
|
gen.queue.AddRateLimited(key)
|
|
return
|
|
}
|
|
|
|
logger.Error(err, "failed to process report request", "key", key)
|
|
gen.queue.Forget(key)
|
|
gen.dataStore.delete(keyHash)
|
|
gen.changeRequestMapper.decrease(keyHash)
|
|
}
|
|
|
|
func (gen *Generator) processNextWorkItem() bool {
|
|
logger := gen.log
|
|
obj, shutdown := gen.queue.Get()
|
|
if shutdown {
|
|
return false
|
|
}
|
|
|
|
defer gen.queue.Done(obj)
|
|
var keyHash string
|
|
var ok bool
|
|
|
|
if keyHash, ok = obj.(string); !ok {
|
|
logger.Info("incorrect type; expecting type 'string'", "obj", obj)
|
|
gen.handleErr(nil, obj)
|
|
return true
|
|
}
|
|
|
|
// lookup data store
|
|
info := gen.dataStore.lookup(keyHash)
|
|
if reflect.DeepEqual(info, Info{}) {
|
|
logger.V(4).Info("empty key")
|
|
gen.handleErr(nil, obj)
|
|
return true
|
|
}
|
|
|
|
count, ok := gen.changeRequestMapper.Get(info.Namespace)
|
|
if ok {
|
|
if count.(int) > gen.changeRequestLimit {
|
|
logger.Info("throttling report change requests", "namespace", info.Namespace, "threshold", gen.changeRequestLimit, "count", count.(int))
|
|
gen.CleanupChangeRequest <- ReconcileInfo{Namespace: &(info.Namespace), MapperInactive: false}
|
|
gen.queue.Forget(obj)
|
|
gen.dataStore.delete(keyHash)
|
|
return true
|
|
}
|
|
}
|
|
|
|
err := gen.syncHandler(info)
|
|
gen.handleErr(err, obj)
|
|
|
|
return true
|
|
}
|
|
|
|
func (gen *Generator) syncHandler(info Info) error {
|
|
builder := NewBuilder(gen.cpolLister, gen.polLister)
|
|
reportReq, err := builder.build(info)
|
|
if err != nil {
|
|
return fmt.Errorf("unable to build reportChangeRequest: %v", err)
|
|
}
|
|
|
|
if reportReq == nil {
|
|
return nil
|
|
}
|
|
|
|
gen.requestCreator.add(reportReq)
|
|
return nil
|
|
}
|
|
|
|
func hasResultsChanged(old, new map[string]interface{}) bool {
|
|
var oldRes, newRes []interface{}
|
|
if val, ok := old["results"]; ok {
|
|
oldRes = val.([]interface{})
|
|
}
|
|
|
|
if val, ok := new["results"]; ok {
|
|
newRes = val.([]interface{})
|
|
}
|
|
|
|
if len(oldRes) != len(newRes) {
|
|
return true
|
|
}
|
|
|
|
return !reflect.DeepEqual(oldRes, newRes)
|
|
}
|
|
|
|
func newChangeRequestMapper() concurrentMap {
|
|
return concurrentMap{cmap.New()}
|
|
}
|