mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-06 16:06:56 +00:00
* skip sending API request for filtered resource * fix PR comment Signed-off-by: Shuting Zhao <shutting06@gmail.com> * fixes https://github.com/kyverno/kyverno/issues/1490 Signed-off-by: Shuting Zhao <shutting06@gmail.com> * fix bug - namespace is not returned properly Signed-off-by: Shuting Zhao <shutting06@gmail.com> * reduce throttling - list resource using lister * refactor resource cache * fix test Signed-off-by: Shuting Zhao <shutting06@gmail.com> * fix label selector Signed-off-by: Shuting Zhao <shutting06@gmail.com> * fix build failure Signed-off-by: Shuting Zhao <shutting06@gmail.com> * fixes #1480 * store resource name and kind in (c)rcr's annotation
249 lines
6.5 KiB
Go
249 lines
6.5 KiB
Go
package policyreport
|
|
|
|
import (
|
|
"crypto/rand"
|
|
"math/big"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-logr/logr"
|
|
"github.com/kyverno/kyverno/pkg/config"
|
|
dclient "github.com/kyverno/kyverno/pkg/dclient"
|
|
cache "github.com/patrickmn/go-cache"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
)
|
|
|
|
// creator is an interface that buffers report change requests
|
|
// merges and creates requests every tickerInterval
|
|
type creator interface {
|
|
add(request *unstructured.Unstructured)
|
|
create(request *unstructured.Unstructured) error
|
|
run(stopChan <-chan struct{})
|
|
}
|
|
|
|
type changeRequestCreator struct {
|
|
dclient *dclient.Client
|
|
|
|
// addCache preserves requests that are to be added to report
|
|
RCRCache *cache.Cache
|
|
|
|
CRCRCache *cache.Cache
|
|
// removeCache preserves requests that are to be removed from report
|
|
// removeCache *cache.Cache
|
|
mutex sync.RWMutex
|
|
queue []string
|
|
|
|
tickerInterval time.Duration
|
|
|
|
log logr.Logger
|
|
}
|
|
|
|
func newChangeRequestCreator(client *dclient.Client, tickerInterval time.Duration, log logr.Logger) creator {
|
|
return &changeRequestCreator{
|
|
dclient: client,
|
|
RCRCache: cache.New(0, 24*time.Hour),
|
|
CRCRCache: cache.New(0, 24*time.Hour),
|
|
queue: []string{},
|
|
tickerInterval: tickerInterval,
|
|
log: log,
|
|
}
|
|
}
|
|
|
|
func (c *changeRequestCreator) add(request *unstructured.Unstructured) {
|
|
uid, _ := rand.Int(rand.Reader, big.NewInt(100000))
|
|
|
|
switch request.GetKind() {
|
|
case "ClusterReportChangeRequest":
|
|
c.CRCRCache.Add(uid.String(), request, cache.NoExpiration)
|
|
case "ReportChangeRequest":
|
|
c.RCRCache.Add(uid.String(), request, cache.NoExpiration)
|
|
default:
|
|
return
|
|
}
|
|
|
|
c.mutex.Lock()
|
|
c.queue = append(c.queue, uid.String())
|
|
c.mutex.Unlock()
|
|
}
|
|
|
|
func (c *changeRequestCreator) create(request *unstructured.Unstructured) error {
|
|
ns := ""
|
|
if request.GetKind() == "ReportChangeRequest" {
|
|
ns = config.KyvernoNamespace
|
|
}
|
|
_, err := c.dclient.CreateResource(request.GetAPIVersion(), request.GetKind(), ns, request, false)
|
|
return err
|
|
}
|
|
|
|
func (c *changeRequestCreator) run(stopChan <-chan struct{}) {
|
|
ticker := time.NewTicker(c.tickerInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
requests, size := c.mergeRequests()
|
|
for _, request := range requests {
|
|
if err := c.create(request); err != nil {
|
|
c.log.Error(err, "failed to create report change request", "req", request.Object)
|
|
}
|
|
}
|
|
|
|
c.cleanupQueue(size)
|
|
case <-stopChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *changeRequestCreator) cleanupQueue(size int) {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
|
|
for i := 0; i < size; i++ {
|
|
uid := c.queue[i]
|
|
c.CRCRCache.Delete(uid)
|
|
c.RCRCache.Delete(uid)
|
|
}
|
|
|
|
c.queue = c.queue[size:]
|
|
}
|
|
|
|
// mergeRequests merges all current cached requests
|
|
// it blocks writing to the cache
|
|
func (c *changeRequestCreator) mergeRequests() (results []*unstructured.Unstructured, size int) {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
|
|
mergedCRCR := &unstructured.Unstructured{}
|
|
mergedRCR := make(map[string]*unstructured.Unstructured, 0)
|
|
size = len(c.queue)
|
|
|
|
for _, uid := range c.queue {
|
|
if unstr, ok := c.CRCRCache.Get(uid); ok {
|
|
if crcr, ok := unstr.(*unstructured.Unstructured); ok {
|
|
if isDeleteRequest(crcr) {
|
|
if !reflect.DeepEqual(mergedCRCR, &unstructured.Unstructured{}) {
|
|
results = append(results, mergedCRCR)
|
|
mergedCRCR = &unstructured.Unstructured{}
|
|
}
|
|
|
|
results = append(results, crcr)
|
|
} else {
|
|
if reflect.DeepEqual(mergedCRCR, &unstructured.Unstructured{}) {
|
|
mergedCRCR = crcr
|
|
continue
|
|
}
|
|
|
|
if ok := merge(mergedCRCR, crcr); !ok {
|
|
results = append(results, mergedCRCR)
|
|
mergedCRCR = crcr
|
|
}
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
if unstr, ok := c.RCRCache.Get(uid); ok {
|
|
if rcr, ok := unstr.(*unstructured.Unstructured); ok {
|
|
resourceNS := rcr.GetLabels()[resourceLabelNamespace]
|
|
mergedNamespacedRCR, ok := mergedRCR[resourceNS]
|
|
if !ok {
|
|
mergedNamespacedRCR = &unstructured.Unstructured{}
|
|
}
|
|
|
|
if isDeleteRequest(rcr) {
|
|
if !reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
|
results = append(results, mergedNamespacedRCR)
|
|
mergedRCR[resourceNS] = &unstructured.Unstructured{}
|
|
}
|
|
|
|
results = append(results, rcr)
|
|
} else {
|
|
if reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
|
mergedRCR[resourceNS] = rcr
|
|
continue
|
|
}
|
|
|
|
if ok := merge(mergedNamespacedRCR, rcr); !ok {
|
|
results = append(results, mergedNamespacedRCR)
|
|
mergedRCR[resourceNS] = rcr
|
|
} else {
|
|
mergedRCR[resourceNS] = mergedNamespacedRCR
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if !reflect.DeepEqual(mergedCRCR, &unstructured.Unstructured{}) {
|
|
results = append(results, mergedCRCR)
|
|
}
|
|
|
|
for _, mergedNamespacedRCR := range mergedRCR {
|
|
if !reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
|
results = append(results, mergedNamespacedRCR)
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// merge merges elements from a source object into a
|
|
// destination object if they share the same namespace label
|
|
func merge(dst, src *unstructured.Unstructured) bool {
|
|
dstNS := dst.GetLabels()[resourceLabelNamespace]
|
|
srcNS := src.GetLabels()[resourceLabelNamespace]
|
|
if dstNS != srcNS {
|
|
return false
|
|
}
|
|
|
|
if dstResults, ok, _ := unstructured.NestedSlice(dst.UnstructuredContent(), "results"); ok {
|
|
if srcResults, ok, _ := unstructured.NestedSlice(src.UnstructuredContent(), "results"); ok {
|
|
dstResults = append(dstResults, srcResults...)
|
|
|
|
if err := unstructured.SetNestedSlice(dst.UnstructuredContent(), dstResults, "results"); err == nil {
|
|
addSummary(dst, src)
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func addSummary(dst, src *unstructured.Unstructured) {
|
|
if dstSum, ok, _ := unstructured.NestedMap(dst.UnstructuredContent(), "summary"); ok {
|
|
if srcSum, ok, _ := unstructured.NestedMap(src.UnstructuredContent(), "summary"); ok {
|
|
for key, dstVal := range dstSum {
|
|
if dstValInt, ok := dstVal.(int64); ok {
|
|
if srcVal, ok := srcSum[key].(int64); ok {
|
|
dstSum[key] = dstValInt + srcVal
|
|
}
|
|
}
|
|
}
|
|
}
|
|
unstructured.SetNestedMap(dst.UnstructuredContent(), dstSum, "summary")
|
|
}
|
|
}
|
|
|
|
func isDeleteRequest(request *unstructured.Unstructured) bool {
|
|
deleteLabels := []string{deletedLabelPolicy, deletedLabelRule}
|
|
labels := request.GetLabels()
|
|
|
|
for _, l := range deleteLabels {
|
|
if _, ok := labels[l]; ok {
|
|
return true
|
|
}
|
|
}
|
|
|
|
deleteAnnotations := []string{deletedAnnotationResourceName, deletedAnnotationResourceKind}
|
|
annotations := request.GetAnnotations()
|
|
for _, ann := range deleteAnnotations {
|
|
if _, ok := annotations[ann]; ok {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|