
feat: use custom events watcher (#9324)

* feat: use custom events watcher

This custom event handler solves the problem of spawning a goroutine per event.

Signed-off-by: Khaled Emara <khaled.emara@nirmata.com>

* test(events): add unit test to EventGenerator

Signed-off-by: Khaled Emara <khaled.emara@nirmata.com>

* fix(events): linter

Signed-off-by: Khaled Emara <khaled.emara@nirmata.com>

* feat: do away with EventBroadcaster

Signed-off-by: Khaled Emara <khaled.emara@nirmata.com>

* eddycharly fixes

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

---------

Signed-off-by: Khaled Emara <khaled.emara@nirmata.com>
Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Co-authored-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
Khaled Emara 2024-01-31 00:08:15 +02:00 committed by GitHub
parent 9102753323
commit 8fcd9945a1
8 changed files with 233 additions and 114 deletions
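In outline: the PR drops client-go's EventBroadcaster (which, per the commit message, cost a goroutine per event) in favor of a rate-limited workqueue drained by a fixed pool of workers. A minimal standalone sketch of that pattern, assuming illustrative names (the queue name, worker count, and print stand-in are not the PR's exact wiring):

package main

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// One shared queue instead of one goroutine per event.
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "events")
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var workers wait.Group
	for i := 0; i < 3; i++ { // fixed worker count, like event.Workers in the diff
		workers.StartWithContext(ctx, func(ctx context.Context) {
			for {
				item, quit := queue.Get()
				if quit {
					return
				}
				fmt.Println("processing", item) // stand-in for the Events API call
				queue.Done(item)
			}
		})
	}

	queue.Add("event-1")
	queue.Add("event-2")
	queue.ShutDownWithDrain() // deliver what is queued, then stop the workers
	workers.Wait()
}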


@@ -93,7 +93,6 @@ func main() {
flagset.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.")
flagset.StringVar(&omitEvents, "omit-events", "", "Set this flag to a comma separated list of PolicyViolation, PolicyApplied, PolicyError, PolicySkipped to disable events, e.g. --omit-events=PolicyApplied,PolicyViolation")
flagset.Int64Var(&maxAPICallResponseLength, "maxAPICallResponseLength", 2*1000*1000, "Maximum allowed response size from API Calls. A value of 0 bypasses checks (not recommended).")
// config
appConfig := internal.NewConfiguration(
internal.WithProfiling(),
@@ -116,7 +115,6 @@ func main() {
// setup
signalCtx, setup, sdown := internal.Setup(appConfig, "kyverno-background-controller", false)
defer sdown()
var err error
bgscanInterval := time.Hour
val := os.Getenv("BACKGROUND_SCAN_INTERVAL")
@@ -127,23 +125,27 @@
}
}
setup.Logger.V(2).Info("setting the background scan interval", "value", bgscanInterval.String())
// THIS IS AN UGLY FIX
// ELSE KYAML IS NOT THREAD SAFE
kyamlopenapi.Schema()
// informer factories
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
emitEventsValues := strings.Split(omitEvents, ",")
omitEventsValues := strings.Split(omitEvents, ",")
if omitEvents == "" {
emitEventsValues = []string{}
omitEventsValues = []string{}
}
var wg sync.WaitGroup
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
emitEventsValues...,
omitEventsValues...,
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
event.Workers,
)
// this controller only subscribes to events, nothing is returned...
var wg sync.WaitGroup
policymetricscontroller.NewController(
setup.MetricsManager,
kyvernoInformer.Kyverno().V1().ClusterPolicies(),
@@ -169,8 +171,6 @@ func main() {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// start event generator
go eventGenerator.Run(signalCtx, event.Workers, &wg)
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
@@ -221,7 +221,10 @@ func main() {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// start non leader controllers
eventController.Run(signalCtx, setup.Logger, &wg)
// start leader election
le.Run(signalCtx)
// wait for everything to shut down and exit
wg.Wait()
}
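Aside on the omitEventsValues handling above: strings.Split returns a one-element slice holding the empty string when its input is empty, so the explicit reset to an empty slice is needed for --omit-events to default to "omit nothing". A quick demonstration:

package main

import (
	"fmt"
	"strings"
)

func main() {
	fmt.Printf("%q\n", strings.Split("", ","))              // [""]: one empty element, not an empty slice
	fmt.Printf("%q\n", strings.Split("PolicyApplied", ",")) // ["PolicyApplied"]
	fmt.Printf("%q\n", strings.Split("A,B", ","))           // ["A" "B"]
}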


@@ -120,6 +120,7 @@ func main() {
// informer factories
kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod)
kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod)
var wg sync.WaitGroup
// listers
nsLister := kubeInformer.Core().V1().Namespaces().Lister()
// log policy changes
@@ -139,13 +140,15 @@ func main() {
setup.EventsClient,
logging.WithName("EventGenerator"),
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
event.Workers,
)
// start informers and wait for cache sync
if !internal.StartInformersAndWaitForCacheSync(ctx, setup.Logger, kubeInformer, kyvernoInformer) {
os.Exit(1)
}
// start event generator
var wg sync.WaitGroup
go eventGenerator.Run(ctx, event.CleanupWorkers, &wg)
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
@@ -331,7 +334,12 @@ func main() {
setup.Configuration,
)
// start server
server.Run(ctx.Done())
server.Run()
defer server.Stop()
// start non leader controllers
eventController.Run(ctx, setup.Logger, &wg)
// start leader election
le.Run(ctx)
// wait for everything to shut down and exit
wg.Wait()
}


@@ -18,9 +18,9 @@ import (
type Server interface {
// Run TLS server in separate thread and returns control immediately
Run(<-chan struct{})
Run()
// Stop TLS server and returns control after the server is shut down
Stop(context.Context)
Stop()
}
type server struct {
@@ -110,7 +110,7 @@
}
}
func (s *server) Run(stopCh <-chan struct{}) {
func (s *server) Run() {
go func() {
if err := s.server.ListenAndServeTLS("", ""); err != nil {
logging.Error(err, "failed to start server")
@@ -118,7 +118,9 @@ func (s *server) Run(stopCh <-chan struct{}) {
}()
}
func (s *server) Stop(ctx context.Context) {
func (s *server) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
err := s.server.Shutdown(ctx)
if err != nil {
err = s.server.Close()
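Stop now owns its shutdown deadline instead of taking a context from the caller. A minimal sketch of the same pattern on a plain net/http server; the 30-second timeout mirrors the diff, everything else is illustrative:

package main

import (
	"context"
	"log"
	"net/http"
	"time"
)

func main() {
	srv := &http.Server{Addr: "127.0.0.1:8080"}

	// Run: returns immediately, the server listens in the background.
	go func() {
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			log.Println("server error:", err)
		}
	}()

	// Stop: bound the graceful shutdown, force-close if the deadline passes.
	defer func() {
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		if err := srv.Shutdown(ctx); err != nil {
			_ = srv.Close()
		}
	}()

	time.Sleep(100 * time.Millisecond) // stand-in for the process's real work
}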


@@ -326,6 +326,11 @@ func main() {
logging.WithName("EventGenerator"),
omitEventsValues...,
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
event.Workers,
)
// this controller only subscribes to events, nothing is returned...
policymetricscontroller.NewController(
setup.MetricsManager,
@@ -390,8 +395,6 @@ func main() {
os.Exit(1)
}
}
// start event generator
go eventGenerator.Run(signalCtx, event.Workers, &wg)
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
@@ -456,19 +459,6 @@ func main() {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// start non leader controllers
for _, controller := range nonLeaderControllers {
controller.Run(signalCtx, setup.Logger.WithName("controllers"), &wg)
}
// start leader election
go func() {
select {
case <-signalCtx.Done():
return
default:
le.Run(signalCtx)
}
}()
// create webhooks server
urgen := webhookgenerate.NewGenerator(
setup.KyvernoClient,
@@ -532,8 +522,15 @@ func main() {
os.Exit(1)
}
// start webhooks server
server.Run(signalCtx.Done())
server.Run()
defer server.Stop()
// start non leader controllers
eventController.Run(signalCtx, setup.Logger, &wg)
for _, controller := range nonLeaderControllers {
controller.Run(signalCtx, setup.Logger.WithName("controllers"), &wg)
}
// start leader election
le.Run(signalCtx)
// wait for everything to shut down and exit
wg.Wait()
// say goodbye...
setup.Logger.V(2).Info("Kyverno shutdown successful")
}


@@ -259,11 +259,17 @@ func main() {
if omitEvents == "" {
omitEventsValues = []string{}
}
var wg sync.WaitGroup
eventGenerator := event.NewEventGenerator(
setup.EventsClient,
logging.WithName("EventGenerator"),
omitEventsValues...,
)
eventController := internal.NewController(
event.ControllerName,
eventGenerator,
event.Workers,
)
// engine
engine := internal.NewEngine(
ctx,
@@ -284,9 +290,6 @@ func main() {
setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync")
os.Exit(1)
}
// start event generator
var wg sync.WaitGroup
go eventGenerator.Run(ctx, event.Workers, &wg)
// setup leader election
le, err := leaderelection.New(
setup.Logger.WithName("leader-election"),
@@ -354,7 +357,10 @@ func main() {
setup.Logger.Error(err, "failed to initialize leader election")
os.Exit(1)
}
// start non leader controllers
eventController.Run(ctx, setup.Logger, &wg)
// start leader election
le.Run(ctx)
sdown()
// wait for everything to shut down and exit
wg.Wait()
}


@@ -2,66 +2,75 @@ package event
import (
"context"
"sync"
"fmt"
"os"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme"
"github.com/kyverno/kyverno/pkg/metrics"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/tools/events"
"k8s.io/klog/v2"
"k8s.io/apimachinery/pkg/util/wait"
v1 "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/tools/record/util"
"k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
)
const (
Workers = 3
CleanupWorkers = 3
eventWorkQueueName = "kyverno-events"
ControllerName = "kyverno-events"
workQueueRetryLimit = 3
)
// generator generates events
type generator struct {
// broadcaster
broadcaster events.EventBroadcaster
// recorders
recorders map[Source]events.EventRecorder
// client
eventsClient eventsv1.EventsV1Interface
// config
omitEvents sets.Set[string]
logger logr.Logger
}
// Interface to generate event
type Interface interface {
Add(infoList ...Info)
}
// Controller interface to generate event
type Controller interface {
Interface
Run(context.Context, int, *sync.WaitGroup)
// controller generates events
type controller struct {
logger logr.Logger
eventsClient v1.EventsV1Interface
omitEvents sets.Set[string]
queue workqueue.RateLimitingInterface
clock clock.Clock
hostname string
droppedEventsCounter metric.Int64Counter
}
// NewEventGenerator creates a new event controller
func NewEventGenerator(eventsClient eventsv1.EventsV1Interface, logger logr.Logger, omitEvents ...string) Controller {
return &generator{
broadcaster: events.NewBroadcaster(&events.EventSinkImpl{
Interface: eventsClient,
}),
eventsClient: eventsClient,
omitEvents: sets.New(omitEvents...),
logger: logger,
func NewEventGenerator(eventsClient v1.EventsV1Interface, logger logr.Logger, omitEvents ...string) *controller {
clock := clock.RealClock{}
hostname, _ := os.Hostname()
meter := otel.GetMeterProvider().Meter(metrics.MeterName)
droppedEventsCounter, err := meter.Int64Counter(
"kyverno_events_dropped",
metric.WithDescription("can be used to track the number of events dropped by the event generator"),
)
if err != nil {
logger.Error(err, "failed to register metric kyverno_events_dropped")
}
return &controller{
logger: logger,
eventsClient: eventsClient,
omitEvents: sets.New(omitEvents...),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
clock: clock,
hostname: hostname,
droppedEventsCounter: droppedEventsCounter,
}
}
// Add queues an event for generation
func (gen *generator) Add(infos ...Info) {
func (gen *controller) Add(infos ...Info) {
logger := gen.logger
logger.V(3).Info("generating events", "count", len(infos))
for _, info := range infos {
@@ -80,53 +89,99 @@ func (gen *generator) Add(infos ...Info) {
}
// Run begins generator
func (gen *generator) Run(ctx context.Context, workers int, waitGroup *sync.WaitGroup) {
func (gen *controller) Run(ctx context.Context, workers int) {
logger := gen.logger
logger.Info("start")
defer logger.Info("terminated")
defer utilruntime.HandleCrash()
defer gen.stopRecorders()
defer logger.Info("shutting down...")
if err := gen.startRecorders(ctx); err != nil {
logger.Error(err, "failed to start recorders")
return
var waitGroup wait.Group
for i := 0; i < workers; i++ {
waitGroup.StartWithContext(ctx, func(ctx context.Context) {
for gen.processNextWorkItem(ctx) {
}
})
}
<-ctx.Done()
gen.queue.ShutDownWithDrain()
waitGroup.Wait()
}
func (gen *generator) startRecorders(ctx context.Context) error {
if err := gen.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil {
return err
func (gen *controller) processNextWorkItem(ctx context.Context) bool {
logger := gen.logger
key, quit := gen.queue.Get()
if quit {
return false
}
logger := klog.Background().V(int(0))
// TODO: logger watcher should be stopped
if _, err := gen.broadcaster.StartLogging(logger); err != nil {
return err
defer gen.queue.Done(key)
event, ok := key.(*eventsv1.Event)
if !ok {
logger.Error(nil, "failed to convert key to Info", "key", key)
return true
}
gen.recorders = map[Source]events.EventRecorder{
PolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(PolicyController)),
AdmissionController: gen.broadcaster.NewRecorder(scheme.Scheme, string(AdmissionController)),
GeneratePolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(GeneratePolicyController)),
MutateExistingController: gen.broadcaster.NewRecorder(scheme.Scheme, string(MutateExistingController)),
CleanupController: gen.broadcaster.NewRecorder(scheme.Scheme, string(CleanupController)),
_, err := gen.eventsClient.Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{})
if err != nil {
if gen.queue.NumRequeues(key) < workQueueRetryLimit {
logger.Error(err, "failed to create event", "key", key)
gen.queue.AddRateLimited(key)
return true
}
gen.droppedEventsCounter.Add(ctx, 1)
logger.Error(err, "dropping event", "key", key)
}
return nil
gen.queue.Forget(key)
return true
}
func (gen *generator) stopRecorders() {
gen.broadcaster.Shutdown()
}
func (gen *generator) emitEvent(key Info) {
func (gen *controller) emitEvent(key Info) {
logger := gen.logger
eventType := corev1.EventTypeWarning
if key.Reason == PolicyApplied || key.Reason == PolicySkipped {
eventType = corev1.EventTypeNormal
}
if recorder := gen.recorders[key.Source]; recorder != nil {
logger.V(3).Info("creating the event", "source", key.Source, "type", eventType, "resource", key.Resource())
recorder.Eventf(&key.Regarding, key.Related, eventType, string(key.Reason), string(key.Action), key.Message)
} else {
logger.Info("info.source not defined for the request")
timestamp := metav1.MicroTime{Time: time.Now()}
refRegarding, err := reference.GetReference(scheme.Scheme, &key.Regarding)
if err != nil {
logger.Error(err, "Could not construct reference, will not report event", "object", &key.Regarding, "eventType", eventType, "reason", string(key.Reason), "message", key.Message)
return
}
var refRelated *corev1.ObjectReference
if key.Related != nil {
refRelated, err = reference.GetReference(scheme.Scheme, key.Related)
if err != nil {
logger.V(9).Info("Could not construct reference", "object", key.Related, "err", err)
}
}
if !util.ValidateEventType(eventType) {
logger.Error(nil, "Unsupported event type", "eventType", eventType)
return
}
reportingController := string(key.Source)
reportingInstance := reportingController + "-" + gen.hostname
t := metav1.Time{Time: gen.clock.Now()}
namespace := refRegarding.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
event := &eventsv1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
Namespace: namespace,
},
EventTime: timestamp,
Series: nil,
ReportingController: reportingController,
ReportingInstance: reportingInstance,
Action: string(key.Action),
Reason: string(key.Reason),
Regarding: *refRegarding,
Related: refRelated,
Note: key.Message,
Type: eventType,
}
gen.queue.Add(event)
}
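The retry policy in processNextWorkItem is the standard workqueue idiom: rate-limited requeue until workQueueRetryLimit is reached, then count and drop. A standalone sketch of just that bookkeeping, with a simulated failure in place of the Events API create:

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

const retryLimit = 3 // mirrors workQueueRetryLimit in the diff

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	queue.Add("event-key")
	for {
		key, quit := queue.Get()
		if quit {
			return
		}
		// Pretend the API create failed; decide between requeue and drop.
		if queue.NumRequeues(key) < retryLimit {
			queue.AddRateLimited(key) // retry later with exponential backoff
			queue.Done(key)
			continue
		}
		fmt.Println("dropping after", queue.NumRequeues(key), "requeues") // where kyverno_events_dropped would increment
		queue.Forget(key)                                                 // reset the per-key failure count
		queue.Done(key)
		return
	}
}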


@@ -0,0 +1,54 @@
package event
import (
"context"
"testing"
"time"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
)
func TestEventGenerator(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
eventCreated := make(chan struct{})
clientset := fake.NewSimpleClientset()
clientset.PrependReactor("create", "events", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
eventCreated <- struct{}{}
return true, nil, nil
})
logger := logr.Discard()
eventsClient := clientset.EventsV1()
eventGenerator := NewEventGenerator(eventsClient, logger)
go eventGenerator.Run(ctx, Workers)
time.Sleep(1 * time.Second)
info := Info{
Regarding: corev1.ObjectReference{
Kind: "Pod",
Name: "pod",
Namespace: "default",
},
Reason: "TestReason",
Action: "TestAction",
Message: "TestMessage",
Source: PolicyController,
}
eventGenerator.Add(info)
select {
case <-eventCreated:
case <-time.After(wait.ForeverTestTimeout):
t.Fatal("event not created")
}
}
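A note on the test mechanics: PrependReactor installs the hook ahead of the fake clientset's default object tracker, and returning handled=true short-circuits the chain, so the send on eventCreated is the only observable effect of the create. The one-second sleep gives the generator's workers time to start before the event is queued.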


@@ -32,7 +32,7 @@ type DebugModeOptions struct {
type Server interface {
// Run TLS server in separate thread and returns control immediately
Run(<-chan struct{})
Run()
// Stop TLS server and returns control after the server is shut down
Stop()
}
@@ -201,23 +201,17 @@ func NewServer(
}
}
func (s *server) Run(stopCh <-chan struct{}) {
func (s *server) Run() {
go func() {
logger.V(3).Info("started serving requests", "addr", s.server.Addr)
if err := s.server.ListenAndServeTLS("", ""); err != http.ErrServerClosed {
logger.Error(err, "failed to listen to requests")
if err := s.server.ListenAndServeTLS("", ""); err != nil {
logging.Error(err, "failed to start server")
}
}()
logger.Info("starting service")
<-stopCh
s.Stop()
}
func (s *server) Stop() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
s.cleanup(ctx)
err := s.server.Shutdown(ctx)
if err != nil {