1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Change default logging level to info. Add --log.sampling (default true) (#1577)

Co-authored-by: Adam Janikowski <12255597+ajanikow@users.noreply.github.com>
This commit is contained in:
Nikita Vaniasin 2024-01-16 10:48:24 +01:00 committed by GitHub
parent abd8562438
commit 73df6a0771
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 404 additions and 72 deletions

View file

@ -7,6 +7,7 @@
- (Feature) (ML) Featurization Job Type - (Feature) (ML) Featurization Job Type
- (Bugfix) Don't abort plan in case of optional action timeout - (Bugfix) Don't abort plan in case of optional action timeout
- (Documentation) Use relative links for generated docs - (Documentation) Use relative links for generated docs
- (Improvement) Change default logging level to info. Add --log.sampling (default true). Adjust log levels.
## [1.2.36](https://github.com/arangodb/kube-arangodb/tree/1.2.36) (2024-01-08) ## [1.2.36](https://github.com/arangodb/kube-arangodb/tree/1.2.36) (2024-01-08)
- (Documentation) Improvements and fixes for rendered documentation (GH pages) - (Documentation) Improvements and fixes for rendered documentation (GH pages)

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -75,7 +75,7 @@ const (
defaultServerPort = 8528 defaultServerPort = 8528
defaultAPIHTTPPort = 8628 defaultAPIHTTPPort = 8628
defaultAPIGRPCPort = 8728 defaultAPIGRPCPort = 8728
defaultLogLevel = "debug" defaultLogLevel = "info"
defaultAdminSecretName = "arangodb-operator-dashboard" defaultAdminSecretName = "arangodb-operator-dashboard"
defaultAPIJWTSecretName = "arangodb-operator-api-jwt" defaultAPIJWTSecretName = "arangodb-operator-api-jwt"
defaultAPIJWTKeySecretName = "arangodb-operator-api-jwt-key" defaultAPIJWTKeySecretName = "arangodb-operator-api-jwt-key"
@ -98,6 +98,7 @@ var (
logFormat string logFormat string
logLevels []string logLevels []string
logSampling bool
serverOptions struct { serverOptions struct {
host string host string
port int port int
@ -191,6 +192,7 @@ func init() {
f.BoolVar(&serverOptions.allowAnonymous, "server.allow-anonymous-access", false, "Allow anonymous access to the dashboard") f.BoolVar(&serverOptions.allowAnonymous, "server.allow-anonymous-access", false, "Allow anonymous access to the dashboard")
f.StringVar(&logFormat, "log.format", "pretty", "Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used") f.StringVar(&logFormat, "log.format", "pretty", "Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used")
f.StringArrayVar(&logLevels, "log.level", []string{defaultLogLevel}, fmt.Sprintf("Set log levels in format <level> or <logger>=<level>. Possible loggers: %s", strings.Join(logging.Global().Names(), ", "))) f.StringArrayVar(&logLevels, "log.level", []string{defaultLogLevel}, fmt.Sprintf("Set log levels in format <level> or <logger>=<level>. Possible loggers: %s", strings.Join(logging.Global().Names(), ", ")))
f.BoolVar(&logSampling, "log.sampling", true, "If true, operator will try to minimize duplication of logging events")
f.BoolVar(&apiOptions.enabled, "api.enabled", true, "Enable operator HTTP and gRPC API") f.BoolVar(&apiOptions.enabled, "api.enabled", true, "Enable operator HTTP and gRPC API")
f.IntVar(&apiOptions.httpPort, "api.http-port", defaultAPIHTTPPort, "HTTP API port to listen on") f.IntVar(&apiOptions.httpPort, "api.http-port", defaultAPIHTTPPort, "HTTP API port to listen on")
f.IntVar(&apiOptions.grpcPort, "api.grpc-port", defaultAPIGRPCPort, "gRPC API port to listen on") f.IntVar(&apiOptions.grpcPort, "api.grpc-port", defaultAPIGRPCPort, "gRPC API port to listen on")
@ -312,7 +314,10 @@ func executeMain(cmd *cobra.Command, args []string) {
} else if strings.ToLower(logFormat) != "pretty" && logFormat != "" { } else if strings.ToLower(logFormat) != "pretty" && logFormat != "" {
logger.Fatal("Unknown log format: %s", logFormat) logger.Fatal("Unknown log format: %s", logFormat)
} }
logging.Global().ApplyLogLevels(levels) logging.Global().Configure(logging.Config{
Levels: levels,
Sampling: logSampling,
})
podNameParts := strings.Split(name, "-") podNameParts := strings.Split(name, "-")
operatorID := podNameParts[len(podNameParts)-1] operatorID := podNameParts[len(podNameParts)-1]

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -54,7 +54,7 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval
start := time.Now() start := time.Now()
if delay := d.delayer.Wait(); delay > 0 { if delay := d.delayer.Wait(); delay > 0 {
log.Info().Dur("delay", delay).Msgf("Reconciliation loop execution was delayed") log.Debug().Dur("delay", delay).Msgf("Reconciliation loop execution was delayed")
} }
defer d.delayer.Delay(d.config.ReconciliationDelay) defer d.delayer.Delay(d.config.ReconciliationDelay)
@ -342,7 +342,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
} else if err, updated := d.reconciler.CreatePlan(ctx); err != nil { } else if err, updated := d.reconciler.CreatePlan(ctx); err != nil {
return minInspectionInterval, errors.Wrapf(err, "Plan creation failed") return minInspectionInterval, errors.Wrapf(err, "Plan creation failed")
} else if updated { } else if updated {
d.log.Info("Plan generated, reconciling") d.log.Debug("Plan generated, reconciling")
return minInspectionInterval, nil return minInspectionInterval, nil
} }

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -21,13 +21,15 @@
package deployment package deployment
import ( import (
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/arangodb/kube-arangodb/pkg/logging" "github.com/arangodb/kube-arangodb/pkg/logging"
) )
var ( var (
logger = logging.Global().RegisterAndGetLogger("deployment", logging.Info) logger = logging.Global().RegisterAndGetLogger("deployment", logging.Info, logging.WithSamplingPeriod(time.Second*10))
) )
func (d *Deployment) sectionLogger(section string) logging.Logger { func (d *Deployment) sectionLogger(section string) logging.Logger {

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ package reconcile
import ( import (
"context" "context"
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
@ -31,7 +32,7 @@ import (
) )
var ( var (
logger = logging.Global().RegisterAndGetLogger("action", logging.Info) logger = logging.Global().RegisterAndGetLogger("action", logging.Info, logging.WithSamplingPeriod(time.Second*10))
) )
type actionEmpty struct { type actionEmpty struct {

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -146,7 +146,7 @@ func (r *Reconciler) createMaintenanceManagementPlan(ctx context.Context, apiObj
} }
if agencyState.Target.HotBackup.Create.Exists() { if agencyState.Target.HotBackup.Create.Exists() {
r.log.Info("HotBackup in progress") r.log.Debug("HotBackup in progress")
return nil return nil
} }
@ -155,7 +155,7 @@ func (r *Reconciler) createMaintenanceManagementPlan(ctx context.Context, apiObj
if (cok && c.IsTrue()) != enabled { if (cok && c.IsTrue()) != enabled {
// Condition not yet propagated // Condition not yet propagated
r.log.Info("Condition not yet propagated") r.log.Str("condition", api.ConditionTypeMaintenance.String()).Debug("Condition not yet propagated")
return nil return nil
} }

View file

@ -192,7 +192,7 @@ func (d *Reconciler) executePlanStatus(ctx context.Context, pg planner) (bool, b
loopStatus = d.context.GetStatus() loopStatus = d.context.GetStatus()
if pg.Set(&loopStatus, newPlan) { if pg.Set(&loopStatus, newPlan) {
d.planLogger.Info("Updating plan") d.planLogger.Debug("Updating plan")
if err := d.context.UpdateStatus(ctx, loopStatus); err != nil { if err := d.context.UpdateStatus(ctx, loopStatus); err != nil {
d.planLogger.Err(err).Debug("Failed to update CR status") d.planLogger.Err(err).Debug("Failed to update CR status")
return false, false, errors.WithStack(err) return false, false, errors.WithStack(err)
@ -273,7 +273,7 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl
if ok { if ok {
c.GetThrottles().Invalidate(components...) c.GetThrottles().Invalidate(components...)
d.planLogger.Info("Reloading cached status") d.planLogger.Debug("Reloading cached status")
if err := c.Refresh(ctx); err != nil { if err := c.Refresh(ctx); err != nil {
d.planLogger.Err(err).Warn("Unable to reload cached status") d.planLogger.Err(err).Warn("Unable to reload cached status")
return plan, recall, false, nil return plan, recall, false, nil
@ -338,7 +338,7 @@ func (d *Reconciler) executeOptionalAction(ctx context.Context, planAction api.A
func (d *Reconciler) executeAction(ctx context.Context, planAction api.Action, action Action) (done, abort, callAgain, retry bool, err error) { func (d *Reconciler) executeAction(ctx context.Context, planAction api.Action, action Action) (done, abort, callAgain, retry bool, err error) {
log := d.planLogger.Str("action", string(planAction.Type)).Str("member", planAction.MemberID) log := d.planLogger.Str("action", string(planAction.Type)).Str("member", planAction.MemberID)
log.Info("Executing action") log.Debug("Executing action")
if !planAction.IsStarted() { if !planAction.IsStarted() {
// Not started yet // Not started yet
@ -356,10 +356,10 @@ func (d *Reconciler) executeAction(ctx context.Context, planAction api.Action, a
} }
if ready { if ready {
log.Bool("ready", ready).Info("Action Start completed") log.Bool("ready", ready).Debug("Action Start completed")
return true, false, false, false, nil return true, false, false, false, nil
} }
log.Bool("ready", ready).Info("Action Started") log.Bool("ready", ready).Debug("Action Started")
return false, false, true, false, nil return false, false, true, false, nil
} }

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -22,6 +22,7 @@ package reconcile
import ( import (
"context" "context"
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -30,7 +31,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/logging" "github.com/arangodb/kube-arangodb/pkg/logging"
) )
var reconcileLogger = logging.Global().RegisterAndGetLogger("deployment-reconcile", logging.Info) var reconcileLogger = logging.Global().RegisterAndGetLogger("deployment-reconcile", logging.Info, logging.WithSamplingPeriod(time.Second*10))
// Reconciler is the service that takes care of bring the a deployment // Reconciler is the service that takes care of bring the a deployment
// in line with its (changed) specification. // in line with its (changed) specification.

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -63,8 +63,8 @@ const (
) )
var ( var (
logger = logging.Global().RegisterAndGetLogger("inspector", logging.Info) logger = logging.Global().RegisterAndGetLogger("inspector", logging.Info, logging.WithSamplingPeriod(time.Second*10))
clientLogger = logging.Global().RegisterAndGetLogger("k8s-client", logging.Info) clientLogger = logging.Global().RegisterAndGetLogger("k8s-client", logging.Info, logging.WithSamplingPeriod(time.Second*10))
) )
func (i inspectorLoaders) Get(name string) int { func (i inspectorLoaders) Get(name string) int {

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -21,13 +21,15 @@
package resources package resources
import ( import (
"time"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/arangodb/kube-arangodb/pkg/logging" "github.com/arangodb/kube-arangodb/pkg/logging"
) )
var ( var (
logger = logging.Global().RegisterAndGetLogger("deployment-resources", logging.Info) logger = logging.Global().RegisterAndGetLogger("deployment-resources", logging.Info, logging.WithSamplingPeriod(time.Second*10))
) )
func (r *Resources) WrapLogger(in *zerolog.Event) *zerolog.Event { func (r *Resources) WrapLogger(in *zerolog.Event) *zerolog.Event {

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -20,7 +20,9 @@
package logging package logging
import "github.com/rs/zerolog" import (
"github.com/rs/zerolog"
)
type Level zerolog.Level type Level zerolog.Level
@ -40,3 +42,21 @@ const (
func (l Level) String() string { func (l Level) String() string {
return zerolog.Level(l).String() return zerolog.Level(l).String()
} }
func WithLevel(l *zerolog.Logger, level Level) *zerolog.Event {
switch level {
case Trace:
return l.Trace()
case Debug:
return l.Debug()
case Info:
return l.Info()
case Warn:
return l.Warn()
case Error:
return l.Error()
case Fatal:
return l.Fatal()
}
return nil
}

View file

@ -1,7 +1,7 @@
// //
// DISCLAIMER // DISCLAIMER
// //
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -32,15 +32,21 @@ import (
const TopicAll = "all" const TopicAll = "all"
type Config struct {
Levels map[string]Level
Sampling bool
}
type Factory interface { type Factory interface {
Get(name string) Logger Get(name string, opts ...func(*LoggerOptions)) Logger
LogLevels() map[string]Level LogLevels() map[string]Level
Configure(cfg Config)
ApplyLogLevels(in map[string]Level) ApplyLogLevels(in map[string]Level)
SetRoot(log zerolog.Logger) SetRoot(log zerolog.Logger)
RegisterLogger(name string, level Level) bool RegisterLogger(name string, level Level) bool
RegisterAndGetLogger(name string, level Level) Logger RegisterAndGetLogger(name string, level Level, opts ...func(*LoggerOptions)) Logger
RegisterWrappers(w ...Wrap) RegisterWrappers(w ...Wrap)
@ -71,6 +77,8 @@ type factory struct {
defaults map[string]Level defaults map[string]Level
levels map[string]Level levels map[string]Level
samplingEnabled bool
} }
func (f *factory) Names() []string { func (f *factory) Names() []string {
@ -94,9 +102,19 @@ func (f *factory) RegisterWrappers(w ...Wrap) {
f.wrappers = append(f.wrappers, w...) f.wrappers = append(f.wrappers, w...)
} }
func (f *factory) RegisterAndGetLogger(name string, level Level) Logger { type LoggerOptions struct {
SamplingPeriod time.Duration
}
func WithSamplingPeriod(p time.Duration) func(o *LoggerOptions) {
return func(o *LoggerOptions) {
o.SamplingPeriod = p
}
}
func (f *factory) RegisterAndGetLogger(name string, level Level, opts ...func(*LoggerOptions)) Logger {
f.RegisterLogger(name, level) f.RegisterLogger(name, level)
return f.Get(name) return f.Get(name, opts...)
} }
func (f *factory) SetRoot(log zerolog.Logger) { func (f *factory) SetRoot(log zerolog.Logger) {
@ -111,6 +129,11 @@ func (f *factory) SetRoot(log zerolog.Logger) {
} }
} }
func (f *factory) Configure(cfg Config) {
f.samplingEnabled = cfg.Sampling
f.ApplyLogLevels(cfg.Levels)
}
func (f *factory) ApplyLogLevels(in map[string]Level) { func (f *factory) ApplyLogLevels(in map[string]Level) {
f.lock.Lock() f.lock.Lock()
defer f.lock.Unlock() defer f.lock.Unlock()
@ -191,12 +214,22 @@ func (f *factory) getLogger(name string) *zerolog.Logger {
return nil return nil
} }
func (f *factory) Get(name string) Logger { func (f *factory) Get(name string, opts ...func(*LoggerOptions)) Logger {
o := LoggerOptions{}
for _, f := range opts {
f(&o)
}
var sampler Sampler
if f.samplingEnabled {
sampler = NewLogEventSampler(o.SamplingPeriod)
}
return &chain{ return &chain{
logger: &logger{ logger: &logger{
factory: f, factory: f,
name: name, name: name,
}, },
sampler: sampler,
} }
} }
@ -260,9 +293,9 @@ type logger struct {
type chain struct { type chain struct {
*logger *logger
parent *chain parent *chain
sampler Sampler
wrap Wrap wrap Wrap
} }
func (c *chain) TraceIO() LoggerIO { func (c *chain) TraceIO() LoggerIO {
@ -370,6 +403,25 @@ func (c *chain) Str(key, value string) Logger {
return c.Wrap(Str(key, value)) return c.Wrap(Str(key, value))
} }
func (c *chain) applyIfNeeded(level Level, msg string, args ...interface{}) {
l := c.factory.getLogger(c.name)
if l == nil {
return
}
if c.sampler != nil && !c.sampler.Sample(level, msg, args) {
// skip duplicate event
return
}
ev := WithLevel(l, level)
if ev == nil {
return
}
c.apply(ev).Msgf(msg, args...)
}
func (c *chain) apply(in *zerolog.Event) *zerolog.Event { func (c *chain) apply(in *zerolog.Event) *zerolog.Event {
if p := c.parent; c.parent != nil { if p := c.parent; c.parent != nil {
in = p.apply(in) in = p.apply(in)
@ -394,57 +446,27 @@ func (c *chain) apply(in *zerolog.Event) *zerolog.Event {
} }
func (c *chain) Trace(msg string, args ...interface{}) { func (c *chain) Trace(msg string, args ...interface{}) {
l := c.factory.getLogger(c.name) c.applyIfNeeded(Trace, msg, args...)
if l == nil {
return
}
c.apply(l.Trace()).Msgf(msg, args...)
} }
func (c *chain) Debug(msg string, args ...interface{}) { func (c *chain) Debug(msg string, args ...interface{}) {
l := c.factory.getLogger(c.name) c.applyIfNeeded(Debug, msg, args...)
if l == nil {
return
}
c.apply(l.Debug()).Msgf(msg, args...)
} }
func (c *chain) Info(msg string, args ...interface{}) { func (c *chain) Info(msg string, args ...interface{}) {
l := c.factory.getLogger(c.name) c.applyIfNeeded(Info, msg, args...)
if l == nil {
return
}
c.apply(l.Info()).Msgf(msg, args...)
} }
func (c *chain) Warn(msg string, args ...interface{}) { func (c *chain) Warn(msg string, args ...interface{}) {
l := c.factory.getLogger(c.name) c.applyIfNeeded(Warn, msg, args...)
if l == nil {
return
}
c.apply(l.Warn()).Msgf(msg, args...)
} }
func (c *chain) Error(msg string, args ...interface{}) { func (c *chain) Error(msg string, args ...interface{}) {
l := c.factory.getLogger(c.name) c.applyIfNeeded(Error, msg, args...)
if l == nil {
return
}
c.apply(l.Error()).Msgf(msg, args...)
} }
func (c *chain) Fatal(msg string, args ...interface{}) { func (c *chain) Fatal(msg string, args ...interface{}) {
l := c.factory.getLogger(c.name) c.applyIfNeeded(Fatal, msg, args...)
if l == nil {
return
}
c.apply(l.Fatal()).Msgf(msg, args...)
} }
func (c *chain) Wrap(w Wrap) Logger { func (c *chain) Wrap(w Wrap) Logger {

104
pkg/logging/sampler.go Normal file
View file

@ -0,0 +1,104 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package logging
import (
"fmt"
goSync "sync"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/sync"
)
// Sampler defines an interface to a log sampler.
type Sampler interface {
// Sample returns true if the event should be part of the sample, false if
// the event should be dropped.
Sample(level Level, msg string, args ...any) bool
}
func NewLogEventSampler(period time.Duration) Sampler {
return &logEventSampler{
period: period,
}
}
type logEventSampler struct {
// period defines the burst period
period time.Duration
// lruCache is the thread-safe map of message hash -> timestamp when event happened last time
lruCache sync.Map[string, int64]
gcLock goSync.Mutex
gcCycle uint64
}
func (s *logEventSampler) Sample(level Level, format string, args ...any) bool {
if s.period <= 0 {
// sampling disabled
return true
}
s.gc()
hash := s.hash(level, format, args...)
return s.store(hash)
}
func (s *logEventSampler) hash(level Level, format string, args ...any) string {
msg := fmt.Sprintf(format, args...)
return fmt.Sprintf("%s_%s", level.String(), msg)
}
// store returns true if event hash was not stored, or it was stored more than s.period ago
func (s *logEventSampler) store(hash string) bool {
storedAt, _ := s.lruCache.LoadOrStore(hash, 0)
now := time.Now().UnixNano()
if now > storedAt {
newStoredAt := now + s.period.Nanoseconds()
return s.lruCache.CompareAndSwap(hash, storedAt, newStoredAt)
}
return false
}
func (s *logEventSampler) gc() {
s.gcLock.Lock()
defer s.gcLock.Unlock()
s.gcCycle++
if s.gcCycle > 10e5 {
// run cache cleanup every 10e5 cycles
s.gcClean()
s.gcCycle = 0
}
}
func (s *logEventSampler) gcClean() {
now := time.Now().UnixNano()
s.lruCache.Range(func(key string, value int64) bool {
gcPeriod := (time.Minute * 15).Nanoseconds()
if now-value > gcPeriod {
s.lruCache.Delete(key)
}
return true
})
}

105
pkg/logging/sampler_test.go Normal file
View file

@ -0,0 +1,105 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package logging
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestLogEventSampler(t *testing.T) {
const SamplePeriod = time.Millisecond * 500
s := NewLogEventSampler(SamplePeriod)
require.True(t, s.Sample(Debug, "new-msg"))
events := []struct {
l Level
msg string
}{
{Debug, "test"},
{Info, "test"},
{Debug, "test-debug"},
}
trigger := func(t *testing.T, invocation int) {
for _, ev := range events {
sampled := s.Sample(ev.l, ev.msg)
if invocation == 0 {
require.True(t, sampled, invocation)
} else {
require.False(t, sampled, invocation)
}
time.Sleep(time.Millisecond)
}
}
t.Run("sequentially", func(t *testing.T) {
for i := 0; i < 50; i++ {
trigger(t, i)
}
time.Sleep(time.Second)
for i := 0; i < 50; i++ {
trigger(t, i)
}
time.Sleep(time.Second)
})
t.Run("parallel", func(t *testing.T) {
var wg sync.WaitGroup
// event index -> how many samples
results := map[int]*atomic.Int32{
0: {},
1: {},
2: {},
}
expectedSamples := int32(2)
iters := int32(SamplePeriod/time.Millisecond)*(expectedSamples-1) + 100
const sleep = time.Millisecond
for i, ev := range events {
wg.Add(1)
go func(i int, l Level, msg string) {
defer wg.Done()
for j := int32(0); j < iters; j++ {
sampled := s.Sample(l, msg)
if sampled {
results[i].Add(1)
}
time.Sleep(sleep)
}
}(i, ev.l, ev.msg)
}
wg.Wait()
require.Equal(t, expectedSamples, results[0].Load())
require.Equal(t, expectedSamples, results[1].Load())
require.Equal(t, expectedSamples, results[2].Load())
})
}

69
pkg/util/sync/map.go Normal file
View file

@ -0,0 +1,69 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package sync
import (
"sync"
)
// Map is generic wrapper for sync.Map
type Map[K comparable, V any] struct {
m sync.Map
}
func (m *Map[K, V]) Delete(key K) {
m.m.Delete(key)
}
func (m *Map[K, V]) Load(key K) (value V, ok bool) {
v, ok := m.m.Load(key)
if !ok {
return value, ok
}
return v.(V), ok
}
func (m *Map[K, V]) CompareAndSwap(key K, old, new V) bool {
return m.m.CompareAndSwap(key, old, new)
}
func (m *Map[K, V]) LoadAndDelete(key K) (value V, loaded bool) {
v, loaded := m.m.LoadAndDelete(key)
if !loaded {
return value, loaded
}
return v.(V), loaded
}
func (m *Map[K, V]) LoadOrStore(key K, value V) (actual V, loaded bool) {
a, loaded := m.m.LoadOrStore(key, value)
return a.(V), loaded
}
func (m *Map[K, V]) Range(f func(key K, value V) bool) {
m.m.Range(func(key, value any) bool {
return f(key.(K), value.(V))
})
}
func (m *Map[K, V]) Store(key K, value V) {
m.m.Store(key, value)
}