From b6efac04c21ce71b54455b9c9e606530ccb97235 Mon Sep 17 00:00:00 2001 From: Adam Janikowski <12255597+ajanikow@users.noreply.github.com> Date: Tue, 27 Jun 2023 21:59:35 +0200 Subject: [PATCH] [Feature] Agency Improvements (#1341) --- CHANGELOG.md | 1 + pkg/deployment/agency/cache/config.go | 2 + pkg/deployment/agency/loader.go | 2 + pkg/deployment/agency/loader_retry.go | 92 +++++++++++++++++++++++++++ 4 files changed, 97 insertions(+) create mode 100644 pkg/deployment/agency/loader_retry.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a475b57c..72d449378 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A) - (Improvement) Block traffic on the services if there is more than 1 active leader in ActiveFailover mode - (Improvement) Improve master endpoint validation. +- (Feature) Agency Improvements ## [1.2.30](https://github.com/arangodb/kube-arangodb/tree/1.2.30) (2023-06-16) - (Feature) AgencyCache Interface diff --git a/pkg/deployment/agency/cache/config.go b/pkg/deployment/agency/cache/config.go index 264fb2550..44a91a03b 100644 --- a/pkg/deployment/agency/cache/config.go +++ b/pkg/deployment/agency/cache/config.go @@ -40,6 +40,7 @@ func Init(cmd *cobra.Command) error { f.DurationVar(&global.RefreshDelay, "agency.refresh-delay", 500*time.Millisecond, "The Agency refresh delay (0 = no delay)") f.DurationVar(&global.RefreshInterval, "agency.refresh-interval", 0, "The Agency refresh interval (0 = do not refresh)") + f.IntVar(&global.Retries, "agency.retries", 1, "The Agency retries (0 = no retries)") return nil } @@ -53,4 +54,5 @@ func GlobalConfig() Config { type Config struct { RefreshDelay time.Duration RefreshInterval time.Duration + Retries int } diff --git a/pkg/deployment/agency/loader.go b/pkg/deployment/agency/loader.go index a3628fc47..b9f3c61af 100644 --- a/pkg/deployment/agency/loader.go +++ b/pkg/deployment/agency/loader.go @@ -36,6 +36,8 @@ func getLoader[T interface{}]() agencyCache.StateLoader[T] { loader = DelayLoader[T](loader, agencyCache.GlobalConfig().RefreshDelay) loader = RefreshLoader[T](loader, agencyCache.GlobalConfig().RefreshInterval) + loader = RetryLoader[T](loader, agencyCache.GlobalConfig().Retries) + return loader } diff --git a/pkg/deployment/agency/loader_retry.go b/pkg/deployment/agency/loader_retry.go new file mode 100644 index 000000000..2b81142a3 --- /dev/null +++ b/pkg/deployment/agency/loader_retry.go @@ -0,0 +1,92 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package agency + +import ( + "context" + "sync" + "time" + + agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache" +) + +func RetryLoader[T interface{}](loader agencyCache.StateLoader[T], retries int) agencyCache.StateLoader[T] { + if retries <= 0 { + return loader + } + + return &retryLoader[T]{ + parent: loader, + retries: retries, + } +} + +type retryLoader[T interface{}] struct { + lock sync.Mutex + + retries int + + parent agencyCache.StateLoader[T] +} + +func (i *retryLoader[T]) UpdateTime() time.Time { + i.lock.Lock() + defer i.lock.Unlock() + + return i.parent.UpdateTime() +} + +func (i *retryLoader[T]) Valid() bool { + i.lock.Lock() + defer i.lock.Unlock() + + return i.parent.Valid() +} + +func (i *retryLoader[T]) State() (*T, uint64, bool) { + i.lock.Lock() + defer i.lock.Unlock() + + return i.parent.State() +} + +func (i *retryLoader[T]) Invalidate() { + i.lock.Lock() + defer i.lock.Unlock() + + i.parent.Invalidate() +} + +func (i *retryLoader[T]) Refresh(ctx context.Context, discovery agencyCache.LeaderDiscovery) (err error) { + i.lock.Lock() + defer i.lock.Unlock() + + for z := 0; z < i.retries-1; z++ { + if err := i.parent.Refresh(ctx, discovery); err != nil { + logger.Err(err).Debug("Unable to refresh agency while retrying") + continue + } + + return nil + } + + return i.parent.Refresh(ctx, discovery) +}