mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
[Feature] [GT-432] Agency Poll System (#1332)
This commit is contained in:
parent
d9ea028019
commit
3ec9e764c6
10 changed files with 74 additions and 24 deletions
|
@ -5,6 +5,7 @@
|
||||||
- (Feature) Agency Cache Poll EE Extension
|
- (Feature) Agency Cache Poll EE Extension
|
||||||
- (Feature) Metrics Counter
|
- (Feature) Metrics Counter
|
||||||
- (Feature) Requests Bytes Counter
|
- (Feature) Requests Bytes Counter
|
||||||
|
- (Feature) Agency Poll System
|
||||||
|
|
||||||
## [1.2.29](https://github.com/arangodb/kube-arangodb/tree/1.2.29) (2023-06-08)
|
## [1.2.29](https://github.com/arangodb/kube-arangodb/tree/1.2.29) (2023-06-08)
|
||||||
- (Maintenance) Add govulncheck to pipeline, update golangci-linter
|
- (Maintenance) Add govulncheck to pipeline, update golangci-linter
|
||||||
|
|
|
@ -28,6 +28,7 @@ import (
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
|
|
||||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||||
|
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
|
||||||
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
|
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
|
||||||
"github.com/arangodb/kube-arangodb/pkg/generated/metric_descriptions"
|
"github.com/arangodb/kube-arangodb/pkg/generated/metric_descriptions"
|
||||||
"github.com/arangodb/kube-arangodb/pkg/logging"
|
"github.com/arangodb/kube-arangodb/pkg/logging"
|
||||||
|
@ -211,7 +212,7 @@ type cache struct {
|
||||||
|
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
|
|
||||||
loader StateLoader[state.Root]
|
loader agencyCache.StateLoader[state.Root]
|
||||||
|
|
||||||
health Health
|
health Health
|
||||||
|
|
||||||
|
|
43
pkg/deployment/agency/cache/interfaces.go
vendored
Normal file
43
pkg/deployment/agency/cache/interfaces.go
vendored
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
//
|
||||||
|
// DISCLAIMER
|
||||||
|
//
|
||||||
|
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
//
|
||||||
|
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||||
|
//
|
||||||
|
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
|
||||||
|
)
|
||||||
|
|
||||||
|
type LeaderDiscovery interface {
|
||||||
|
Discover(ctx context.Context) (conn.Connection, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type StateLoader[T interface{}] interface {
|
||||||
|
State() (*T, uint64, bool)
|
||||||
|
|
||||||
|
Invalidate()
|
||||||
|
Valid() bool
|
||||||
|
|
||||||
|
UpdateTime() time.Time
|
||||||
|
|
||||||
|
Refresh(ctx context.Context, discovery LeaderDiscovery) error
|
||||||
|
}
|
|
@ -23,14 +23,11 @@ package agency
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
|
||||||
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
|
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
|
||||||
)
|
)
|
||||||
|
|
||||||
type LeaderDiscovery interface {
|
func StaticLeaderDiscovery(in conn.Connection) agencyCache.LeaderDiscovery {
|
||||||
Discover(ctx context.Context) (conn.Connection, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func StaticLeaderDiscovery(in conn.Connection) LeaderDiscovery {
|
|
||||||
return staticLeaderDiscovery{conn: in}
|
return staticLeaderDiscovery{conn: in}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,8 @@
|
||||||
|
|
||||||
package agency
|
package agency
|
||||||
|
|
||||||
func getLoaderBase[T interface{}]() StateLoader[T] {
|
import agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
|
||||||
|
|
||||||
|
func getLoaderBase[T interface{}]() agencyCache.StateLoader[T] {
|
||||||
return NewSimpleStateLoader[T]()
|
return NewSimpleStateLoader[T]()
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,16 +25,16 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
agencyCecheConfig "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
|
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getLoader[T interface{}]() StateLoader[T] {
|
func getLoader[T interface{}]() agencyCache.StateLoader[T] {
|
||||||
loader := getLoaderBase[T]()
|
loader := getLoaderBase[T]()
|
||||||
|
|
||||||
loader = InvalidateOnErrorLoader[T](loader)
|
loader = InvalidateOnErrorLoader[T](loader)
|
||||||
|
|
||||||
loader = DelayLoader[T](loader, agencyCecheConfig.GlobalConfig().RefreshDelay)
|
loader = DelayLoader[T](loader, agencyCache.GlobalConfig().RefreshDelay)
|
||||||
loader = RefreshLoader[T](loader, agencyCecheConfig.GlobalConfig().RefreshInterval)
|
loader = RefreshLoader[T](loader, agencyCache.GlobalConfig().RefreshInterval)
|
||||||
|
|
||||||
return loader
|
return loader
|
||||||
}
|
}
|
||||||
|
@ -47,10 +47,10 @@ type StateLoader[T interface{}] interface {
|
||||||
|
|
||||||
UpdateTime() time.Time
|
UpdateTime() time.Time
|
||||||
|
|
||||||
Refresh(ctx context.Context, discovery LeaderDiscovery) error
|
Refresh(ctx context.Context, discovery agencyCache.LeaderDiscovery) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSimpleStateLoader[T interface{}]() StateLoader[T] {
|
func NewSimpleStateLoader[T interface{}]() agencyCache.StateLoader[T] {
|
||||||
return &simpleStateLoader[T]{}
|
return &simpleStateLoader[T]{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,7 +96,7 @@ func (s *simpleStateLoader[T]) Invalidate() {
|
||||||
s.valid = false
|
s.valid = false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *simpleStateLoader[T]) Refresh(ctx context.Context, discovery LeaderDiscovery) error {
|
func (s *simpleStateLoader[T]) Refresh(ctx context.Context, discovery agencyCache.LeaderDiscovery) error {
|
||||||
s.lock.Lock()
|
s.lock.Lock()
|
||||||
defer s.lock.Unlock()
|
defer s.lock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,11 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
func DelayLoader[T interface{}](loader StateLoader[T], delay time.Duration) StateLoader[T] {
|
func DelayLoader[T interface{}](loader agencyCache.StateLoader[T], delay time.Duration) agencyCache.StateLoader[T] {
|
||||||
if delay <= 0 {
|
if delay <= 0 {
|
||||||
return loader
|
return loader
|
||||||
}
|
}
|
||||||
|
@ -43,7 +45,7 @@ type delayerLoader[T interface{}] struct {
|
||||||
last time.Time
|
last time.Time
|
||||||
delay time.Duration
|
delay time.Duration
|
||||||
|
|
||||||
parent StateLoader[T]
|
parent agencyCache.StateLoader[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *delayerLoader[T]) UpdateTime() time.Time {
|
func (i *delayerLoader[T]) UpdateTime() time.Time {
|
||||||
|
@ -74,7 +76,7 @@ func (i *delayerLoader[T]) Invalidate() {
|
||||||
i.parent.Invalidate()
|
i.parent.Invalidate()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *delayerLoader[T]) Refresh(ctx context.Context, discovery LeaderDiscovery) error {
|
func (i *delayerLoader[T]) Refresh(ctx context.Context, discovery agencyCache.LeaderDiscovery) error {
|
||||||
i.lock.Lock()
|
i.lock.Lock()
|
||||||
defer i.lock.Unlock()
|
defer i.lock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,11 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
func InvalidateOnErrorLoader[T interface{}](loader StateLoader[T]) StateLoader[T] {
|
func InvalidateOnErrorLoader[T interface{}](loader agencyCache.StateLoader[T]) agencyCache.StateLoader[T] {
|
||||||
return &invalidateOnErrorLoader[T]{
|
return &invalidateOnErrorLoader[T]{
|
||||||
parent: loader,
|
parent: loader,
|
||||||
}
|
}
|
||||||
|
@ -35,7 +37,7 @@ func InvalidateOnErrorLoader[T interface{}](loader StateLoader[T]) StateLoader[T
|
||||||
type invalidateOnErrorLoader[T interface{}] struct {
|
type invalidateOnErrorLoader[T interface{}] struct {
|
||||||
lock sync.Mutex
|
lock sync.Mutex
|
||||||
|
|
||||||
parent StateLoader[T]
|
parent agencyCache.StateLoader[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *invalidateOnErrorLoader[T]) UpdateTime() time.Time {
|
func (i *invalidateOnErrorLoader[T]) UpdateTime() time.Time {
|
||||||
|
@ -66,7 +68,7 @@ func (i *invalidateOnErrorLoader[T]) Invalidate() {
|
||||||
i.parent.Invalidate()
|
i.parent.Invalidate()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *invalidateOnErrorLoader[T]) Refresh(ctx context.Context, discovery LeaderDiscovery) (err error) {
|
func (i *invalidateOnErrorLoader[T]) Refresh(ctx context.Context, discovery agencyCache.LeaderDiscovery) (err error) {
|
||||||
i.lock.Lock()
|
i.lock.Lock()
|
||||||
defer i.lock.Unlock()
|
defer i.lock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,11 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency/cache"
|
||||||
)
|
)
|
||||||
|
|
||||||
func RefreshLoader[T interface{}](loader StateLoader[T], delay time.Duration) StateLoader[T] {
|
func RefreshLoader[T interface{}](loader agencyCache.StateLoader[T], delay time.Duration) agencyCache.StateLoader[T] {
|
||||||
if delay <= 0 {
|
if delay <= 0 {
|
||||||
return loader
|
return loader
|
||||||
}
|
}
|
||||||
|
@ -43,7 +45,7 @@ type refresherLoader[T interface{}] struct {
|
||||||
last time.Time
|
last time.Time
|
||||||
delay time.Duration
|
delay time.Duration
|
||||||
|
|
||||||
parent StateLoader[T]
|
parent agencyCache.StateLoader[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *refresherLoader[T]) UpdateTime() time.Time {
|
func (i *refresherLoader[T]) UpdateTime() time.Time {
|
||||||
|
@ -74,7 +76,7 @@ func (i *refresherLoader[T]) Invalidate() {
|
||||||
i.parent.Invalidate()
|
i.parent.Invalidate()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (i *refresherLoader[T]) Refresh(ctx context.Context, discovery LeaderDiscovery) error {
|
func (i *refresherLoader[T]) Refresh(ctx context.Context, discovery agencyCache.LeaderDiscovery) error {
|
||||||
i.lock.Lock()
|
i.lock.Lock()
|
||||||
defer i.lock.Unlock()
|
defer i.lock.Unlock()
|
||||||
|
|
||||||
|
|
|
@ -30,7 +30,7 @@ var agencyPoll = &feature{
|
||||||
version: "3.5.0",
|
version: "3.5.0",
|
||||||
enterpriseRequired: false,
|
enterpriseRequired: false,
|
||||||
operatorEnterpriseRequired: true,
|
operatorEnterpriseRequired: true,
|
||||||
enabledByDefault: false,
|
enabledByDefault: true,
|
||||||
}
|
}
|
||||||
|
|
||||||
func AgencyPoll() Feature {
|
func AgencyPoll() Feature {
|
||||||
|
|
Loading…
Reference in a new issue