mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-31 03:45:17 +00:00
refactor: simplify RCR creator queue (#4578)
Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com> Signed-off-by: Charles-Edouard Brétéché <charled.breteche@gmail.com>
This commit is contained in:
parent
0048c06c9a
commit
da5312c177
1 changed files with 89 additions and 174 deletions
|
@ -2,8 +2,6 @@ package policyreport
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"math/big"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
@ -12,7 +10,6 @@ import (
|
|||
"github.com/kyverno/kyverno/pkg/client/clientset/versioned"
|
||||
"github.com/kyverno/kyverno/pkg/config"
|
||||
"github.com/kyverno/kyverno/pkg/toggle"
|
||||
"github.com/patrickmn/go-cache"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
)
|
||||
|
@ -21,88 +18,47 @@ import (
|
|||
// merges and creates requests every tickerInterval
|
||||
type creator interface {
|
||||
add(request *unstructured.Unstructured)
|
||||
create(request *unstructured.Unstructured) error
|
||||
run(stopChan <-chan struct{})
|
||||
}
|
||||
|
||||
type changeRequestCreator struct {
|
||||
client versioned.Interface
|
||||
|
||||
// addCache preserves requests that are to be added to report
|
||||
rcr_cache *cache.Cache
|
||||
|
||||
crcr_cache *cache.Cache
|
||||
// removeCache preserves requests that are to be removed from report
|
||||
// removeCache *cache.Cache
|
||||
mutex sync.RWMutex
|
||||
queue []string
|
||||
|
||||
client versioned.Interface
|
||||
mutex *sync.RWMutex
|
||||
queue []*unstructured.Unstructured
|
||||
tickerInterval time.Duration
|
||||
|
||||
log logr.Logger
|
||||
log logr.Logger
|
||||
}
|
||||
|
||||
func newChangeRequestCreator(client versioned.Interface, tickerInterval time.Duration, log logr.Logger) creator {
|
||||
return &changeRequestCreator{
|
||||
client: client,
|
||||
rcr_cache: cache.New(0, 24*time.Hour),
|
||||
crcr_cache: cache.New(0, 24*time.Hour),
|
||||
queue: []string{},
|
||||
mutex: &sync.RWMutex{},
|
||||
queue: []*unstructured.Unstructured{},
|
||||
tickerInterval: tickerInterval,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *changeRequestCreator) add(request *unstructured.Unstructured) {
|
||||
uid, _ := rand.Int(rand.Reader, big.NewInt(100000))
|
||||
var err error
|
||||
|
||||
switch request.GetKind() {
|
||||
case "ClusterReportChangeRequest":
|
||||
err = c.crcr_cache.Add(uid.String(), request, cache.NoExpiration)
|
||||
if err != nil {
|
||||
c.log.Error(err, "failed to add ClusterReportChangeRequest to cache, replacing", "cache length", c.crcr_cache.ItemCount())
|
||||
if err = c.crcr_cache.Replace(uid.String(), request, cache.NoExpiration); err != nil {
|
||||
c.log.Error(err, "failed to replace CRCR")
|
||||
return
|
||||
}
|
||||
}
|
||||
case "ReportChangeRequest":
|
||||
err = c.rcr_cache.Add(uid.String(), request, cache.NoExpiration)
|
||||
if err != nil {
|
||||
c.log.Error(err, "failed to add ReportChangeRequest to cache, replacing", "cache length", c.rcr_cache.ItemCount())
|
||||
if err = c.rcr_cache.Replace(uid.String(), request, cache.NoExpiration); err != nil {
|
||||
c.log.Error(err, "failed to replace RCR")
|
||||
return
|
||||
}
|
||||
}
|
||||
default:
|
||||
return
|
||||
}
|
||||
|
||||
c.mutex.Lock()
|
||||
c.queue = append(c.queue, uid.String())
|
||||
c.mutex.Unlock()
|
||||
defer c.mutex.Unlock()
|
||||
c.queue = append(c.queue, request)
|
||||
}
|
||||
|
||||
func (c *changeRequestCreator) create(request *unstructured.Unstructured) error {
|
||||
var ns string
|
||||
if request.GetKind() == "ReportChangeRequest" {
|
||||
ns = config.KyvernoNamespace()
|
||||
ns := config.KyvernoNamespace()
|
||||
rcr, err := convertToRCR(request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.client.KyvernoV1alpha2().ReportChangeRequests(ns).Create(context.TODO(), rcr, metav1.CreateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
crcr, err := convertToCRCR(request)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = c.client.KyvernoV1alpha2().ClusterReportChangeRequests().Create(context.TODO(), crcr, metav1.CreateOptions{})
|
||||
return err
|
||||
}
|
||||
|
@ -110,14 +66,12 @@ func (c *changeRequestCreator) create(request *unstructured.Unstructured) error
|
|||
func (c *changeRequestCreator) run(stopChan <-chan struct{}) {
|
||||
ticker := time.NewTicker(c.tickerInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
if toggle.SplitPolicyReport.Enabled() {
|
||||
err := CleanupPolicyReport(c.client)
|
||||
if err != nil {
|
||||
c.log.Error(err, "failed to delete old reports")
|
||||
}
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
|
@ -133,7 +87,6 @@ func (c *changeRequestCreator) run(stopChan <-chan struct{}) {
|
|||
c.log.Error(err, "failed to create report change request", "req", request.Object)
|
||||
}
|
||||
}
|
||||
|
||||
c.cleanupQueue(size)
|
||||
case <-stopChan:
|
||||
return
|
||||
|
@ -144,13 +97,6 @@ func (c *changeRequestCreator) run(stopChan <-chan struct{}) {
|
|||
func (c *changeRequestCreator) cleanupQueue(size int) {
|
||||
c.mutex.Lock()
|
||||
defer c.mutex.Unlock()
|
||||
|
||||
for i := 0; i < size; i++ {
|
||||
uid := c.queue[i]
|
||||
c.crcr_cache.Delete(uid)
|
||||
c.rcr_cache.Delete(uid)
|
||||
}
|
||||
|
||||
c.queue = c.queue[size:]
|
||||
}
|
||||
|
||||
|
@ -164,67 +110,54 @@ func (c *changeRequestCreator) mergeRequests() (results []*unstructured.Unstruct
|
|||
mergedRCR := make(map[string]*unstructured.Unstructured)
|
||||
size = len(c.queue)
|
||||
|
||||
for _, uid := range c.queue {
|
||||
if unstr, ok := c.crcr_cache.Get(uid); ok {
|
||||
if crcr, ok := unstr.(*unstructured.Unstructured); ok {
|
||||
if isDeleteRequest(crcr) {
|
||||
if !reflect.DeepEqual(mergedCRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedCRCR)
|
||||
mergedCRCR = &unstructured.Unstructured{}
|
||||
}
|
||||
|
||||
results = append(results, crcr)
|
||||
} else {
|
||||
if reflect.DeepEqual(mergedCRCR, &unstructured.Unstructured{}) {
|
||||
mergedCRCR = crcr
|
||||
continue
|
||||
}
|
||||
|
||||
if ok := merge(mergedCRCR, crcr); !ok {
|
||||
results = append(results, mergedCRCR)
|
||||
mergedCRCR = crcr
|
||||
}
|
||||
for _, item := range c.queue {
|
||||
switch item.GetKind() {
|
||||
case "ClusterReportChangeRequest":
|
||||
if isDeleteRequest(item) {
|
||||
if !reflect.DeepEqual(mergedCRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedCRCR)
|
||||
mergedCRCR = &unstructured.Unstructured{}
|
||||
}
|
||||
results = append(results, item)
|
||||
} else {
|
||||
if reflect.DeepEqual(mergedCRCR, &unstructured.Unstructured{}) {
|
||||
mergedCRCR = item
|
||||
continue
|
||||
}
|
||||
if ok := merge(mergedCRCR, item); !ok {
|
||||
results = append(results, mergedCRCR)
|
||||
mergedCRCR = item
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if unstr, ok := c.rcr_cache.Get(uid); ok {
|
||||
if rcr, ok := unstr.(*unstructured.Unstructured); ok {
|
||||
resourceNS := rcr.GetLabels()[ResourceLabelNamespace]
|
||||
mergedNamespacedRCR, ok := mergedRCR[resourceNS]
|
||||
if !ok {
|
||||
mergedNamespacedRCR = &unstructured.Unstructured{}
|
||||
case "ReportChangeRequest":
|
||||
resourceNS := item.GetLabels()[ResourceLabelNamespace]
|
||||
mergedNamespacedRCR, ok := mergedRCR[resourceNS]
|
||||
if !ok {
|
||||
mergedNamespacedRCR = &unstructured.Unstructured{}
|
||||
}
|
||||
if isDeleteRequest(item) {
|
||||
if !reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
mergedRCR[resourceNS] = &unstructured.Unstructured{}
|
||||
}
|
||||
|
||||
if isDeleteRequest(rcr) {
|
||||
if !reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
mergedRCR[resourceNS] = &unstructured.Unstructured{}
|
||||
}
|
||||
|
||||
results = append(results, rcr)
|
||||
results = append(results, item)
|
||||
} else {
|
||||
if reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
mergedRCR[resourceNS] = item
|
||||
continue
|
||||
}
|
||||
if ok := merge(mergedNamespacedRCR, item); !ok {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
mergedRCR[resourceNS] = item
|
||||
} else {
|
||||
if reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
mergedRCR[resourceNS] = rcr
|
||||
continue
|
||||
}
|
||||
|
||||
if ok := merge(mergedNamespacedRCR, rcr); !ok {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
mergedRCR[resourceNS] = rcr
|
||||
} else {
|
||||
mergedRCR[resourceNS] = mergedNamespacedRCR
|
||||
}
|
||||
mergedRCR[resourceNS] = mergedNamespacedRCR
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(mergedCRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedCRCR)
|
||||
}
|
||||
|
||||
for _, mergedNamespacedRCR := range mergedRCR {
|
||||
if !reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
|
@ -243,78 +176,64 @@ func (c *changeRequestCreator) mergeRequestsPerPolicy() (results []*unstructured
|
|||
mergedRCR := make(map[string]*unstructured.Unstructured)
|
||||
size = len(c.queue)
|
||||
|
||||
for _, uid := range c.queue {
|
||||
if unstr, ok := c.crcr_cache.Get(uid); ok {
|
||||
if crcr, ok := unstr.(*unstructured.Unstructured); ok {
|
||||
policyName := crcr.GetLabels()[policyLabel]
|
||||
mergedPolicyCRCR, ok := mergedCRCR[policyName]
|
||||
if !ok {
|
||||
mergedPolicyCRCR = &unstructured.Unstructured{}
|
||||
for _, item := range c.queue {
|
||||
switch item.GetKind() {
|
||||
case "ClusterReportChangeRequest":
|
||||
policyName := item.GetLabels()[policyLabel]
|
||||
mergedPolicyCRCR, ok := mergedCRCR[policyName]
|
||||
if !ok {
|
||||
mergedPolicyCRCR = &unstructured.Unstructured{}
|
||||
}
|
||||
if isDeleteRequest(item) {
|
||||
if !reflect.DeepEqual(mergedPolicyCRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedPolicyCRCR)
|
||||
mergedCRCR[policyName] = &unstructured.Unstructured{}
|
||||
}
|
||||
|
||||
if isDeleteRequest(crcr) {
|
||||
if !reflect.DeepEqual(mergedPolicyCRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedPolicyCRCR)
|
||||
mergedCRCR[policyName] = &unstructured.Unstructured{}
|
||||
}
|
||||
|
||||
results = append(results, crcr)
|
||||
results = append(results, item)
|
||||
} else {
|
||||
if reflect.DeepEqual(mergedPolicyCRCR, &unstructured.Unstructured{}) {
|
||||
mergedCRCR[policyName] = item
|
||||
continue
|
||||
}
|
||||
if ok := merge(mergedPolicyCRCR, item); !ok {
|
||||
results = append(results, mergedPolicyCRCR)
|
||||
mergedCRCR[policyName] = item
|
||||
} else {
|
||||
if reflect.DeepEqual(mergedPolicyCRCR, &unstructured.Unstructured{}) {
|
||||
mergedCRCR[policyName] = crcr
|
||||
continue
|
||||
}
|
||||
|
||||
if ok := merge(mergedPolicyCRCR, crcr); !ok {
|
||||
results = append(results, mergedPolicyCRCR)
|
||||
mergedCRCR[policyName] = crcr
|
||||
} else {
|
||||
mergedCRCR[policyName] = mergedPolicyCRCR
|
||||
}
|
||||
mergedCRCR[policyName] = mergedPolicyCRCR
|
||||
}
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if unstr, ok := c.rcr_cache.Get(uid); ok {
|
||||
if rcr, ok := unstr.(*unstructured.Unstructured); ok {
|
||||
policyName := rcr.GetLabels()[policyLabel]
|
||||
resourceNS := rcr.GetLabels()[ResourceLabelNamespace]
|
||||
mergedNamespacedRCR, ok := mergedRCR[policyName+resourceNS]
|
||||
if !ok {
|
||||
mergedNamespacedRCR = &unstructured.Unstructured{}
|
||||
case "ReportChangeRequest":
|
||||
policyName := item.GetLabels()[policyLabel]
|
||||
resourceNS := item.GetLabels()[ResourceLabelNamespace]
|
||||
mergedNamespacedRCR, ok := mergedRCR[policyName+resourceNS]
|
||||
if !ok {
|
||||
mergedNamespacedRCR = &unstructured.Unstructured{}
|
||||
}
|
||||
if isDeleteRequest(item) {
|
||||
if !reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
mergedRCR[policyName+resourceNS] = &unstructured.Unstructured{}
|
||||
}
|
||||
|
||||
if isDeleteRequest(rcr) {
|
||||
if !reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
mergedRCR[policyName+resourceNS] = &unstructured.Unstructured{}
|
||||
}
|
||||
|
||||
results = append(results, rcr)
|
||||
results = append(results, item)
|
||||
} else {
|
||||
if reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
mergedRCR[policyName+resourceNS] = item
|
||||
continue
|
||||
}
|
||||
if ok := merge(mergedNamespacedRCR, item); !ok {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
mergedRCR[policyName+resourceNS] = item
|
||||
} else {
|
||||
if reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
mergedRCR[policyName+resourceNS] = rcr
|
||||
continue
|
||||
}
|
||||
|
||||
if ok := merge(mergedNamespacedRCR, rcr); !ok {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
mergedRCR[policyName+resourceNS] = rcr
|
||||
} else {
|
||||
mergedRCR[policyName+resourceNS] = mergedNamespacedRCR
|
||||
}
|
||||
mergedRCR[policyName+resourceNS] = mergedNamespacedRCR
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, mergedPolicyCRCR := range mergedCRCR {
|
||||
if !reflect.DeepEqual(mergedPolicyCRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedPolicyCRCR)
|
||||
}
|
||||
}
|
||||
|
||||
for _, mergedNamespacedRCR := range mergedRCR {
|
||||
if !reflect.DeepEqual(mergedNamespacedRCR, &unstructured.Unstructured{}) {
|
||||
results = append(results, mergedNamespacedRCR)
|
||||
|
@ -331,7 +250,6 @@ func merge(dst, src *unstructured.Unstructured) bool {
|
|||
if dstNS != srcNS {
|
||||
return false
|
||||
}
|
||||
|
||||
if dstResults, ok, _ := unstructured.NestedSlice(dst.UnstructuredContent(), "results"); ok {
|
||||
if srcResults, ok, _ := unstructured.NestedSlice(src.UnstructuredContent(), "results"); ok {
|
||||
dstResults = append(dstResults, srcResults...)
|
||||
|
@ -364,13 +282,11 @@ func addSummary(dst, src *unstructured.Unstructured) error {
|
|||
func isDeleteRequest(request *unstructured.Unstructured) bool {
|
||||
deleteLabels := []string{deletedLabelPolicy, deletedLabelRule}
|
||||
labels := request.GetLabels()
|
||||
|
||||
for _, l := range deleteLabels {
|
||||
if _, ok := labels[l]; ok {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
deleteAnnotations := []string{deletedAnnotationResourceName, deletedAnnotationResourceKind}
|
||||
annotations := request.GetAnnotations()
|
||||
for _, ann := range deleteAnnotations {
|
||||
|
@ -378,6 +294,5 @@ func isDeleteRequest(request *unstructured.Unstructured) bool {
|
|||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue