From 73df6a07716c8c98cb0ad4ec48ac2ae54ed38ebe Mon Sep 17 00:00:00 2001 From: Nikita Vaniasin Date: Tue, 16 Jan 2024 10:48:24 +0100 Subject: [PATCH] Change default logging level to info. Add --log.sampling (default true) (#1577) Co-authored-by: Adam Janikowski <12255597+ajanikow@users.noreply.github.com> --- CHANGELOG.md | 1 + cmd/cmd.go | 11 +- pkg/deployment/deployment_inspector.go | 6 +- pkg/deployment/logger.go | 6 +- pkg/deployment/reconcile/action_helper.go | 5 +- .../reconcile/plan_builder_maintenance.go | 6 +- pkg/deployment/reconcile/plan_executor.go | 10 +- pkg/deployment/reconcile/reconciler.go | 5 +- .../resources/inspector/inspector.go | 6 +- pkg/deployment/resources/logger.go | 6 +- pkg/logging/level.go | 24 +++- pkg/logging/logger.go | 112 +++++++++++------- pkg/logging/sampler.go | 104 ++++++++++++++++ pkg/logging/sampler_test.go | 105 ++++++++++++++++ pkg/util/sync/map.go | 69 +++++++++++ 15 files changed, 404 insertions(+), 72 deletions(-) create mode 100644 pkg/logging/sampler.go create mode 100644 pkg/logging/sampler_test.go create mode 100644 pkg/util/sync/map.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 440cfd336..f23910225 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - (Feature) (ML) Featurization Job Type - (Bugfix) Don't abort plan in case of optional action timeout - (Documentation) Use relative links for generated docs +- (Improvement) Change default logging level to info. Add --log.sampling (default true). Adjust log levels. ## [1.2.36](https://github.com/arangodb/kube-arangodb/tree/1.2.36) (2024-01-08) - (Documentation) Improvements and fixes for rendered documentation (GH pages) diff --git a/cmd/cmd.go b/cmd/cmd.go index bb129d7c3..da8c059c9 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -75,7 +75,7 @@ const ( defaultServerPort = 8528 defaultAPIHTTPPort = 8628 defaultAPIGRPCPort = 8728 - defaultLogLevel = "debug" + defaultLogLevel = "info" defaultAdminSecretName = "arangodb-operator-dashboard" defaultAPIJWTSecretName = "arangodb-operator-api-jwt" defaultAPIJWTKeySecretName = "arangodb-operator-api-jwt-key" @@ -98,6 +98,7 @@ var ( logFormat string logLevels []string + logSampling bool serverOptions struct { host string port int @@ -191,6 +192,7 @@ func init() { f.BoolVar(&serverOptions.allowAnonymous, "server.allow-anonymous-access", false, "Allow anonymous access to the dashboard") f.StringVar(&logFormat, "log.format", "pretty", "Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used") f.StringArrayVar(&logLevels, "log.level", []string{defaultLogLevel}, fmt.Sprintf("Set log levels in format or =. Possible loggers: %s", strings.Join(logging.Global().Names(), ", "))) + f.BoolVar(&logSampling, "log.sampling", true, "If true, operator will try to minimize duplication of logging events") f.BoolVar(&apiOptions.enabled, "api.enabled", true, "Enable operator HTTP and gRPC API") f.IntVar(&apiOptions.httpPort, "api.http-port", defaultAPIHTTPPort, "HTTP API port to listen on") f.IntVar(&apiOptions.grpcPort, "api.grpc-port", defaultAPIGRPCPort, "gRPC API port to listen on") @@ -312,7 +314,10 @@ func executeMain(cmd *cobra.Command, args []string) { } else if strings.ToLower(logFormat) != "pretty" && logFormat != "" { logger.Fatal("Unknown log format: %s", logFormat) } - logging.Global().ApplyLogLevels(levels) + logging.Global().Configure(logging.Config{ + Levels: levels, + Sampling: logSampling, + }) podNameParts := strings.Split(name, "-") operatorID := podNameParts[len(podNameParts)-1] diff --git a/pkg/deployment/deployment_inspector.go b/pkg/deployment/deployment_inspector.go index 81fb8fbc7..c68bbbade 100644 --- a/pkg/deployment/deployment_inspector.go +++ b/pkg/deployment/deployment_inspector.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -54,7 +54,7 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval start := time.Now() if delay := d.delayer.Wait(); delay > 0 { - log.Info().Dur("delay", delay).Msgf("Reconciliation loop execution was delayed") + log.Debug().Dur("delay", delay).Msgf("Reconciliation loop execution was delayed") } defer d.delayer.Delay(d.config.ReconciliationDelay) @@ -342,7 +342,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva } else if err, updated := d.reconciler.CreatePlan(ctx); err != nil { return minInspectionInterval, errors.Wrapf(err, "Plan creation failed") } else if updated { - d.log.Info("Plan generated, reconciling") + d.log.Debug("Plan generated, reconciling") return minInspectionInterval, nil } diff --git a/pkg/deployment/logger.go b/pkg/deployment/logger.go index 45257a13c..036601ba5 100644 --- a/pkg/deployment/logger.go +++ b/pkg/deployment/logger.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,13 +21,15 @@ package deployment import ( + "time" + "github.com/rs/zerolog" "github.com/arangodb/kube-arangodb/pkg/logging" ) var ( - logger = logging.Global().RegisterAndGetLogger("deployment", logging.Info) + logger = logging.Global().RegisterAndGetLogger("deployment", logging.Info, logging.WithSamplingPeriod(time.Second*10)) ) func (d *Deployment) sectionLogger(section string) logging.Logger { diff --git a/pkg/deployment/reconcile/action_helper.go b/pkg/deployment/reconcile/action_helper.go index 90d9db85a..7de1e8d79 100644 --- a/pkg/deployment/reconcile/action_helper.go +++ b/pkg/deployment/reconcile/action_helper.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ package reconcile import ( "context" + "time" "github.com/rs/zerolog" @@ -31,7 +32,7 @@ import ( ) var ( - logger = logging.Global().RegisterAndGetLogger("action", logging.Info) + logger = logging.Global().RegisterAndGetLogger("action", logging.Info, logging.WithSamplingPeriod(time.Second*10)) ) type actionEmpty struct { diff --git a/pkg/deployment/reconcile/plan_builder_maintenance.go b/pkg/deployment/reconcile/plan_builder_maintenance.go index 788abfe9b..58223feef 100644 --- a/pkg/deployment/reconcile/plan_builder_maintenance.go +++ b/pkg/deployment/reconcile/plan_builder_maintenance.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -146,7 +146,7 @@ func (r *Reconciler) createMaintenanceManagementPlan(ctx context.Context, apiObj } if agencyState.Target.HotBackup.Create.Exists() { - r.log.Info("HotBackup in progress") + r.log.Debug("HotBackup in progress") return nil } @@ -155,7 +155,7 @@ func (r *Reconciler) createMaintenanceManagementPlan(ctx context.Context, apiObj if (cok && c.IsTrue()) != enabled { // Condition not yet propagated - r.log.Info("Condition not yet propagated") + r.log.Str("condition", api.ConditionTypeMaintenance.String()).Debug("Condition not yet propagated") return nil } diff --git a/pkg/deployment/reconcile/plan_executor.go b/pkg/deployment/reconcile/plan_executor.go index c1c8ff704..5d30ba6de 100644 --- a/pkg/deployment/reconcile/plan_executor.go +++ b/pkg/deployment/reconcile/plan_executor.go @@ -192,7 +192,7 @@ func (d *Reconciler) executePlanStatus(ctx context.Context, pg planner) (bool, b loopStatus = d.context.GetStatus() if pg.Set(&loopStatus, newPlan) { - d.planLogger.Info("Updating plan") + d.planLogger.Debug("Updating plan") if err := d.context.UpdateStatus(ctx, loopStatus); err != nil { d.planLogger.Err(err).Debug("Failed to update CR status") return false, false, errors.WithStack(err) @@ -273,7 +273,7 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl if ok { c.GetThrottles().Invalidate(components...) - d.planLogger.Info("Reloading cached status") + d.planLogger.Debug("Reloading cached status") if err := c.Refresh(ctx); err != nil { d.planLogger.Err(err).Warn("Unable to reload cached status") return plan, recall, false, nil @@ -338,7 +338,7 @@ func (d *Reconciler) executeOptionalAction(ctx context.Context, planAction api.A func (d *Reconciler) executeAction(ctx context.Context, planAction api.Action, action Action) (done, abort, callAgain, retry bool, err error) { log := d.planLogger.Str("action", string(planAction.Type)).Str("member", planAction.MemberID) - log.Info("Executing action") + log.Debug("Executing action") if !planAction.IsStarted() { // Not started yet @@ -356,10 +356,10 @@ func (d *Reconciler) executeAction(ctx context.Context, planAction api.Action, a } if ready { - log.Bool("ready", ready).Info("Action Start completed") + log.Bool("ready", ready).Debug("Action Start completed") return true, false, false, false, nil } - log.Bool("ready", ready).Info("Action Started") + log.Bool("ready", ready).Debug("Action Started") return false, false, true, false, nil } diff --git a/pkg/deployment/reconcile/reconciler.go b/pkg/deployment/reconcile/reconciler.go index 6a7128c66..2e3c0bf1b 100644 --- a/pkg/deployment/reconcile/reconciler.go +++ b/pkg/deployment/reconcile/reconciler.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,7 @@ package reconcile import ( "context" + "time" "github.com/rs/zerolog" meta "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -30,7 +31,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/logging" ) -var reconcileLogger = logging.Global().RegisterAndGetLogger("deployment-reconcile", logging.Info) +var reconcileLogger = logging.Global().RegisterAndGetLogger("deployment-reconcile", logging.Info, logging.WithSamplingPeriod(time.Second*10)) // Reconciler is the service that takes care of bring the a deployment // in line with its (changed) specification. diff --git a/pkg/deployment/resources/inspector/inspector.go b/pkg/deployment/resources/inspector/inspector.go index e64b8e3fb..ed7b8bfa1 100644 --- a/pkg/deployment/resources/inspector/inspector.go +++ b/pkg/deployment/resources/inspector/inspector.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -63,8 +63,8 @@ const ( ) var ( - logger = logging.Global().RegisterAndGetLogger("inspector", logging.Info) - clientLogger = logging.Global().RegisterAndGetLogger("k8s-client", logging.Info) + logger = logging.Global().RegisterAndGetLogger("inspector", logging.Info, logging.WithSamplingPeriod(time.Second*10)) + clientLogger = logging.Global().RegisterAndGetLogger("k8s-client", logging.Info, logging.WithSamplingPeriod(time.Second*10)) ) func (i inspectorLoaders) Get(name string) int { diff --git a/pkg/deployment/resources/logger.go b/pkg/deployment/resources/logger.go index 57b246cc4..2e7b4df0b 100644 --- a/pkg/deployment/resources/logger.go +++ b/pkg/deployment/resources/logger.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,13 +21,15 @@ package resources import ( + "time" + "github.com/rs/zerolog" "github.com/arangodb/kube-arangodb/pkg/logging" ) var ( - logger = logging.Global().RegisterAndGetLogger("deployment-resources", logging.Info) + logger = logging.Global().RegisterAndGetLogger("deployment-resources", logging.Info, logging.WithSamplingPeriod(time.Second*10)) ) func (r *Resources) WrapLogger(in *zerolog.Event) *zerolog.Event { diff --git a/pkg/logging/level.go b/pkg/logging/level.go index 1faf6de94..ac7689055 100644 --- a/pkg/logging/level.go +++ b/pkg/logging/level.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -20,7 +20,9 @@ package logging -import "github.com/rs/zerolog" +import ( + "github.com/rs/zerolog" +) type Level zerolog.Level @@ -40,3 +42,21 @@ const ( func (l Level) String() string { return zerolog.Level(l).String() } + +func WithLevel(l *zerolog.Logger, level Level) *zerolog.Event { + switch level { + case Trace: + return l.Trace() + case Debug: + return l.Debug() + case Info: + return l.Info() + case Warn: + return l.Warn() + case Error: + return l.Error() + case Fatal: + return l.Fatal() + } + return nil +} diff --git a/pkg/logging/logger.go b/pkg/logging/logger.go index 95e113a74..92bf22c30 100644 --- a/pkg/logging/logger.go +++ b/pkg/logging/logger.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -32,15 +32,21 @@ import ( const TopicAll = "all" +type Config struct { + Levels map[string]Level + Sampling bool +} + type Factory interface { - Get(name string) Logger + Get(name string, opts ...func(*LoggerOptions)) Logger LogLevels() map[string]Level + Configure(cfg Config) ApplyLogLevels(in map[string]Level) SetRoot(log zerolog.Logger) RegisterLogger(name string, level Level) bool - RegisterAndGetLogger(name string, level Level) Logger + RegisterAndGetLogger(name string, level Level, opts ...func(*LoggerOptions)) Logger RegisterWrappers(w ...Wrap) @@ -71,6 +77,8 @@ type factory struct { defaults map[string]Level levels map[string]Level + + samplingEnabled bool } func (f *factory) Names() []string { @@ -94,9 +102,19 @@ func (f *factory) RegisterWrappers(w ...Wrap) { f.wrappers = append(f.wrappers, w...) } -func (f *factory) RegisterAndGetLogger(name string, level Level) Logger { +type LoggerOptions struct { + SamplingPeriod time.Duration +} + +func WithSamplingPeriod(p time.Duration) func(o *LoggerOptions) { + return func(o *LoggerOptions) { + o.SamplingPeriod = p + } +} + +func (f *factory) RegisterAndGetLogger(name string, level Level, opts ...func(*LoggerOptions)) Logger { f.RegisterLogger(name, level) - return f.Get(name) + return f.Get(name, opts...) } func (f *factory) SetRoot(log zerolog.Logger) { @@ -111,6 +129,11 @@ func (f *factory) SetRoot(log zerolog.Logger) { } } +func (f *factory) Configure(cfg Config) { + f.samplingEnabled = cfg.Sampling + f.ApplyLogLevels(cfg.Levels) +} + func (f *factory) ApplyLogLevels(in map[string]Level) { f.lock.Lock() defer f.lock.Unlock() @@ -191,12 +214,22 @@ func (f *factory) getLogger(name string) *zerolog.Logger { return nil } -func (f *factory) Get(name string) Logger { +func (f *factory) Get(name string, opts ...func(*LoggerOptions)) Logger { + o := LoggerOptions{} + for _, f := range opts { + f(&o) + } + + var sampler Sampler + if f.samplingEnabled { + sampler = NewLogEventSampler(o.SamplingPeriod) + } return &chain{ logger: &logger{ factory: f, name: name, }, + sampler: sampler, } } @@ -260,9 +293,9 @@ type logger struct { type chain struct { *logger - parent *chain - - wrap Wrap + parent *chain + sampler Sampler + wrap Wrap } func (c *chain) TraceIO() LoggerIO { @@ -370,6 +403,25 @@ func (c *chain) Str(key, value string) Logger { return c.Wrap(Str(key, value)) } +func (c *chain) applyIfNeeded(level Level, msg string, args ...interface{}) { + l := c.factory.getLogger(c.name) + if l == nil { + return + } + + if c.sampler != nil && !c.sampler.Sample(level, msg, args) { + // skip duplicate event + return + } + + ev := WithLevel(l, level) + if ev == nil { + return + } + + c.apply(ev).Msgf(msg, args...) +} + func (c *chain) apply(in *zerolog.Event) *zerolog.Event { if p := c.parent; c.parent != nil { in = p.apply(in) @@ -394,57 +446,27 @@ func (c *chain) apply(in *zerolog.Event) *zerolog.Event { } func (c *chain) Trace(msg string, args ...interface{}) { - l := c.factory.getLogger(c.name) - if l == nil { - return - } - - c.apply(l.Trace()).Msgf(msg, args...) + c.applyIfNeeded(Trace, msg, args...) } func (c *chain) Debug(msg string, args ...interface{}) { - l := c.factory.getLogger(c.name) - if l == nil { - return - } - - c.apply(l.Debug()).Msgf(msg, args...) + c.applyIfNeeded(Debug, msg, args...) } func (c *chain) Info(msg string, args ...interface{}) { - l := c.factory.getLogger(c.name) - if l == nil { - return - } - - c.apply(l.Info()).Msgf(msg, args...) + c.applyIfNeeded(Info, msg, args...) } func (c *chain) Warn(msg string, args ...interface{}) { - l := c.factory.getLogger(c.name) - if l == nil { - return - } - - c.apply(l.Warn()).Msgf(msg, args...) + c.applyIfNeeded(Warn, msg, args...) } func (c *chain) Error(msg string, args ...interface{}) { - l := c.factory.getLogger(c.name) - if l == nil { - return - } - - c.apply(l.Error()).Msgf(msg, args...) + c.applyIfNeeded(Error, msg, args...) } func (c *chain) Fatal(msg string, args ...interface{}) { - l := c.factory.getLogger(c.name) - if l == nil { - return - } - - c.apply(l.Fatal()).Msgf(msg, args...) + c.applyIfNeeded(Fatal, msg, args...) } func (c *chain) Wrap(w Wrap) Logger { diff --git a/pkg/logging/sampler.go b/pkg/logging/sampler.go new file mode 100644 index 000000000..c6bff348d --- /dev/null +++ b/pkg/logging/sampler.go @@ -0,0 +1,104 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package logging + +import ( + "fmt" + goSync "sync" + "time" + + "github.com/arangodb/kube-arangodb/pkg/util/sync" +) + +// Sampler defines an interface to a log sampler. +type Sampler interface { + // Sample returns true if the event should be part of the sample, false if + // the event should be dropped. + Sample(level Level, msg string, args ...any) bool +} + +func NewLogEventSampler(period time.Duration) Sampler { + return &logEventSampler{ + period: period, + } +} + +type logEventSampler struct { + // period defines the burst period + period time.Duration + + // lruCache is the thread-safe map of message hash -> timestamp when event happened last time + lruCache sync.Map[string, int64] + + gcLock goSync.Mutex + gcCycle uint64 +} + +func (s *logEventSampler) Sample(level Level, format string, args ...any) bool { + if s.period <= 0 { + // sampling disabled + return true + } + + s.gc() + + hash := s.hash(level, format, args...) + return s.store(hash) +} + +func (s *logEventSampler) hash(level Level, format string, args ...any) string { + msg := fmt.Sprintf(format, args...) + return fmt.Sprintf("%s_%s", level.String(), msg) +} + +// store returns true if event hash was not stored, or it was stored more than s.period ago +func (s *logEventSampler) store(hash string) bool { + storedAt, _ := s.lruCache.LoadOrStore(hash, 0) + now := time.Now().UnixNano() + if now > storedAt { + newStoredAt := now + s.period.Nanoseconds() + return s.lruCache.CompareAndSwap(hash, storedAt, newStoredAt) + } + return false +} + +func (s *logEventSampler) gc() { + s.gcLock.Lock() + defer s.gcLock.Unlock() + + s.gcCycle++ + if s.gcCycle > 10e5 { + // run cache cleanup every 10e5 cycles + s.gcClean() + s.gcCycle = 0 + } +} + +func (s *logEventSampler) gcClean() { + now := time.Now().UnixNano() + s.lruCache.Range(func(key string, value int64) bool { + gcPeriod := (time.Minute * 15).Nanoseconds() + if now-value > gcPeriod { + s.lruCache.Delete(key) + } + return true + }) +} diff --git a/pkg/logging/sampler_test.go b/pkg/logging/sampler_test.go new file mode 100644 index 000000000..ba1bfa154 --- /dev/null +++ b/pkg/logging/sampler_test.go @@ -0,0 +1,105 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package logging + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestLogEventSampler(t *testing.T) { + const SamplePeriod = time.Millisecond * 500 + s := NewLogEventSampler(SamplePeriod) + require.True(t, s.Sample(Debug, "new-msg")) + + events := []struct { + l Level + msg string + }{ + {Debug, "test"}, + {Info, "test"}, + {Debug, "test-debug"}, + } + + trigger := func(t *testing.T, invocation int) { + for _, ev := range events { + sampled := s.Sample(ev.l, ev.msg) + if invocation == 0 { + require.True(t, sampled, invocation) + } else { + require.False(t, sampled, invocation) + } + time.Sleep(time.Millisecond) + } + } + + t.Run("sequentially", func(t *testing.T) { + for i := 0; i < 50; i++ { + trigger(t, i) + } + + time.Sleep(time.Second) + + for i := 0; i < 50; i++ { + trigger(t, i) + } + + time.Sleep(time.Second) + }) + + t.Run("parallel", func(t *testing.T) { + var wg sync.WaitGroup + // event index -> how many samples + results := map[int]*atomic.Int32{ + 0: {}, + 1: {}, + 2: {}, + } + expectedSamples := int32(2) + iters := int32(SamplePeriod/time.Millisecond)*(expectedSamples-1) + 100 + const sleep = time.Millisecond + for i, ev := range events { + wg.Add(1) + + go func(i int, l Level, msg string) { + defer wg.Done() + + for j := int32(0); j < iters; j++ { + sampled := s.Sample(l, msg) + if sampled { + results[i].Add(1) + } + + time.Sleep(sleep) + } + }(i, ev.l, ev.msg) + } + wg.Wait() + + require.Equal(t, expectedSamples, results[0].Load()) + require.Equal(t, expectedSamples, results[1].Load()) + require.Equal(t, expectedSamples, results[2].Load()) + }) +} diff --git a/pkg/util/sync/map.go b/pkg/util/sync/map.go new file mode 100644 index 000000000..d66c26295 --- /dev/null +++ b/pkg/util/sync/map.go @@ -0,0 +1,69 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package sync + +import ( + "sync" +) + +// Map is generic wrapper for sync.Map +type Map[K comparable, V any] struct { + m sync.Map +} + +func (m *Map[K, V]) Delete(key K) { + m.m.Delete(key) +} + +func (m *Map[K, V]) Load(key K) (value V, ok bool) { + v, ok := m.m.Load(key) + if !ok { + return value, ok + } + return v.(V), ok +} + +func (m *Map[K, V]) CompareAndSwap(key K, old, new V) bool { + return m.m.CompareAndSwap(key, old, new) +} + +func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) { + v, loaded := m.m.LoadAndDelete(key) + if !loaded { + return value, loaded + } + return v.(V), loaded +} + +func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) { + a, loaded := m.m.LoadOrStore(key, value) + return a.(V), loaded +} + +func (m *Map[K, V]) Range(f func(key K, value V) bool) { + m.m.Range(func(key, value any) bool { + return f(key.(K), value.(V)) + }) +} + +func (m *Map[K, V]) Store(key K, value V) { + m.m.Store(key, value) +}