diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go
index c97a3c8b60..6a2368920a 100644
--- a/pkg/metrics/metrics.go
+++ b/pkg/metrics/metrics.go
@@ -28,7 +28,7 @@ import (
 )
 
 const (
-	meterName = "kyverno"
+	MeterName = "kyverno"
 )
 
 type MetricsConfig struct {
@@ -54,7 +54,7 @@ type MetricsConfigManager interface {
 
 func (m *MetricsConfig) initializeMetrics() error {
 	var err error
-	meter := global.MeterProvider().Meter(meterName)
+	meter := global.MeterProvider().Meter(MeterName)
 	m.policyResultsMetric, err = meter.SyncInt64().Counter("kyverno_policy_results_total", instrument.WithDescription("can be used to track the results associated with the policies applied in the user’s cluster, at the level from rule to policy to admission requests"))
 	if err != nil {
diff --git a/pkg/utils/controller/run.go b/pkg/utils/controller/run.go
index 50227b1514..bbc8734682 100644
--- a/pkg/utils/controller/run.go
+++ b/pkg/utils/controller/run.go
@@ -6,6 +6,11 @@ import (
 	"time"
 
 	"github.com/go-logr/logr"
+	"github.com/kyverno/kyverno/pkg/metrics"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric/global"
+	"go.opentelemetry.io/otel/metric/instrument"
+	"go.opentelemetry.io/otel/metric/instrument/syncint64"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -15,11 +20,47 @@ import (
 
 type reconcileFunc func(ctx context.Context, logger logr.Logger, key string, namespace string, name string) error
 
+type controllerMetrics struct {
+	controllerName string
+	reconcileTotal syncint64.Counter
+	requeueTotal   syncint64.Counter
+	queueDropTotal syncint64.Counter
+}
+
+func newControllerMetrics(logger logr.Logger, controllerName string) *controllerMetrics {
+	meter := global.MeterProvider().Meter(metrics.MeterName)
+	reconcileTotal, err := meter.SyncInt64().Counter(
+		"kyverno_controller_reconcile_total",
+		instrument.WithDescription("can be used to track number of reconciliation cycles"))
+	if err != nil {
+		logger.Error(err, "Failed to create instrument, kyverno_controller_reconcile_total")
+	}
+	requeueTotal, err := meter.SyncInt64().Counter(
+		"kyverno_controller_requeue_total",
+		instrument.WithDescription("can be used to track number of reconciliation errors"))
+	if err != nil {
+		logger.Error(err, "Failed to create instrument, kyverno_controller_requeue_total")
+	}
+	queueDropTotal, err := meter.SyncInt64().Counter(
+		"kyverno_controller_drop_total",
+		instrument.WithDescription("can be used to track number of queue drops"))
+	if err != nil {
+		logger.Error(err, "Failed to create instrument, kyverno_controller_drop_total")
+	}
+	return &controllerMetrics{
+		controllerName: controllerName,
+		reconcileTotal: reconcileTotal,
+		requeueTotal:   requeueTotal,
+		queueDropTotal: queueDropTotal,
+	}
+}
+
 func Run(ctx context.Context, logger logr.Logger, controllerName string, period time.Duration, queue workqueue.RateLimitingInterface, n, maxRetries int, r reconcileFunc, routines ...func(context.Context, logr.Logger)) {
 	logger.Info("starting ...")
 	defer runtime.HandleCrash()
 	defer logger.Info("stopped")
 	var wg sync.WaitGroup
+	metric := newControllerMetrics(logger, controllerName)
 	func() {
 		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
@@ -30,7 +71,7 @@ func Run(ctx context.Context, logger logr.Logger, controllerName string, period
 				logger.Info("starting worker")
 				defer wg.Done()
 				defer logger.Info("worker stopped")
-				wait.UntilWithContext(ctx, func(ctx context.Context) { worker(ctx, logger, queue, maxRetries, r) }, period)
+				wait.UntilWithContext(ctx, func(ctx context.Context) { worker(ctx, logger, metric, queue, maxRetries, r) }, period)
 			}(logger.WithName("worker").WithValues("id", i))
 		}
 		for i, routine := range routines {
@@ -48,21 +89,24 @@ func Run(ctx context.Context, logger logr.Logger, controllerName string, period
 	wg.Wait()
 }
 
-func worker(ctx context.Context, logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) {
-	for processNextWorkItem(ctx, logger, queue, maxRetries, r) {
+func worker(ctx context.Context, logger logr.Logger, metric *controllerMetrics, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) {
+	for processNextWorkItem(ctx, logger, metric, queue, maxRetries, r) {
 	}
 }
 
-func processNextWorkItem(ctx context.Context, logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) bool {
+func processNextWorkItem(ctx context.Context, logger logr.Logger, metric *controllerMetrics, queue workqueue.RateLimitingInterface, maxRetries int, r reconcileFunc) bool {
 	if obj, quit := queue.Get(); !quit {
 		defer queue.Done(obj)
-		handleErr(logger, queue, maxRetries, reconcile(ctx, logger, obj, r), obj)
+		handleErr(ctx, logger, metric, queue, maxRetries, reconcile(ctx, logger, obj, r), obj)
 		return true
 	}
 	return false
 }
 
-func handleErr(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRetries int, err error, obj interface{}) {
+func handleErr(ctx context.Context, logger logr.Logger, metric *controllerMetrics, queue workqueue.RateLimitingInterface, maxRetries int, err error, obj interface{}) {
+	if metric.reconcileTotal != nil {
+		metric.reconcileTotal.Add(ctx, 1, attribute.String("controller_name", metric.controllerName))
+	}
 	if err == nil {
 		queue.Forget(obj)
 	} else if errors.IsNotFound(err) {
@@ -71,9 +115,24 @@ func handleErr(logger logr.Logger, queue workqueue.RateLimitingInterface, maxRet
 	} else if queue.NumRequeues(obj) < maxRetries {
 		logger.Info("Retrying request", "obj", obj, "error", err.Error())
 		queue.AddRateLimited(obj)
+		if metric.requeueTotal != nil {
+			metric.requeueTotal.Add(
+				ctx,
+				1,
+				attribute.String("controller_name", metric.controllerName),
+				attribute.Int("num_requeues", queue.NumRequeues(obj)),
+			)
+		}
 	} else {
 		logger.Error(err, "Failed to process request", "obj", obj)
 		queue.Forget(obj)
+		if metric.queueDropTotal != nil {
+			metric.queueDropTotal.Add(
+				ctx,
+				1,
+				attribute.String("controller_name", metric.controllerName),
+			)
+		}
 	}
 }
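For reviewers, a minimal caller sketch (hypothetical, not part of this change) of how a controller built on pkg/utils/controller would pick up the new counters. The function name startExampleController, the queue name "example-controller", and the empty reconcile body are illustrative assumptions; it also assumes the package at pkg/utils/controller is imported as controller. The name passed to Run becomes the controller_name attribute on all three counters.

```go
// Hypothetical usage sketch only: identifiers below are illustrative and do
// not exist in the repository.
package example

import (
	"context"
	"time"

	"github.com/go-logr/logr"
	"k8s.io/client-go/util/workqueue"

	"github.com/kyverno/kyverno/pkg/utils/controller"
)

func startExampleController(ctx context.Context, logger logr.Logger) {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example-controller")
	reconcile := func(ctx context.Context, logger logr.Logger, key, namespace, name string) error {
		// Reconciliation logic goes here. Each processed key increments
		// kyverno_controller_reconcile_total; a returned error requeues the key
		// (kyverno_controller_requeue_total) until maxRetries is exceeded, after
		// which it is dropped (kyverno_controller_drop_total).
		return nil
	}
	// 2 workers, up to 3 retries per key; "example-controller" is recorded as
	// the controller_name attribute on every counter emitted by Run.
	controller.Run(ctx, logger, "example-controller", time.Second, queue, 2, 3, reconcile)
}
```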