1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-05 23:46:56 +00:00
kyverno/pkg/event/controller.go

190 lines
5.8 KiB
Go
Raw Normal View History

2019-05-10 00:05:21 -07:00
package event
import (
"context"
"fmt"
"os"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme"
"github.com/kyverno/kyverno/pkg/metrics"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2019-05-10 00:05:21 -07:00
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
v1 "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/tools/record/util"
"k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
2019-05-10 00:05:21 -07:00
)
// Package-level settings for the event controller.
const (
// Workers is not referenced elsewhere in this file; presumably it is the
// worker count callers pass to Run — TODO confirm against callers.
Workers = 3
// ControllerName is the name given to the rate-limited work queue created
// in NewEventGenerator.
ControllerName = "kyverno-events"
// workQueueRetryLimit bounds re-queuing in processNextWorkItem: a failed
// event creation is re-queued only while the key's requeue count is below
// this value; after that the event is dropped.
workQueueRetryLimit = 3
)
// Interface to generate event
type Interface interface {
Add(infoList ...Info)
2019-05-10 00:05:21 -07:00
}
// controller turns Info entries into events/v1 Event objects, buffers them on
// a work queue, and creates them through the events client.
type controller struct {
// logger receives all log output of the controller.
logger logr.Logger
// eventsClient is used by processNextWorkItem to create events.
eventsClient v1.EventsV1Interface
// omitEvents holds event reasons; Add skips any Info whose Reason is in
// this set.
omitEvents sets.Set[string]
// queue holds the *eventsv1.Event objects built by emitEvent until a
// worker picks them up.
queue workqueue.RateLimitingInterface
// clock supplies the time used for the generated event name suffix.
clock clock.Clock
// hostname is appended to the reporting controller name to form the
// event's ReportingInstance.
hostname string
// droppedEventsCounter is incremented each time an event is dropped after
// exhausting its retries.
droppedEventsCounter metric.Int64Counter
}
// NewEventGenerator builds the event controller.
//
// eventsClient is the events/v1 client used to create events, logger receives
// the controller's log output, and omitEvents lists the event reasons that Add
// skips.
//
// Both the hostname lookup and the registration of the kyverno_events_dropped
// counter are best-effort: a failure is logged and construction continues.
func NewEventGenerator(eventsClient v1.EventsV1Interface, logger logr.Logger, omitEvents ...string) *controller {
	// The hostname only feeds the ReportingInstance of emitted events, so a
	// lookup failure is reported instead of being silently discarded, but it
	// is not fatal.
	hostname, err := os.Hostname()
	if err != nil {
		logger.Error(err, "failed to get hostname")
	}
	meter := otel.GetMeterProvider().Meter(metrics.MeterName)
	droppedEventsCounter, err := meter.Int64Counter(
		"kyverno_events_dropped",
		metric.WithDescription("can be used to track the number of events dropped by the event generator"),
	)
	if err != nil {
		logger.Error(err, "failed to register metric kyverno_events_dropped")
	}
	return &controller{
		logger:       logger,
		eventsClient: eventsClient,
		omitEvents:   sets.New(omitEvents...),
		queue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), ControllerName),
		// Built inline so no local variable shadows the clock package.
		clock:                clock.RealClock{},
		hostname:             hostname,
		droppedEventsCounter: droppedEventsCounter,
	}
}
// Add queues an event for generation
func (gen *controller) Add(infos ...Info) {
logger := gen.logger
logger.V(3).Info("generating events", "count", len(infos))
2019-06-27 11:43:07 -07:00
for _, info := range infos {
// don't create event for resources with generateName as the name is not generated yet
if info.Regarding.Name == "" {
logger.V(3).Info("skipping event creation for resource without a name", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace)
continue
}
if gen.omitEvents.Has(string(info.Reason)) {
logger.V(6).Info("omitting event", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace, "reason", info.Reason)
continue
}
gen.emitEvent(info)
logger.V(6).Info("creating event", "kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace, "reason", info.Reason)
2019-06-26 18:04:50 -07:00
}
2019-05-10 00:05:21 -07:00
}
2019-08-09 13:41:56 -07:00
// Run begins generator
func (gen *controller) Run(ctx context.Context, workers int) {
logger := gen.logger
2020-03-17 11:05:20 -07:00
logger.Info("start")
defer logger.Info("terminated")
defer utilruntime.HandleCrash()
var waitGroup wait.Group
for i := 0; i < workers; i++ {
waitGroup.StartWithContext(ctx, func(ctx context.Context) {
for gen.processNextWorkItem(ctx) {
}
})
2019-05-10 00:05:21 -07:00
}
<-ctx.Done()
gen.queue.ShutDownWithDrain()
waitGroup.Wait()
}
2019-07-19 16:17:10 -07:00
func (gen *controller) processNextWorkItem(ctx context.Context) bool {
logger := gen.logger
key, quit := gen.queue.Get()
if quit {
return false
2019-05-10 00:05:21 -07:00
}
defer gen.queue.Done(key)
event, ok := key.(*eventsv1.Event)
if !ok {
logger.Error(nil, "failed to convert key to Info", "key", key)
return true
2019-07-19 16:17:10 -07:00
}
_, err := gen.eventsClient.Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{})
if err != nil {
if gen.queue.NumRequeues(key) < workQueueRetryLimit {
logger.Error(err, "failed to create event", "key", key)
gen.queue.AddRateLimited(key)
return true
}
gen.droppedEventsCounter.Add(ctx, 1)
logger.Error(err, "dropping event", "key", key)
2020-11-03 16:07:02 -08:00
}
gen.queue.Forget(key)
return true
2019-07-19 16:17:10 -07:00
}
func (gen *controller) emitEvent(key Info) {
logger := gen.logger
eventType := corev1.EventTypeWarning
feat: enhance global context (#9710) * feat(globalcontext): add event handling Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * feat(globalcontext): handle cache sync error Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * feat(globalcontext): ensure api is called during init Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * design(events): decouple events from policies a bit Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * feat(globalcontext): use status Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): make status optional Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): status update Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): codegen Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * chore(globalcontext): delete yaml annotations Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): fix status in tests Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcotext): update enqueue func Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): error Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * chore(globalcontext): rbac Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * chore(globalcontext): retry logic Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): unknown api call in test Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * bump Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix: set unique name for each testing resource Signed-off-by: ShutingZhao <shuting@nirmata.com> * fix: update readme Signed-off-by: ShutingZhao <shuting@nirmata.com> * fix: log msg Signed-off-by: ShutingZhao <shuting@nirmata.com> * fix: add delays Signed-off-by: ShutingZhao <shuting@nirmata.com> * fix: delay gctce creation Signed-off-by: ShutingZhao <shuting@nirmata.com> * debug: check Kyverno status Signed-off-by: ShutingZhao <shuting@nirmata.com> * 
debug: update chainsaw config Signed-off-by: ShutingZhao <shuting@nirmata.com> * debug: revert chainsaw config Signed-off-by: ShutingZhao <shuting@nirmata.com> * test(globalcontext): print actual status Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): add necessary delays and check status before applying Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * test(globalcontext): long refreshInterval Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * debug: log success Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * debug: print informer data Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): use client instead of informer Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * debug: print status after update Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * debug: print ResourceVersion Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * debug: remove gcecontroller from other controllers Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): update status only once Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * chore: remove excess logs Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> * fix(globalcontext): add store to cleanup controller Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> --------- Signed-off-by: Khaled Emara <khaled.emara@nirmata.com> Signed-off-by: ShutingZhao <shuting@nirmata.com> Co-authored-by: shuting <shuting@nirmata.com> Co-authored-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
2024-02-23 12:34:04 +02:00
if key.Type != "" {
eventType = key.Type
} else if key.Reason == PolicyApplied || key.Reason == PolicySkipped {
eventType = corev1.EventTypeNormal
}
timestamp := metav1.MicroTime{Time: time.Now()}
refRegarding, err := reference.GetReference(scheme.Scheme, &key.Regarding)
if err != nil {
logger.Error(err, "Could not construct reference, will not report event", "object", &key.Regarding, "eventType", eventType, "reason", string(key.Reason), "message", key.Message)
return
}
var refRelated *corev1.ObjectReference
if key.Related != nil {
refRelated, err = reference.GetReference(scheme.Scheme, key.Related)
if err != nil {
logger.V(9).Info("Could not construct reference", "object", key.Related, "err", err)
}
}
if !util.ValidateEventType(eventType) {
logger.Error(nil, "Unsupported event type", "eventType", eventType)
return
2019-05-10 00:05:21 -07:00
}
reportingController := string(key.Source)
reportingInstance := reportingController + "-" + gen.hostname
t := metav1.Time{Time: gen.clock.Now()}
namespace := refRegarding.Namespace
if namespace == "" {
namespace = metav1.NamespaceDefault
}
event := &eventsv1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
Namespace: namespace,
},
EventTime: timestamp,
Series: nil,
ReportingController: reportingController,
ReportingInstance: reportingInstance,
Action: string(key.Action),
Reason: string(key.Reason),
Regarding: *refRegarding,
Related: refRelated,
Note: key.Message,
Type: eventType,
}
gen.queue.Add(event)
2019-05-10 00:05:21 -07:00
}