From ecb3482f343d3148f650baba46fd11a9b14e8ec5 Mon Sep 17 00:00:00 2001 From: Adam Janikowski <12255597+ajanikow@users.noreply.github.com> Date: Sat, 2 Sep 2023 02:17:01 +0200 Subject: [PATCH] [Feature] Reconciliation Loop Interval option (#1395) --- CHANGELOG.md | 1 + cmd/cmd.go | 4 ++ pkg/deployment/deployment.go | 5 ++ pkg/deployment/deployment_inspector.go | 7 +++ pkg/operator/operator.go | 1 + pkg/operator/operator_deployment.go | 1 + pkg/util/k8sutil/pods.go | 2 +- pkg/util/timer/delayer.go | 77 +++++++++++++++++++++++++ pkg/util/timer/delayer_test.go | 78 ++++++++++++++++++++++++++ 9 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 pkg/util/timer/delayer.go create mode 100644 pkg/util/timer/delayer_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 6e6783ecc..d95649daa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - (Bugfix) Fix CRD yaml (chart) - (Bugfix) (EE) Fix MemberMaintenance Context and ClusterMaintenance discovery - (Feature) Add proper Prometheus endpoint compression + 204 response code +- (Feature) Reconciliation Loop Interval option ## [1.2.32](https://github.com/arangodb/kube-arangodb/tree/1.2.32) (2023-08-07) - (Feature) Backup lifetime - remove Backup once its lifetime has been reached diff --git a/cmd/cmd.go b/cmd/cmd.go index 48a24309e..b13de44b4 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -136,6 +136,8 @@ var ( alpineImage, metricsExporterImage, arangoImage string + reconciliationDelay time.Duration + singleMode bool scope string } @@ -224,6 +226,7 @@ func init() { f.DurationVar(&shutdownOptions.delay, "shutdown.delay", defaultShutdownDelay, "The delay before running shutdown handlers") f.DurationVar(&shutdownOptions.timeout, "shutdown.timeout", defaultShutdownTimeout, "Timeout for shutdown handlers") f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", false, "Enable Scaling Integration") + f.DurationVar(&operatorOptions.reconciliationDelay, "reconciliation.delay", 0, "Delay between reconciliation loops (<= 0 -> Disabled)") f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read") f.Float32Var(&operatorKubernetesOptions.qps, "kubernetes.qps", kclient.DefaultQPS, "Number of queries per second for k8s API") f.IntVar(&operatorKubernetesOptions.burst, "kubernetes.burst", kclient.DefaultBurst, "Burst for the k8s API") @@ -530,6 +533,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper ArangoImage: operatorOptions.arangoImage, SingleMode: operatorOptions.singleMode, Scope: scope, + ReconciliationDelay: operatorOptions.reconciliationDelay, ShutdownDelay: shutdownOptions.delay, ShutdownTimeout: shutdownOptions.timeout, } diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index be50cb3eb..2f30edf77 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -60,6 +60,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors" "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/arangodb/kube-arangodb/pkg/util/timer" "github.com/arangodb/kube-arangodb/pkg/util/trigger" ) @@ -70,6 +71,7 @@ type Config struct { ScalingIntegrationEnabled bool OperatorImage string ArangoImage string + ReconciliationDelay time.Duration Scope scope.Scope } @@ -107,6 +109,8 @@ type Deployment struct { uid types.UID namespace string + delayer timer.Delayer + currentObject *api.ArangoDeployment currentObjectStatus *api.DeploymentStatus currentObjectLock sync.RWMutex @@ -256,6 +260,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De stopCh: make(chan struct{}), agencyCache: agency.NewCache(apiObject.GetNamespace(), apiObject.GetName(), apiObject.GetAcceptedSpec().Mode), acs: acs.NewACS(apiObject.GetUID(), i), + delayer: timer.NewDelayer(), } d.log = logger.WrapObj(d) diff --git a/pkg/deployment/deployment_inspector.go b/pkg/deployment/deployment_inspector.go index cb574f38d..cf2c4119c 100644 --- a/pkg/deployment/deployment_inspector.go +++ b/pkg/deployment/deployment_inspector.go @@ -24,6 +24,8 @@ import ( "context" "time" + "github.com/rs/zerolog/log" + "github.com/arangodb/kube-arangodb/pkg/apis/deployment" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/deployment/features" @@ -51,6 +53,11 @@ var ( func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval { start := time.Now() + if delay := d.delayer.Wait(); delay > 0 { + log.Info().Dur("delay", delay).Msgf("Reconciliation loop execution was delayed") + } + defer d.delayer.Delay(d.config.ReconciliationDelay) + ctxReconciliation, cancelReconciliation := globals.GetGlobalTimeouts().Reconciliation().WithTimeout(context.Background()) defer cancelReconciliation() defer func() { diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 1523e3e1b..308682e7a 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -106,6 +106,7 @@ type Config struct { ScalingIntegrationEnabled bool SingleMode bool Scope scope.Scope + ReconciliationDelay time.Duration ShutdownDelay time.Duration ShutdownTimeout time.Duration } diff --git a/pkg/operator/operator_deployment.go b/pkg/operator/operator_deployment.go index 793184179..d302be88b 100644 --- a/pkg/operator/operator_deployment.go +++ b/pkg/operator/operator_deployment.go @@ -201,6 +201,7 @@ func (o *Operator) makeDeploymentConfigAndDeps() (deployment.Config, deployment. ArangoImage: o.ArangoImage, AllowChaos: o.Config.AllowChaos, ScalingIntegrationEnabled: o.Config.ScalingIntegrationEnabled, + ReconciliationDelay: o.Config.ReconciliationDelay, Scope: o.Scope, } deps := deployment.Dependencies{ diff --git a/pkg/util/k8sutil/pods.go b/pkg/util/k8sutil/pods.go index 4721fd5b2..092ea8546 100644 --- a/pkg/util/k8sutil/pods.go +++ b/pkg/util/k8sutil/pods.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/pkg/util/timer/delayer.go b/pkg/util/timer/delayer.go new file mode 100644 index 000000000..01579389a --- /dev/null +++ b/pkg/util/timer/delayer.go @@ -0,0 +1,77 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package timer + +import ( + "sync" + "time" +) + +func NewDelayer() Delayer { + return &delayer{} +} + +type Delayer interface { + Delay(delay time.Duration) + + Wait() time.Duration + + Copy() Delayer +} + +type delayer struct { + lock sync.Mutex + + last, next time.Time +} + +func (d *delayer) Copy() Delayer { + d.lock.Lock() + defer d.lock.Unlock() + + return &delayer{ + last: d.last, + next: d.next, + } +} + +func (d *delayer) Wait() time.Duration { + d.lock.Lock() + defer d.lock.Unlock() + + since := time.Until(d.next) + + if since <= time.Millisecond { + return 0 + } + + time.Sleep(since) + + return since +} + +func (d *delayer) Delay(delay time.Duration) { + d.lock.Lock() + defer d.lock.Unlock() + + d.last = time.Now() + d.next = d.last.Add(delay) +} diff --git a/pkg/util/timer/delayer_test.go b/pkg/util/timer/delayer_test.go new file mode 100644 index 000000000..c055c7526 --- /dev/null +++ b/pkg/util/timer/delayer_test.go @@ -0,0 +1,78 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package timer + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func withTime(f func()) time.Duration { + now := time.Now() + f() + return time.Since(now) +} + +func Test_Delayer(t *testing.T) { + d := NewDelayer() + + t.Run("Ensure instant execution", func(t *testing.T) { + require.True(t, withTime(func() { + d.Wait() + }) < time.Millisecond) + + require.True(t, withTime(func() { + d.Wait() + }) < time.Millisecond) + }) + + t.Run("Delay execution", func(t *testing.T) { + require.True(t, withTime(func() { + d.Delay(50 * time.Millisecond) + d.Wait() + }) >= 50*time.Millisecond) + }) + + t.Run("Delay execution, but allow multiple ones", func(t *testing.T) { + require.True(t, withTime(func() { + d.Delay(50 * time.Millisecond) + d.Wait() + d.Wait() + d.Wait() + d.Wait() + }) >= 50*time.Millisecond) + }) + + t.Run("Delay execution multiple times", func(t *testing.T) { + require.True(t, withTime(func() { + d.Delay(50 * time.Millisecond) + d.Wait() + d.Delay(50 * time.Millisecond) + d.Wait() + d.Delay(50 * time.Millisecond) + d.Wait() + d.Delay(50 * time.Millisecond) + d.Wait() + }) >= 200*time.Millisecond) + }) +}