1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-06 16:06:56 +00:00
kyverno/pkg/policyreport/changerequestcreator.go
shuting bd44dbff41
Reduce RCR Throttling (#1545)
* buffer report change requests

Signed-off-by: Shuting Zhao <shutting06@gmail.com>

* fix clusterReportChangeRequest

Signed-off-by: Shuting Zhao <shutting06@gmail.com>

* further reduce RCRs in background scan

Signed-off-by: Shuting Zhao <shutting06@gmail.com>
2021-02-07 19:46:50 -08:00

240 lines
6.3 KiB
Go

package policyreport
import (
"crypto/rand"
"math/big"
"reflect"
"sync"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/config"
dclient "github.com/kyverno/kyverno/pkg/dclient"
cache "github.com/patrickmn/go-cache"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
// creator buffers report change requests and periodically submits them.
//
// Requests handed to add are held back; run merges whatever has been
// buffered and calls create for each merged request once per ticker
// interval.
type creator interface {
	// run drives the periodic merge-and-create loop until stopChan fires.
	run(stopChan <-chan struct{})
	// add buffers a single request for a later run iteration.
	add(request *unstructured.Unstructured)
	// create submits a single request and returns the resulting error, if any.
	create(request *unstructured.Unstructured) error
}
// changeRequestCreator is the creator implementation built by
// newChangeRequestCreator. It keeps pending requests in two caches and
// remembers their keys, in arrival order, in queue.
type changeRequestCreator struct {
	// dclient is the client used by create to submit requests.
	dclient *dclient.Client

	// RCRCache buffers requests of kind "ReportChangeRequest" and CRCRCache
	// buffers requests of kind "ClusterReportChangeRequest". Both are keyed
	// by the identifiers stored in queue.
	RCRCache, CRCRCache *cache.Cache

	// mutex guards queue.
	mutex sync.RWMutex
	// queue lists the cache keys of buffered requests in the order they
	// were added.
	queue []string

	// tickerInterval is the period at which run flushes buffered requests.
	tickerInterval time.Duration

	log logr.Logger
}
// newChangeRequestCreator builds a creator that submits requests through
// client and flushes its buffer every tickerInterval once run is started.
func newChangeRequestCreator(client *dclient.Client, tickerInterval time.Duration, log logr.Logger) creator {
	// Second argument passed to cache.New for both caches.
	const cleanupInterval = 24 * time.Hour

	crc := new(changeRequestCreator)
	crc.dclient = client
	crc.RCRCache = cache.New(0, cleanupInterval)
	crc.CRCRCache = cache.New(0, cleanupInterval)
	crc.queue = []string{}
	crc.tickerInterval = tickerInterval
	crc.log = log
	return crc
}
// add buffers request until the next flush performed by run.
//
// Requests of kind "ClusterReportChangeRequest" go to CRCRCache and requests
// of kind "ReportChangeRequest" go to RCRCache; any other kind is ignored.
// The request is stored under a freshly generated random key, and that key
// is appended to the queue only after the request has actually been stored.
//
// A key must be unused in BOTH caches: mergeRequests looks a key up in
// CRCRCache first and cleanupQueue deletes it from both, so a key shared
// between the caches (or reused inside one) would lose one request and
// process another twice. Colliding keys are therefore re-drawn.
func (c *changeRequestCreator) add(request *unstructured.Unstructured) {
	var target, other *cache.Cache
	switch request.GetKind() {
	case "ClusterReportChangeRequest":
		target, other = c.CRCRCache, c.RCRCache
	case "ReportChangeRequest":
		target, other = c.RCRCache, c.CRCRCache
	default:
		return
	}

	// A large key space makes collisions practically impossible; the retry
	// loop only exists so that a collision can never corrupt the queue.
	const maxAttempts = 10
	keySpace := big.NewInt(1 << 62)

	for attempt := 0; attempt < maxAttempts; attempt++ {
		uid, err := rand.Int(rand.Reader, keySpace)
		if err != nil {
			c.log.Error(err, "failed to generate cache key for report change request", "kind", request.GetKind())
			return
		}

		key := uid.String()
		if _, taken := other.Get(key); taken {
			continue
		}
		// Add fails when the key is already present in the target cache.
		if err := target.Add(key, request, cache.NoExpiration); err != nil {
			continue
		}

		c.mutex.Lock()
		c.queue = append(c.queue, key)
		c.mutex.Unlock()
		return
	}

	c.log.Info("dropping report change request, no unused cache key found", "kind", request.GetKind(), "attempts", maxAttempts)
}
// create submits request through the dynamic client.
//
// Requests of kind "ReportChangeRequest" are created in
// config.KyvernoNamespace; every other kind is created with an empty
// namespace. The error from CreateResource is returned as is.
func (c *changeRequestCreator) create(request *unstructured.Unstructured) error {
	var namespace string
	if request.GetKind() == "ReportChangeRequest" {
		namespace = config.KyvernoNamespace
	}

	apiVersion, kind := request.GetAPIVersion(), request.GetKind()
	if _, err := c.dclient.CreateResource(apiVersion, kind, namespace, request, false); err != nil {
		return err
	}
	return nil
}
// run flushes the buffered requests every tickerInterval until stopChan
// yields a value or is closed.
//
// A flush merges the buffered requests, tries to create each merged request
// (failures are logged and do not stop the flush), and then removes the
// processed entries from the queue and caches.
func (c *changeRequestCreator) run(stopChan <-chan struct{}) {
	ticker := time.NewTicker(c.tickerInterval)
	defer ticker.Stop()

	flush := func() {
		merged, processed := c.mergeRequests()
		for i := range merged {
			err := c.create(merged[i])
			if err == nil {
				continue
			}
			c.log.Error(err, "failed to create report change request", "req", merged[i].Object)
		}
		c.cleanupQueue(processed)
	}

	for {
		select {
		case <-stopChan:
			return
		case <-ticker.C:
			flush()
		}
	}
}
// cleanupQueue drops the first size entries of the queue and deletes their
// keys from both caches. It holds the mutex for the whole operation.
func (c *changeRequestCreator) cleanupQueue(size int) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	done := 0
	for done < size {
		key := c.queue[done]
		// A key lives in at most one cache, so it is deleted from both.
		c.RCRCache.Delete(key)
		c.CRCRCache.Delete(key)
		done++
	}

	c.queue = c.queue[size:]
}
// mergeRequests walks the queue in order and folds the buffered requests
// into as few requests as possible.
//
// Cluster-scoped requests are accumulated into a single request; namespaced
// requests are accumulated per value of the resourceLabelNamespace label.
// A delete request (see isDeleteRequest) is never merged: the accumulator it
// would interrupt is emitted first, then the delete request itself. When
// merge refuses to combine two requests, the accumulator is emitted and the
// incoming request starts a new one.
//
// It returns the requests to create and the number of queue entries that
// were examined. The mutex guarding the queue is held for the whole call.
func (c *changeRequestCreator) mergeRequests() (results []*unstructured.Unstructured, size int) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// isEmpty reports whether u still equals the zero-value placeholder,
	// i.e. nothing has been accumulated into it yet.
	isEmpty := func(u *unstructured.Unstructured) bool {
		return reflect.DeepEqual(u, &unstructured.Unstructured{})
	}

	clusterAcc := &unstructured.Unstructured{}
	namespacedAcc := map[string]*unstructured.Unstructured{}

	size = len(c.queue)
	for _, key := range c.queue {
		// Cluster-scoped requests: a hit in CRCRCache ends the iteration.
		if cached, found := c.CRCRCache.Get(key); found {
			incoming, isUnstructured := cached.(*unstructured.Unstructured)
			switch {
			case !isUnstructured:
				// Unexpected cache content; nothing to merge.
			case isDeleteRequest(incoming):
				if !isEmpty(clusterAcc) {
					results = append(results, clusterAcc)
					clusterAcc = &unstructured.Unstructured{}
				}
				results = append(results, incoming)
			case isEmpty(clusterAcc):
				clusterAcc = incoming
			case !merge(clusterAcc, incoming):
				results = append(results, clusterAcc)
				clusterAcc = incoming
			}
			continue
		}

		// Namespaced requests.
		cached, found := c.RCRCache.Get(key)
		if !found {
			continue
		}
		incoming, isUnstructured := cached.(*unstructured.Unstructured)
		if !isUnstructured {
			continue
		}

		ns := incoming.GetLabels()[resourceLabelNamespace]
		acc, tracked := namespacedAcc[ns]
		if !tracked {
			acc = &unstructured.Unstructured{}
		}

		switch {
		case isDeleteRequest(incoming):
			if !isEmpty(acc) {
				results = append(results, acc)
				namespacedAcc[ns] = &unstructured.Unstructured{}
			}
			results = append(results, incoming)
		case isEmpty(acc):
			namespacedAcc[ns] = incoming
		case merge(acc, incoming):
			namespacedAcc[ns] = acc
		default:
			results = append(results, acc)
			namespacedAcc[ns] = incoming
		}
	}

	// Emit whatever is still being accumulated.
	if !isEmpty(clusterAcc) {
		results = append(results, clusterAcc)
	}
	for _, acc := range namespacedAcc {
		if isEmpty(acc) {
			continue
		}
		results = append(results, acc)
	}

	return results, size
}
// merge appends the "results" entries of src to those of dst and folds the
// summary of src into dst via addSummary.
//
// It reports true only when the merge happened. It reports false, leaving
// dst untouched, when the two objects carry different values for the
// resourceLabelNamespace label, when either object has no "results" slice,
// or when the combined slice cannot be written back to dst.
func merge(dst, src *unstructured.Unstructured) bool {
	if dst.GetLabels()[resourceLabelNamespace] != src.GetLabels()[resourceLabelNamespace] {
		return false
	}

	dstResults, found, _ := unstructured.NestedSlice(dst.UnstructuredContent(), "results")
	if !found {
		return false
	}
	srcResults, found, _ := unstructured.NestedSlice(src.UnstructuredContent(), "results")
	if !found {
		return false
	}

	combined := append(dstResults, srcResults...)
	if err := unstructured.SetNestedSlice(dst.UnstructuredContent(), combined, "results"); err != nil {
		return false
	}

	addSummary(dst, src)
	return true
}
// addSummary adds the int64 counters found under "summary" in src to the
// counters under "summary" in dst and writes the result back to dst.
//
// Counters that exist only in src are copied to dst, so that results merged
// from src are not left out of the summary. Values that are not int64 are
// left untouched. Nothing happens when either object has no "summary" map.
func addSummary(dst, src *unstructured.Unstructured) {
	// The error results are not inspected; a failed lookup is treated the
	// same as a missing map through the found flag.
	dstSum, found, _ := unstructured.NestedMap(dst.UnstructuredContent(), "summary")
	if !found {
		return
	}
	srcSum, found, _ := unstructured.NestedMap(src.UnstructuredContent(), "summary")
	if !found {
		return
	}

	for key, srcVal := range srcSum {
		srcCount, isInt := srcVal.(int64)
		if !isInt {
			continue
		}

		dstVal, present := dstSum[key]
		if !present {
			// Counter only known to src: carry it over instead of dropping it.
			dstSum[key] = srcCount
			continue
		}
		if dstCount, isInt := dstVal.(int64); isInt {
			dstSum[key] = dstCount + srcCount
		}
	}

	// Best effort: this function has no way to report a failure to its caller.
	_ = unstructured.SetNestedMap(dst.UnstructuredContent(), dstSum, "summary")
}
// isDeleteRequest reports whether request carries at least one of the
// deletion labels: deletedLabelPolicy, deletedLabelRule,
// deletedLabelResource or deletedLabelResourceKind. Only the presence of
// the label key matters, not its value.
func isDeleteRequest(request *unstructured.Unstructured) bool {
	present := request.GetLabels()

	for _, key := range [...]string{
		deletedLabelPolicy,
		deletedLabelRule,
		deletedLabelResource,
		deletedLabelResourceKind,
	} {
		if _, found := present[key]; found {
			return true
		}
	}

	return false
}