1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2024-12-14 11:57:48 +00:00
kyverno/pkg/event/controller.go
Fleezesd 6b87d70b39
chore: change controller rated limiting queue (#11509)
Signed-off-by: Fleezesd <1253576349@qq.com>
Co-authored-by: shuting <shuting@nirmata.com>
2024-11-04 13:48:58 +00:00

203 lines
6.4 KiB
Go

package event
import (
"context"
"fmt"
"os"
"time"
"github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/client/clientset/versioned/scheme"
"github.com/kyverno/kyverno/pkg/metrics"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
corev1 "k8s.io/api/core/v1"
eventsv1 "k8s.io/api/events/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
v1 "k8s.io/client-go/kubernetes/typed/events/v1"
"k8s.io/client-go/tools/record/util"
"k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
"k8s.io/utils/clock"
)
// Settings used by the event controller.
const (
// Workers is a worker count; it is not referenced in the visible code.
// NOTE(review): presumably the value callers pass to Run — confirm.
Workers = 3
// ControllerName is the name given to the controller's work queue.
ControllerName = "kyverno-events"
// workQueueRetryLimit bounds retries: an event whose creation fails is
// requeued while its requeue count is below this value, then dropped.
workQueueRetryLimit = 3
)
// Interface is the contract used to submit events for generation.
type Interface interface {
// Add submits zero or more event descriptions.
Add(infoList ...Info)
}
// controller builds events from Info values, queues them, and creates them
// through the events client from its worker loop.
type controller struct {
// logger receives all log output of the controller.
logger logr.Logger
// eventsClient is the client used to create the queued events.
eventsClient v1.EventsV1Interface
// omitEvents holds the event reasons for which no event is created.
omitEvents sets.Set[string]
// queue holds the built events that are waiting to be created.
queue workqueue.TypedRateLimitingInterface[any]
// clock supplies the time used for the generated event name suffix.
clock clock.Clock
// hostname is appended to the reporting controller name to form the
// reporting instance of each event.
hostname string
// droppedEventsCounter is incremented when an event is dropped after
// its creation retries are exhausted.
droppedEventsCounter metric.Int64Counter
// maxQueuedEvents is the queue length above which Add drops incoming
// events; a value of zero makes Add drop everything.
maxQueuedEvents int
}
// NewEventGenerator builds an event controller that creates events through
// eventsClient and logs through logger.
//
// maxQueuedEvents is the queue length above which Add drops incoming events
// (zero makes Add drop everything). omitEvents lists the event reasons for
// which no event is created.
//
// Neither a hostname lookup failure nor a metric registration failure is
// fatal: both are logged and the controller is still returned.
func NewEventGenerator(eventsClient v1.EventsV1Interface, logger logr.Logger, maxQueuedEvents int, omitEvents ...string) *controller {
	hostname, err := os.Hostname()
	if err != nil {
		// The hostname only feeds the reporting instance of each event, so
		// report the failure and carry on with whatever was returned.
		logger.Error(err, "failed to get hostname")
	}
	meter := otel.GetMeterProvider().Meter(metrics.MeterName)
	droppedEventsCounter, err := meter.Int64Counter(
		"kyverno_events_dropped",
		metric.WithDescription("can be used to track the number of events dropped by the event generator"),
	)
	if err != nil {
		logger.Error(err, "failed to register metric kyverno_events_dropped")
	}
	return &controller{
		logger:       logger,
		eventsClient: eventsClient,
		omitEvents:   sets.New(omitEvents...),
		queue: workqueue.NewTypedRateLimitingQueueWithConfig(
			workqueue.DefaultTypedControllerRateLimiter[any](),
			workqueue.TypedRateLimitingQueueConfig[any]{Name: ControllerName},
		),
		// Constructed in place so no local variable shadows the clock package.
		clock:                clock.RealClock{},
		hostname:             hostname,
		droppedEventsCounter: droppedEventsCounter,
		maxQueuedEvents:      maxQueuedEvents,
	}
}
// Add builds and queues one event per entry of infos.
//
// The whole batch is dropped when maxQueuedEvents is zero or the queue is
// already longer than maxQueuedEvents. Individual entries are skipped when
// the regarding object has no name, or when their reason is in omitEvents.
func (gen *controller) Add(infos ...Info) {
	log := gen.logger
	log.V(3).Info("generating events", "count", len(infos))
	if limit := gen.maxQueuedEvents; limit == 0 || gen.queue.Len() > limit {
		log.V(3).Info("exceeds the event queue limit, dropping the event", "maxQueuedEvents", limit, "current size", gen.queue.Len())
		return
	}
	for _, info := range infos {
		// Key/value pairs shared by every log line about this entry.
		target := []any{"kind", info.Regarding.Kind, "name", info.Regarding.Name, "namespace", info.Regarding.Namespace}
		switch {
		case info.Regarding.Name == "":
			// Resources using generateName have no name yet, so there is
			// nothing an event could point at.
			log.V(3).Info("skipping event creation for resource without a name", target...)
		case gen.omitEvents.Has(string(info.Reason)):
			log.V(6).Info("omitting event", append(target, "reason", info.Reason)...)
		default:
			gen.emitEvent(info)
			log.V(6).Info("creating event", append(target, "reason", info.Reason)...)
		}
	}
}
// Run starts the requested number of workers and blocks until ctx is done.
// It then shuts the queue down with drain and waits for every worker to
// return before returning itself.
func (gen *controller) Run(ctx context.Context, workers int) {
	gen.logger.Info("start")
	defer gen.logger.Info("terminated")
	defer utilruntime.HandleCrash()

	// Each worker keeps processing items until the queue reports shutdown.
	worker := func(ctx context.Context) {
		for gen.processNextWorkItem(ctx) {
		}
	}

	var pool wait.Group
	for remaining := workers; remaining > 0; remaining-- {
		pool.StartWithContext(ctx, worker)
	}

	<-ctx.Done()
	gen.queue.ShutDownWithDrain()
	pool.Wait()
}
// processNextWorkItem takes one item off the queue and tries to create the
// event it holds. It returns false only when the queue has been shut down,
// which tells the calling worker loop to stop.
//
// A failed creation is requeued with rate limiting while the item's requeue
// count is below workQueueRetryLimit; after that the event is dropped and
// the dropped-events counter is incremented. NotFound errors are not retried.
func (gen *controller) processNextWorkItem(ctx context.Context) bool {
	logger := gen.logger
	key, quit := gen.queue.Get()
	if quit {
		return false
	}
	defer gen.queue.Done(key)
	event, ok := key.(*eventsv1.Event)
	if !ok {
		// The queue is expected to hold only *eventsv1.Event values.
		logger.Error(nil, "failed to convert key to Event", "key", key)
		return true
	}
	_, err := gen.eventsClient.Events(event.Namespace).Create(ctx, event, metav1.CreateOptions{})
	if err != nil && !apierrors.IsNotFound(err) {
		if gen.queue.NumRequeues(key) < workQueueRetryLimit {
			logger.Error(err, "failed to create event", "key", key)
			gen.queue.AddRateLimited(key)
			return true
		}
		// The constructor only logs a metric registration failure, so the
		// counter may be nil here; never let the metric crash a worker.
		if gen.droppedEventsCounter != nil {
			gen.droppedEventsCounter.Add(ctx, 1)
		}
		logger.Error(err, "dropping event", "key", key)
	}
	// Clear the retry history for the item now that it is finished with.
	gen.queue.Forget(key)
	return true
}
// emitEvent converts key into an events.k8s.io/v1 Event and adds it to the
// work queue. Nothing is queued when a reference to the regarding object
// cannot be built or when the event type is not supported.
//
// The event type is key.Type when set; otherwise it is Normal for the
// PolicyApplied and PolicySkipped reasons and Warning for everything else.
// Notes longer than 1024 bytes are shortened and end in "...".
func (gen *controller) emitEvent(key Info) {
	// Longest note that is queued, and the marker appended to shortened notes.
	const (
		maxNoteLen = 1024
		ellipsis   = "..."
	)

	logger := gen.logger
	eventType := corev1.EventTypeWarning
	if key.Type != "" {
		eventType = key.Type
	} else if key.Reason == PolicyApplied || key.Reason == PolicySkipped {
		eventType = corev1.EventTypeNormal
	}

	// NOTE(review): EventTime comes from time.Now() while the name suffix
	// below uses the injected clock — confirm whether that is intended.
	timestamp := metav1.MicroTime{Time: time.Now()}
	refRegarding, err := reference.GetReference(scheme.Scheme, &key.Regarding)
	if err != nil {
		logger.Error(err, "Could not construct reference, will not report event", "object", &key.Regarding, "eventType", eventType, "reason", string(key.Reason), "message", key.Message)
		return
	}

	// The related object is optional: a failure here is only logged and the
	// event is still queued without it.
	var refRelated *corev1.ObjectReference
	if key.Related != nil {
		refRelated, err = reference.GetReference(scheme.Scheme, key.Related)
		if err != nil {
			logger.V(9).Info("Could not construct reference", "object", key.Related, "err", err)
		}
	}
	if !util.ValidateEventType(eventType) {
		logger.Error(nil, "Unsupported event type", "eventType", eventType)
		return
	}

	reportingController := string(key.Source)
	reportingInstance := reportingController + "-" + gen.hostname
	t := metav1.Time{Time: gen.clock.Now()}
	namespace := refRegarding.Namespace
	if namespace == "" {
		namespace = metav1.NamespaceDefault
	}

	message := key.Message
	if len(message) > maxNoteLen {
		cut := maxNoteLen - len(ellipsis)
		// Do not split a multi-byte UTF-8 character: while the first byte
		// being dropped is a continuation byte (10xxxxxx), move the cut
		// back. A character has at most three continuation bytes.
		for back := 0; back < 3 && message[cut]&0xC0 == 0x80; back++ {
			cut--
		}
		message = message[:cut] + ellipsis
	}

	event := &eventsv1.Event{
		ObjectMeta: metav1.ObjectMeta{
			// Object name plus the hex nanosecond timestamp as a suffix.
			Name:      fmt.Sprintf("%v.%x", refRegarding.Name, t.UnixNano()),
			Namespace: namespace,
		},
		EventTime:           timestamp,
		Series:              nil,
		ReportingController: reportingController,
		ReportingInstance:   reportingInstance,
		Action:              string(key.Action),
		Reason:              string(key.Reason),
		Regarding:           *refRegarding,
		Related:             refRelated,
		Note:                message,
		Type:                eventType,
	}
	gen.queue.Add(event)
}