mirror of
https://github.com/kyverno/policy-reporter.git
synced 2024-12-14 11:57:32 +00:00
fix: simplify cache management (#261)
* fix: simplify cache management Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
This commit is contained in:
parent
580fca1ac6
commit
8ed04abe48
8 changed files with 43 additions and 73 deletions
7
pkg/cache/cache.go
vendored
7
pkg/cache/cache.go
vendored
|
@ -7,10 +7,3 @@ type Cache interface {
|
|||
AddReport(report v1alpha2.ReportInterface)
|
||||
GetResults(id string) []string
|
||||
}
|
||||
|
||||
type ItemCache interface {
|
||||
Cache
|
||||
AddItem(key string, value interface{})
|
||||
GetItem(key string) (interface{}, bool)
|
||||
RemoveItem(key string)
|
||||
}
|
||||
|
|
21
pkg/cache/memory.go
vendored
21
pkg/cache/memory.go
vendored
|
@ -13,12 +13,7 @@ type inMemoryCache struct {
|
|||
}
|
||||
|
||||
func (c *inMemoryCache) AddReport(report v1alpha2.ReportInterface) {
|
||||
list := make([]string, 0, len(report.GetResults()))
|
||||
for _, result := range report.GetResults() {
|
||||
list = append(list, result.GetID())
|
||||
}
|
||||
|
||||
c.cache.Set(report.GetID(), list, gocache.NoExpiration)
|
||||
c.cache.Set(report.GetID(), reportResultsIds(report), gocache.NoExpiration)
|
||||
}
|
||||
|
||||
func (c *inMemoryCache) RemoveReport(id string) {
|
||||
|
@ -38,19 +33,7 @@ func (c *inMemoryCache) GetResults(id string) []string {
|
|||
return list.([]string)
|
||||
}
|
||||
|
||||
func (c *inMemoryCache) AddItem(key string, value interface{}) {
|
||||
c.cache.Set(key, value, gocache.NoExpiration)
|
||||
}
|
||||
|
||||
func (c *inMemoryCache) RemoveItem(key string) {
|
||||
c.cache.Delete(key)
|
||||
}
|
||||
|
||||
func (c *inMemoryCache) GetItem(key string) (interface{}, bool) {
|
||||
return c.cache.Get(key)
|
||||
}
|
||||
|
||||
func NewInMermoryCache() ItemCache {
|
||||
func NewInMermoryCache() Cache {
|
||||
return &inMemoryCache{
|
||||
cache: gocache.New(gocache.NoExpiration, 5*time.Minute),
|
||||
}
|
||||
|
|
5
pkg/cache/redis.go
vendored
5
pkg/cache/redis.go
vendored
|
@ -19,10 +19,7 @@ type redisCache struct {
|
|||
}
|
||||
|
||||
func (r *redisCache) AddReport(report v1alpha2.ReportInterface) {
|
||||
list := make([]string, 0, len(report.GetResults()))
|
||||
for _, result := range report.GetResults() {
|
||||
list = append(list, result.GetID())
|
||||
}
|
||||
list := reportResultsIds(report)
|
||||
|
||||
value, _ := json.Marshal(list)
|
||||
|
||||
|
|
13
pkg/cache/utils.go
vendored
Normal file
13
pkg/cache/utils.go
vendored
Normal file
|
@ -0,0 +1,13 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"github.com/kyverno/policy-reporter/pkg/crd/api/policyreport/v1alpha2"
|
||||
)
|
||||
|
||||
func reportResultsIds(report v1alpha2.ReportInterface) []string {
|
||||
list := make([]string, 0, len(report.GetResults()))
|
||||
for _, result := range report.GetResults() {
|
||||
list = append(list, result.GetID())
|
||||
}
|
||||
return list
|
||||
}
|
|
@ -41,7 +41,6 @@ type Resolver struct {
|
|||
leaderElector *leaderelection.Client
|
||||
targetClients []target.Client
|
||||
resultCache cache.Cache
|
||||
cache cache.ItemCache
|
||||
targetsCreated bool
|
||||
}
|
||||
|
||||
|
@ -116,7 +115,6 @@ func (r *Resolver) Queue() (*kubernetes.Queue, error) {
|
|||
}
|
||||
|
||||
return kubernetes.NewQueue(
|
||||
r.InMemoryCache(),
|
||||
kubernetes.NewDebouncer(1*time.Minute, r.EventPublisher()),
|
||||
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "report-queue"),
|
||||
client,
|
||||
|
@ -334,16 +332,6 @@ func (r *Resolver) ReportFilter() *report.Filter {
|
|||
)
|
||||
}
|
||||
|
||||
func (r *Resolver) InMemoryCache() cache.ItemCache {
|
||||
if r.cache != nil {
|
||||
return r.cache
|
||||
}
|
||||
|
||||
r.cache = cache.NewInMermoryCache()
|
||||
|
||||
return r.cache
|
||||
}
|
||||
|
||||
// ResultCache resolver method
|
||||
func (r *Resolver) ResultCache() cache.Cache {
|
||||
if r.resultCache != nil {
|
||||
|
@ -362,7 +350,7 @@ func (r *Resolver) ResultCache() cache.Cache {
|
|||
2*time.Hour,
|
||||
)
|
||||
} else {
|
||||
r.resultCache = r.InMemoryCache()
|
||||
r.resultCache = cache.NewInMermoryCache()
|
||||
}
|
||||
|
||||
return r.resultCache
|
||||
|
|
|
@ -269,20 +269,6 @@ func Test_ResolveMapper(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func Test_ResolveInMemoryCache(t *testing.T) {
|
||||
resolver := config.NewResolver(testConfig, &rest.Config{})
|
||||
|
||||
cache1 := resolver.InMemoryCache()
|
||||
if cache1 == nil {
|
||||
t.Error("Error: Should return InMemoryCache")
|
||||
}
|
||||
|
||||
cache2 := resolver.InMemoryCache()
|
||||
if cache1 != cache2 {
|
||||
t.Error("A second call resolver.InMemoryCache() should return the cached first cache")
|
||||
}
|
||||
}
|
||||
|
||||
func Test_ResolveReportFilter(t *testing.T) {
|
||||
resolver := config.NewResolver(testConfig, &rest.Config{})
|
||||
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"github.com/kyverno/policy-reporter/pkg/cache"
|
||||
"github.com/kyverno/policy-reporter/pkg/fixtures"
|
||||
"github.com/kyverno/policy-reporter/pkg/kubernetes"
|
||||
"github.com/kyverno/policy-reporter/pkg/report"
|
||||
|
@ -36,7 +35,6 @@ func Test_PolicyReportWatcher(t *testing.T) {
|
|||
restClient, polrClient, _ := NewFakeClient()
|
||||
|
||||
queue := kubernetes.NewQueue(
|
||||
cache.NewInMermoryCache(),
|
||||
kubernetes.NewDebouncer(0, publisher),
|
||||
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-queue"),
|
||||
restClient.Wgpolicyk8sV1alpha2(),
|
||||
|
@ -87,7 +85,6 @@ func Test_ClusterPolicyReportWatcher(t *testing.T) {
|
|||
restClient, _, polrClient := NewFakeClient()
|
||||
|
||||
queue := kubernetes.NewQueue(
|
||||
cache.NewInMermoryCache(),
|
||||
kubernetes.NewDebouncer(0, publisher),
|
||||
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-queue"),
|
||||
restClient.Wgpolicyk8sV1alpha2(),
|
||||
|
@ -128,7 +125,6 @@ func Test_HasSynced(t *testing.T) {
|
|||
restClient, _, _ := NewFakeClient()
|
||||
|
||||
queue := kubernetes.NewQueue(
|
||||
cache.NewInMermoryCache(),
|
||||
kubernetes.NewDebouncer(0, report.NewEventPublisher()),
|
||||
workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-queue"),
|
||||
restClient.Wgpolicyk8sV1alpha2(),
|
||||
|
|
|
@ -3,6 +3,7 @@ package kubernetes
|
|||
import (
|
||||
"context"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
|
@ -15,13 +16,15 @@ import (
|
|||
pr "github.com/kyverno/policy-reporter/pkg/crd/api/policyreport/v1alpha2"
|
||||
"github.com/kyverno/policy-reporter/pkg/crd/client/clientset/versioned/typed/policyreport/v1alpha2"
|
||||
"github.com/kyverno/policy-reporter/pkg/report"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
)
|
||||
|
||||
type Queue struct {
|
||||
queue workqueue.RateLimitingInterface
|
||||
client v1alpha2.Wgpolicyk8sV1alpha2Interface
|
||||
debouncer Debouncer
|
||||
cache Cache
|
||||
lock *sync.Mutex
|
||||
cache sets.Set[string]
|
||||
}
|
||||
|
||||
func (q *Queue) Add(obj *v1.PartialObjectMetadata) error {
|
||||
|
@ -53,14 +56,14 @@ func (q *Queue) runWorker() {
|
|||
}
|
||||
|
||||
func (q *Queue) processNextItem() bool {
|
||||
key, quit := q.queue.Get()
|
||||
obj, quit := q.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
|
||||
key := obj.(string)
|
||||
defer q.queue.Done(key)
|
||||
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key.(string))
|
||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||
if err != nil {
|
||||
q.queue.Forget(key)
|
||||
return true
|
||||
|
@ -90,21 +93,31 @@ func (q *Queue) processNextItem() bool {
|
|||
}
|
||||
}
|
||||
|
||||
func() {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
q.cache.Delete(key)
|
||||
}()
|
||||
q.debouncer.Add(report.LifecycleEvent{Type: report.Deleted, PolicyReport: polr})
|
||||
q.cache.RemoveItem(key.(string))
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
event := report.Added
|
||||
if _, ok := q.cache.GetItem(key.(string)); ok {
|
||||
event = report.Updated
|
||||
}
|
||||
event := func() report.Event {
|
||||
q.lock.Lock()
|
||||
defer q.lock.Unlock()
|
||||
event := report.Added
|
||||
if q.cache.Has(key) {
|
||||
event = report.Updated
|
||||
} else {
|
||||
q.cache.Insert(key)
|
||||
}
|
||||
return event
|
||||
}()
|
||||
|
||||
q.handleErr(err, key)
|
||||
|
||||
q.debouncer.Add(report.LifecycleEvent{Type: event, PolicyReport: polr})
|
||||
q.cache.AddItem(key.(string), nil)
|
||||
|
||||
return true
|
||||
}
|
||||
|
@ -128,11 +141,12 @@ func (q *Queue) handleErr(err error, key interface{}) {
|
|||
log.Printf("[WARNING] Dropping report %q out of the queue: %v", key, err)
|
||||
}
|
||||
|
||||
func NewQueue(cache Cache, debouncer Debouncer, queue workqueue.RateLimitingInterface, client v1alpha2.Wgpolicyk8sV1alpha2Interface) *Queue {
|
||||
func NewQueue(debouncer Debouncer, queue workqueue.RateLimitingInterface, client v1alpha2.Wgpolicyk8sV1alpha2Interface) *Queue {
|
||||
return &Queue{
|
||||
debouncer: debouncer,
|
||||
queue: queue,
|
||||
client: client,
|
||||
cache: cache,
|
||||
cache: sets.New[string](),
|
||||
lock: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue