diff --git a/cmd/background-controller/main.go b/cmd/background-controller/main.go index c57796200e..b92292b83c 100644 --- a/cmd/background-controller/main.go +++ b/cmd/background-controller/main.go @@ -93,7 +93,6 @@ func main() { flagset.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.") flagset.StringVar(&omitEvents, "omit-events", "", "Set this flag to a comma sperated list of PolicyViolation, PolicyApplied, PolicyError, PolicySkipped to disable events, e.g. --omit-events=PolicyApplied,PolicyViolation") flagset.Int64Var(&maxAPICallResponseLength, "maxAPICallResponseLength", 2*1000*1000, "Maximum allowed response size from API Calls. A value of 0 bypasses checks (not recommended).") - // config appConfig := internal.NewConfiguration( internal.WithProfiling(), @@ -116,7 +115,6 @@ func main() { // setup signalCtx, setup, sdown := internal.Setup(appConfig, "kyverno-background-controller", false) defer sdown() - var err error bgscanInterval := time.Hour val := os.Getenv("BACKGROUND_SCAN_INTERVAL") @@ -127,23 +125,27 @@ func main() { } } setup.Logger.V(2).Info("setting the background scan interval", "value", bgscanInterval.String()) - // THIS IS AN UGLY FIX // ELSE KYAML IS NOT THREAD SAFE kyamlopenapi.Schema() // informer factories kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod) - emitEventsValues := strings.Split(omitEvents, ",") + omitEventsValues := strings.Split(omitEvents, ",") if omitEvents == "" { - emitEventsValues = []string{} + omitEventsValues = []string{} } + var wg sync.WaitGroup eventGenerator := event.NewEventGenerator( setup.EventsClient, logging.WithName("EventGenerator"), - emitEventsValues..., + omitEventsValues..., + ) + eventController := internal.NewController( + event.ControllerName, + eventGenerator, + event.Workers, ) // this controller only subscribe to events, nothing is returned... - var wg sync.WaitGroup policymetricscontroller.NewController( setup.MetricsManager, kyvernoInformer.Kyverno().V1().ClusterPolicies(), @@ -169,8 +171,6 @@ func main() { setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync") os.Exit(1) } - // start event generator - go eventGenerator.Run(signalCtx, event.Workers, &wg) // setup leader election le, err := leaderelection.New( setup.Logger.WithName("leader-election"), @@ -221,7 +221,10 @@ func main() { setup.Logger.Error(err, "failed to initialize leader election") os.Exit(1) } + // start non leader controllers + eventController.Run(signalCtx, setup.Logger, &wg) // start leader election le.Run(signalCtx) + // wait for everything to shut down and exit wg.Wait() } diff --git a/cmd/cleanup-controller/main.go b/cmd/cleanup-controller/main.go index 113a8e893f..9ec43650d6 100644 --- a/cmd/cleanup-controller/main.go +++ b/cmd/cleanup-controller/main.go @@ -120,6 +120,7 @@ func main() { // informer factories kubeInformer := kubeinformers.NewSharedInformerFactoryWithOptions(setup.KubeClient, resyncPeriod) kyvernoInformer := kyvernoinformer.NewSharedInformerFactory(setup.KyvernoClient, resyncPeriod) + var wg sync.WaitGroup // listers nsLister := kubeInformer.Core().V1().Namespaces().Lister() // log policy changes @@ -139,13 +140,15 @@ func main() { setup.EventsClient, logging.WithName("EventGenerator"), ) + eventController := internal.NewController( + event.ControllerName, + eventGenerator, + event.Workers, + ) // start informers and wait for cache sync if !internal.StartInformersAndWaitForCacheSync(ctx, setup.Logger, kubeInformer, kyvernoInformer) { os.Exit(1) } - // start event generator - var wg sync.WaitGroup - go eventGenerator.Run(ctx, event.CleanupWorkers, &wg) // setup leader election le, err := leaderelection.New( setup.Logger.WithName("leader-election"), @@ -331,7 +334,12 @@ func main() { setup.Configuration, ) // start server - server.Run(ctx.Done()) + server.Run() + defer server.Stop() + // start non leader controllers + eventController.Run(ctx, setup.Logger, &wg) // start leader election le.Run(ctx) + // wait for everything to shut down and exit + wg.Wait() } diff --git a/cmd/cleanup-controller/server.go b/cmd/cleanup-controller/server.go index 12178c087d..211d792955 100644 --- a/cmd/cleanup-controller/server.go +++ b/cmd/cleanup-controller/server.go @@ -18,9 +18,9 @@ import ( type Server interface { // Run TLS server in separate thread and returns control immediately - Run(<-chan struct{}) + Run() // Stop TLS server and returns control after the server is shut down - Stop(context.Context) + Stop() } type server struct { @@ -110,7 +110,7 @@ func NewServer( } } -func (s *server) Run(stopCh <-chan struct{}) { +func (s *server) Run() { go func() { if err := s.server.ListenAndServeTLS("", ""); err != nil { logging.Error(err, "failed to start server") @@ -118,7 +118,9 @@ func (s *server) Run(stopCh <-chan struct{}) { }() } -func (s *server) Stop(ctx context.Context) { +func (s *server) Stop() { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() err := s.server.Shutdown(ctx) if err != nil { err = s.server.Close() diff --git a/cmd/kyverno/main.go b/cmd/kyverno/main.go index 8af7959c9d..979bdf6a31 100644 --- a/cmd/kyverno/main.go +++ b/cmd/kyverno/main.go @@ -326,6 +326,11 @@ func main() { logging.WithName("EventGenerator"), omitEventsValues..., ) + eventController := internal.NewController( + event.ControllerName, + eventGenerator, + event.Workers, + ) // this controller only subscribe to events, nothing is returned... policymetricscontroller.NewController( setup.MetricsManager, @@ -390,8 +395,6 @@ func main() { os.Exit(1) } } - // start event generator - go eventGenerator.Run(signalCtx, event.Workers, &wg) // setup leader election le, err := leaderelection.New( setup.Logger.WithName("leader-election"), @@ -456,19 +459,6 @@ func main() { setup.Logger.Error(err, "failed to initialize leader election") os.Exit(1) } - // start non leader controllers - for _, controller := range nonLeaderControllers { - controller.Run(signalCtx, setup.Logger.WithName("controllers"), &wg) - } - // start leader election - go func() { - select { - case <-signalCtx.Done(): - return - default: - le.Run(signalCtx) - } - }() // create webhooks server urgen := webhookgenerate.NewGenerator( setup.KyvernoClient, @@ -532,8 +522,15 @@ func main() { os.Exit(1) } // start webhooks server - server.Run(signalCtx.Done()) + server.Run() + defer server.Stop() + // start non leader controllers + eventController.Run(signalCtx, setup.Logger, &wg) + for _, controller := range nonLeaderControllers { + controller.Run(signalCtx, setup.Logger.WithName("controllers"), &wg) + } + // start leader election + le.Run(signalCtx) + // wait for everything to shut down and exit wg.Wait() - // say goodbye... - setup.Logger.V(2).Info("Kyverno shutdown successful") } diff --git a/cmd/reports-controller/main.go b/cmd/reports-controller/main.go index d6fe971814..228987dd22 100644 --- a/cmd/reports-controller/main.go +++ b/cmd/reports-controller/main.go @@ -259,11 +259,17 @@ func main() { if omitEvents == "" { omitEventsValues = []string{} } + var wg sync.WaitGroup eventGenerator := event.NewEventGenerator( setup.EventsClient, logging.WithName("EventGenerator"), omitEventsValues..., ) + eventController := internal.NewController( + event.ControllerName, + eventGenerator, + event.Workers, + ) // engine engine := internal.NewEngine( ctx, @@ -284,9 +290,6 @@ func main() { setup.Logger.Error(errors.New("failed to wait for cache sync"), "failed to wait for cache sync") os.Exit(1) } - // start event generator - var wg sync.WaitGroup - go eventGenerator.Run(ctx, event.Workers, &wg) // setup leader election le, err := leaderelection.New( setup.Logger.WithName("leader-election"), @@ -354,7 +357,10 @@ func main() { setup.Logger.Error(err, "failed to initialize leader election") os.Exit(1) } + // start non leader controllers + eventController.Run(ctx, setup.Logger, &wg) + // start leader election le.Run(ctx) - sdown() + // wait for everything to shut down and exit wg.Wait() } diff --git a/pkg/event/controller.go b/pkg/event/controller.go index 8606ca6b70..83c131c3c4 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -2,66 +2,75 @@ package event import ( "context" - "sync" + "fmt" + "os" + "time" "github.com/go-logr/logr" "github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme" + "github.com/kyverno/kyverno/pkg/metrics" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/metric" corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" - eventsv1 "k8s.io/client-go/kubernetes/typed/events/v1" - "k8s.io/client-go/tools/events" - "k8s.io/klog/v2" + "k8s.io/apimachinery/pkg/util/wait" + v1 "k8s.io/client-go/kubernetes/typed/events/v1" + "k8s.io/client-go/tools/record/util" + "k8s.io/client-go/tools/reference" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/clock" ) const ( Workers = 3 - CleanupWorkers = 3 - eventWorkQueueName = "kyverno-events" + ControllerName = "kyverno-events" workQueueRetryLimit = 3 ) -// generator generate events -type generator struct { - // broadcaster - broadcaster events.EventBroadcaster - - // recorders - recorders map[Source]events.EventRecorder - - // client - eventsClient eventsv1.EventsV1Interface - - // config - omitEvents sets.Set[string] - logger logr.Logger -} - // Interface to generate event type Interface interface { Add(infoList ...Info) } -// Controller interface to generate event -type Controller interface { - Interface - Run(context.Context, int, *sync.WaitGroup) +// controller generate events +type controller struct { + logger logr.Logger + eventsClient v1.EventsV1Interface + omitEvents sets.Set[string] + queue workqueue.RateLimitingInterface + clock clock.Clock + hostname string + droppedEventsCounter metric.Int64Counter } // NewEventGenerator to generate a new event controller -func NewEventGenerator(eventsClient eventsv1.EventsV1Interface, logger logr.Logger, omitEvents ...string) Controller { - return &generator{ - broadcaster: events.NewBroadcaster(&events.EventSinkImpl{ - Interface: eventsClient, - }), - eventsClient: eventsClient, - omitEvents: sets.New(omitEvents...), - logger: logger, +func NewEventGenerator(eventsClient v1.EventsV1Interface, logger logr.Logger, omitEvents ...string) *controller { + clock := clock.RealClock{} + hostname, _ := os.Hostname() + meter := otel.GetMeterProvider().Meter(metrics.MeterName) + droppedEventsCounter, err := meter.Int64Counter( + "kyverno_events_dropped", + metric.WithDescription("can be used to track the number of events dropped by the event generator"), + ) + if err != nil { + logger.Error(err, "failed to register metric kyverno_events_dropped") + } + return &controller{ + logger: logger, + eventsClient: eventsClient, + omitEvents: sets.New(omitEvents...), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName), + clock: clock, + hostname: hostname, + droppedEventsCounter: droppedEventsCounter, } } // Add queues an event for generation -func (gen *generator) Add(infos ...Info) { +func (gen *controller) Add(infos ...Info) { logger := gen.logger logger.V(3).Info("generating events", "count", len(infos)) for _, info := range infos { @@ -80,53 +89,99 @@ func (gen *generator) Add(infos ...Info) { } // Run begins generator -func (gen *generator) Run(ctx context.Context, workers int, waitGroup *sync.WaitGroup) { +func (gen *controller) Run(ctx context.Context, workers int) { logger := gen.logger logger.Info("start") defer logger.Info("terminated") defer utilruntime.HandleCrash() - defer gen.stopRecorders() - defer logger.Info("shutting down...") - if err := gen.startRecorders(ctx); err != nil { - logger.Error(err, "failed to start recorders") - return + var waitGroup wait.Group + for i := 0; i < workers; i++ { + waitGroup.StartWithContext(ctx, func(ctx context.Context) { + for gen.processNextWorkItem(ctx) { + } + }) } <-ctx.Done() + gen.queue.ShutDownWithDrain() + waitGroup.Wait() } -func (gen *generator) startRecorders(ctx context.Context) error { - if err := gen.broadcaster.StartRecordingToSinkWithContext(ctx); err != nil { - return err +func (gen *controller) processNextWorkItem(ctx context.Context) bool { + logger := gen.logger + key, quit := gen.queue.Get() + if quit { + return false } - logger := klog.Background().V(int(0)) - // TODO: logger watcher should be stopped - if _, err := gen.broadcaster.StartLogging(logger); err != nil { - return err + defer gen.queue.Done(key) + event, ok := key.(*eventsv1.Event) + if !ok { + logger.Error(nil, "failed to convert key to Info", "key", key) + return true } - gen.recorders = map[Source]events.EventRecorder{ - PolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(PolicyController)), - AdmissionController: gen.broadcaster.NewRecorder(scheme.Scheme, string(AdmissionController)), - GeneratePolicyController: gen.broadcaster.NewRecorder(scheme.Scheme, string(GeneratePolicyController)), - MutateExistingController: gen.broadcaster.NewRecorder(scheme.Scheme, string(MutateExistingController)), - CleanupController: gen.broadcaster.NewRecorder(scheme.Scheme, string(CleanupController)), + _, err := gen.eventsClient.Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{}) + if err != nil { + if gen.queue.NumRequeues(key) < workQueueRetryLimit { + logger.Error(err, "failed to create event", "key", key) + gen.queue.AddRateLimited(key) + return true + } + gen.droppedEventsCounter.Add(ctx, 1) + logger.Error(err, "dropping event", "key", key) } - return nil + gen.queue.Forget(key) + return true } -func (gen *generator) stopRecorders() { - gen.broadcaster.Shutdown() -} - -func (gen *generator) emitEvent(key Info) { +func (gen *controller) emitEvent(key Info) { logger := gen.logger eventType := corev1.EventTypeWarning if key.Reason == PolicyApplied || key.Reason == PolicySkipped { eventType = corev1.EventTypeNormal } - if recorder := gen.recorders[key.Source]; recorder != nil { - logger.V(3).Info("creating the event", "source", key.Source, "type", eventType, "resource", key.Resource()) - recorder.Eventf(&key.Regarding, key.Related, eventType, string(key.Reason), string(key.Action), key.Message) - } else { - logger.Info("info.source not defined for the request") + + timestamp := metav1.MicroTime{Time: time.Now()} + refRegarding, err := reference.GetReference(scheme.Scheme, &key.Regarding) + if err != nil { + logger.Error(err, "Could not construct reference, will not report event", "object", &key.Regarding, "eventType", eventType, "reason", string(key.Reason), "message", key.Message) + return } + + var refRelated *corev1.ObjectReference + if key.Related != nil { + refRelated, err = reference.GetReference(scheme.Scheme, key.Related) + if err != nil { + logger.V(9).Info("Could not construct reference", "object", key.Related, "err", err) + } + } + if !util.ValidateEventType(eventType) { + logger.Error(nil, "Unsupported event type", "eventType", eventType) + return + } + + reportingController := string(key.Source) + reportingInstance := reportingController + "-" + gen.hostname + + t := metav1.Time{Time: gen.clock.Now()} + namespace := refRegarding.Namespace + if namespace == "" { + namespace = metav1.NamespaceDefault + } + event := &eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()), + Namespace: namespace, + }, + EventTime: timestamp, + Series: nil, + ReportingController: reportingController, + ReportingInstance: reportingInstance, + Action: string(key.Action), + Reason: string(key.Reason), + Regarding: *refRegarding, + Related: refRelated, + Note: key.Message, + Type: eventType, + } + + gen.queue.Add(event) } diff --git a/pkg/event/controller_test.go b/pkg/event/controller_test.go new file mode 100644 index 0000000000..0a4e5715bb --- /dev/null +++ b/pkg/event/controller_test.go @@ -0,0 +1,54 @@ +package event + +import ( + "context" + "testing" + "time" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/fake" + clienttesting "k8s.io/client-go/testing" +) + +func TestEventGenerator(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + eventCreated := make(chan struct{}) + clientset := fake.NewSimpleClientset() + clientset.PrependReactor("create", "events", func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) { + eventCreated <- struct{}{} + return true, nil, nil + }) + + logger := logr.Discard() + + eventsClient := clientset.EventsV1() + eventGenerator := NewEventGenerator(eventsClient, logger) + + go eventGenerator.Run(ctx, Workers) + time.Sleep(1 * time.Second) + + info := Info{ + Regarding: corev1.ObjectReference{ + Kind: "Pod", + Name: "pod", + Namespace: "default", + }, + Reason: "TestReason", + Action: "TestAction", + Message: "TestMessage", + Source: PolicyController, + } + + eventGenerator.Add(info) + + select { + case <-eventCreated: + case <-time.After(wait.ForeverTestTimeout): + t.Fatal("event not created") + } +} diff --git a/pkg/webhooks/server.go b/pkg/webhooks/server.go index 64bcac7f57..0e8a9e56a9 100644 --- a/pkg/webhooks/server.go +++ b/pkg/webhooks/server.go @@ -32,7 +32,7 @@ type DebugModeOptions struct { type Server interface { // Run TLS server in separate thread and returns control immediately - Run(<-chan struct{}) + Run() // Stop TLS server and returns control after the server is shut down Stop() } @@ -201,23 +201,17 @@ func NewServer( } } -func (s *server) Run(stopCh <-chan struct{}) { +func (s *server) Run() { go func() { - logger.V(3).Info("started serving requests", "addr", s.server.Addr) - if err := s.server.ListenAndServeTLS("", ""); err != http.ErrServerClosed { - logger.Error(err, "failed to listen to requests") + if err := s.server.ListenAndServeTLS("", ""); err != nil { + logging.Error(err, "failed to start server") } }() - logger.Info("starting service") - - <-stopCh - s.Stop() } func (s *server) Stop() { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - s.cleanup(ctx) err := s.server.Shutdown(ctx) if err != nil {