
[Feature] Advanced InSync discovery (#925)

Adam Janikowski 2022-03-19 00:49:20 +01:00 committed by GitHub
parent 31fa8fbda0
commit 53871beaf4
33 changed files with 28349 additions and 348 deletions

@ -7,6 +7,7 @@
- (Bugfix) Assign imagePullSecrets to LocalStorage
- (Update) Bump K8S API to 1.21.10
- (Feature) (ACS) Add ACS handler
- (Feature) Allow restarting DBServers in cases where WriteConcern will still be satisfied
## [1.2.8](https://github.com/arangodb/kube-arangodb/tree/1.2.8) (2022-02-24)
- Do not check License V2 on Community images

@ -27,5 +27,5 @@ type StateCurrentDBCollections map[string]StateCurrentDBCollection
type StateCurrentDBCollection map[string]StateCurrentDBShard
type StateCurrentDBShard struct {
- Servers []string `json:"servers,omitempty"`
+ Servers ShardServers `json:"servers,omitempty"`
}

@ -0,0 +1,64 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
type CollectionGeneratorInterface interface {
WithWriteConcern(wc int) CollectionGeneratorInterface
WithReplicationFactor(rf int) CollectionGeneratorInterface
WithShard() ShardGeneratorInterface
Add() DatabaseGeneratorInterface
}
type collectionGenerator struct {
db databaseGenerator
col string
wc *int
rf *int
shards map[int]shardGenerator
}
func (c collectionGenerator) Add() DatabaseGeneratorInterface {
d := c.db
if d.collections == nil {
d.collections = map[string]collectionGenerator{}
}
d.collections[c.col] = c
return d
}
func (c collectionGenerator) WithShard() ShardGeneratorInterface {
return shardGenerator{
col: c,
}
}
func (c collectionGenerator) WithWriteConcern(wc int) CollectionGeneratorInterface {
c.wc = &wc
return c
}
func (c collectionGenerator) WithReplicationFactor(rf int) CollectionGeneratorInterface {
c.rf = &rf
return c
}

@ -0,0 +1,110 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
import (
"fmt"
"strings"
"testing"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/dchest/uniuri"
"github.com/stretchr/testify/require"
)
func NewDatabaseRandomGenerator() DatabaseGeneratorInterface {
return NewDatabaseGenerator(fmt.Sprintf("d%s", strings.ToLower(uniuri.NewLen(16))))
}
func NewDatabaseGenerator(name string) DatabaseGeneratorInterface {
return databaseGenerator{
db: name,
}
}
type DatabaseGeneratorInterface interface {
Collection(name string) CollectionGeneratorInterface
RandomCollection() CollectionGeneratorInterface
Add() StateGenerator
}
type databaseGenerator struct {
db string
collections map[string]collectionGenerator
}
func (d databaseGenerator) RandomCollection() CollectionGeneratorInterface {
return d.Collection(fmt.Sprintf("c%s", strings.ToLower(uniuri.NewLen(16))))
}
func (d databaseGenerator) Collection(name string) CollectionGeneratorInterface {
return collectionGenerator{
db: d,
col: name,
}
}
func (d databaseGenerator) Add() StateGenerator {
return func(t *testing.T, s *State) {
if s.Plan.Collections == nil {
s.Plan.Collections = StatePlanCollections{}
}
if s.Current.Collections == nil {
s.Current.Collections = StateCurrentCollections{}
}
_, ok := s.Plan.Collections[d.db]
require.False(t, ok)
_, ok = s.Current.Collections[d.db]
require.False(t, ok)
plan := StatePlanDBCollections{}
current := StateCurrentDBCollections{}
for col, colDet := range d.collections {
planShards := Shards{}
currentShards := StateCurrentDBCollection{}
for shard, shardDet := range colDet.shards {
n := fmt.Sprintf("s%d", shard)
planShards[n] = shardDet.plan
currentShards[n] = StateCurrentDBShard{Servers: shardDet.current}
}
planCol := StatePlanCollection{
Name: util.NewString(col),
Shards: planShards,
WriteConcern: colDet.wc,
ReplicationFactor: colDet.rf,
}
plan[col] = planCol
current[col] = currentShards
}
s.Plan.Collections[d.db] = plan
s.Current.Collections[d.db] = current
}
}

@ -0,0 +1,56 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
type ShardGeneratorInterface interface {
WithPlan(servers ...string) ShardGeneratorInterface
WithCurrent(servers ...string) ShardGeneratorInterface
Add() CollectionGeneratorInterface
}
type shardGenerator struct {
col collectionGenerator
plan []string
current []string
}
func (s shardGenerator) WithPlan(servers ...string) ShardGeneratorInterface {
s.plan = servers
return s
}
func (s shardGenerator) WithCurrent(servers ...string) ShardGeneratorInterface {
s.current = servers
return s
}
func (s shardGenerator) Add() CollectionGeneratorInterface {
c := s.col
if c.shards == nil {
c.shards = map[int]shardGenerator{}
}
c.shards[id()] = s
return c
}

@ -0,0 +1,52 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
import (
"sync"
"testing"
)
var (
currentID int
idLock sync.Mutex
)
func id() int {
idLock.Lock()
defer idLock.Unlock()
z := currentID
currentID++
return z
}
type StateGenerator func(t *testing.T, s *State)
func GenerateState(t *testing.T, generators ...StateGenerator) State {
var s State
for _, g := range generators {
g(t, &s)
}
return s
}
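
For illustration only (an editorial sketch, not part of the commit): the generators compose shard -> collection -> database -> state, and each Add() returns the next level up. Server names "A" and "B" are invented.
func Test_ExampleGeneratedState(t *testing.T) {
	s := GenerateState(t,
		NewDatabaseGenerator("db").
			Collection("col").
			WithShard().WithPlan("A", "B").WithCurrent("A", "B").Add().
			Add().
			Add(),
	)
	// One database with one collection ends up in both Plan and Current.
	require.Len(t, s.Plan.Collections["db"], 1)
	require.Len(t, s.Current.Collections["db"], 1)
}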

@ -42,9 +42,62 @@ func (a StatePlanDBCollections) IsDBServerInCollections(name string) bool {
return false
}
func (a StatePlanDBCollections) CountShards() int {
count := 0
for _, d := range a {
count += len(d.Shards)
}
return count
}
type StatePlanCollection struct {
- Name *string `json:"name"`
- Shards StatePlanShard `json:"shards"`
+ Name *string `json:"name"`
+ Shards Shards `json:"shards"`
// deprecated
// MinReplicationFactor is deprecated, but we have to support it for backward compatibility
MinReplicationFactor *int `json:"minReplicationFactor,omitempty"`
WriteConcern *int `json:"writeConcern,omitempty"`
ReplicationFactor *int `json:"replicationFactor,omitempty"`
}
func (a *StatePlanCollection) GetReplicationFactor(shard string) int {
if a == nil {
return 0
}
l := len(a.Shards[shard])
if z := a.ReplicationFactor; z == nil {
return l
} else {
if v := *z; v > l {
return v
} else {
return l
}
}
}
func (a *StatePlanCollection) GetWriteConcern(def int) int {
if p := a.GetWriteConcernP(); p != nil {
return *p
}
return def
}
func (a *StatePlanCollection) GetWriteConcernP() *int {
if a == nil {
return nil
}
if a.WriteConcern == nil {
return a.MinReplicationFactor
}
return a.WriteConcern
}
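
To make the fallback rules above concrete, an editorial sketch (values invented, within this package): GetWriteConcernP prefers WriteConcern and falls back to the deprecated MinReplicationFactor, while GetReplicationFactor never reports less than the number of servers actually planned for the shard.
func exampleWriteConcernFallback() {
	wc := 2
	col := StatePlanCollection{
		Shards:               Shards{"s1": ShardServers{"A", "B", "C"}},
		MinReplicationFactor: &wc,
	}
	_ = col.GetWriteConcern(1)         // 2: WriteConcern is nil, deprecated MinReplicationFactor is used
	_ = col.GetReplicationFactor("s1") // 3: ReplicationFactor is nil, falls back to the planned server count
}
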
func (a StatePlanCollection) GetName(d string) string {
@ -55,15 +108,15 @@ func (a StatePlanCollection) GetName(d string) string {
return *a.Name
}
- func (a StatePlanCollection) IsDBServerInShards(name string) bool {
- for _, dbservers := range a.Shards {
- for _, dbserver := range dbservers {
- if dbserver == name {
- return true
- }
+ func (a *StatePlanCollection) IsDBServerInShards(name string) bool {
+ if a == nil {
+ return false
+ }
+ for _, planShards := range a.Shards {
+ if planShards.Contains(name) {
return true
}
}
return false
}
type StatePlanShard map[string][]string

@ -0,0 +1,47 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
type Shards map[string]ShardServers
type ShardServers []string
func (s ShardServers) Contains(server string) bool {
for _, q := range s {
if server == q {
return true
}
}
return false
}
func (s ShardServers) FilterBy(b ShardServers) ShardServers {
q := make(ShardServers, 0, len(s))
for _, i := range s {
if b.Contains(i) {
q = append(q, i)
}
}
return q
}
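
For illustration (editor's sketch, not part of this commit's diff; the server names are made up), the two helpers behave like this:
func exampleShardServers() {
	plan := ShardServers{"A", "B", "C"}
	current := ShardServers{"C", "A"}
	_ = plan.Contains("B")     // true
	_ = current.FilterBy(plan) // ShardServers{"C", "A"}: only servers also present in plan, receiver order kept
}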

@ -83,6 +83,10 @@ type StateRoot struct {
Arango State `json:"arango"`
}
type DumpState struct {
Agency StateRoot `json:"agency"`
}
type State struct {
Supervision StateSupervision `json:"Supervision"`
Plan StatePlan `json:"Plan"`
@ -115,3 +119,152 @@ func (d *StateExists) UnmarshalJSON(bytes []byte) error {
*d = bytes != nil
return nil
}
func (s State) CountShards() int {
count := 0
for _, collections := range s.Plan.Collections {
count += collections.CountShards()
}
return count
}
func (s State) PlanServers() []string {
q := map[string]bool{}
for _, db := range s.Plan.Collections {
for _, col := range db {
for _, shards := range col.Shards {
for _, shard := range shards {
q[shard] = true
}
}
}
}
r := make([]string, 0, len(q))
for k := range q {
r = append(r, k)
}
return r
}
type CollectionShardDetails []CollectionShardDetail
type CollectionShardDetail struct {
Database string
Collection string
Shard string
}
type StateShardFilter func(s State, db, col, shard string) bool
func NegateFilter(in StateShardFilter) StateShardFilter {
return func(s State, db, col, shard string) bool {
return !in(s, db, col, shard)
}
}
func (s State) Filter(f StateShardFilter) CollectionShardDetails {
shards := make(CollectionShardDetails, s.CountShards())
size := 0
for db, collections := range s.Plan.Collections {
for collection, details := range collections {
for shard := range details.Shards {
if f(s, db, collection, shard) {
shards[size] = CollectionShardDetail{
Database: db,
Collection: collection,
Shard: shard,
}
size++
}
}
}
}
if size == 0 {
return nil
}
return shards[0:size]
}
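
For illustration (editorial sketch, "server-a" is an invented name): the filter callback receives the full state plus database, collection and shard names, so arbitrary per-shard conditions can be expressed, e.g. all shards planned on a given DBServer.
func exampleFilterByServer(s State) CollectionShardDetails {
	return s.Filter(func(s State, db, col, shard string) bool {
		return s.Plan.Collections[db][col].Shards[shard].Contains("server-a")
	})
}
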
func GetDBServerBlockingRestartShards(s State, serverID string) CollectionShardDetails {
return s.Filter(FilterDBServerShardRestart(serverID))
}
func FilterDBServerShardRestart(serverID string) StateShardFilter {
return NegateFilter(func(s State, db, col, shard string) bool {
// Filter all shards which are not blocking restart of server
plan := s.Plan.Collections[db][col]
planShard := plan.Shards[shard]
if !planShard.Contains(serverID) {
// This DBServer is not even in plan, restart possible
return true
}
current := s.Current.Collections[db][col][shard]
currentShard := current.Servers.FilterBy(planShard)
serverInSync := currentShard.Contains(serverID)
if len(planShard) == 1 && serverInSync {
// The requested server is the only one in the plan, restart possible
return true
}
// If WriteConcern equals replicationFactor then downtime is always there
wc := plan.GetWriteConcern(1)
if rf := plan.GetReplicationFactor(shard); wc >= rf {
wc = rf - 1
}
if len(currentShard) >= wc && !serverInSync {
// Current shard is not in sync, but it does not matter - we have enough replicas in sync
// Restart of this DBServer won't affect WC
return true
}
if len(currentShard) > wc {
// We are in plan, but restart is possible
return true
}
// If we restart this server, write concern won't be satisfied
return false
})
}
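
An editorial, test-style sketch of the rule above (not part of the diff; it reuses the state generators added earlier in this commit): with replicationFactor=3, writeConcern=2, plan [A, B, C] and only A and B in sync, the out-of-sync server C can be restarted, while restarting A (or B) would leave fewer in-sync replicas than the effective write concern.
func Test_ExampleBlockingRestart(t *testing.T) {
	s := GenerateState(t,
		NewDatabaseGenerator("db").
			Collection("col").
			WithWriteConcern(2).
			WithReplicationFactor(3).
			WithShard().WithPlan("A", "B", "C").WithCurrent("A", "B").Add().
			Add().
			Add(),
	)
	require.Len(t, GetDBServerBlockingRestartShards(s, "C"), 0) // not in sync, but enough replicas remain
	require.Len(t, GetDBServerBlockingRestartShards(s, "A"), 1) // in sync and needed to keep the write concern
}
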
func GetDBServerShardsNotInSync(s State, serverID string) CollectionShardDetails {
return s.Filter(FilterDBServerShardsNotInSync(serverID))
}
func FilterDBServerShardsNotInSync(serverID string) StateShardFilter {
return NegateFilter(func(s State, db, col, shard string) bool {
planShard := s.Plan.Collections[db][col].Shards[shard]
if serverID != "*" && !planShard.Contains(serverID) {
return true
}
currentShard := s.Current.Collections[db][col][shard]
if len(planShard) != len(currentShard.Servers) {
return false
}
for _, s := range planShard {
if !currentShard.Servers.Contains(s) {
return false
}
}
return true
})
}
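
Similarly, a short editorial sketch of the not-in-sync filter (invented names); "*" is the wildcard accepted above and matches any DBServer:
func Test_ExampleShardsNotInSync(t *testing.T) {
	s := GenerateState(t,
		NewDatabaseGenerator("db").
			Collection("col").
			WithShard().WithPlan("A", "B").WithCurrent("A").Add().
			Add().
			Add(),
	)
	require.Len(t, GetDBServerShardsNotInSync(s, "B"), 1) // B has not caught up yet
	require.Len(t, GetDBServerShardsNotInSync(s, "*"), 1) // wildcard: any server out of sync
	require.Len(t, GetDBServerShardsNotInSync(s, "C"), 0) // C holds no shards of this collection
}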

@ -0,0 +1,151 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
import (
"fmt"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func runWithMeasure(t *testing.T, name string, f func(t *testing.T)) {
t.Run(name, func(t *testing.T) {
n := time.Now()
defer func() {
t.Logf("Elapsed: %s", time.Since(n).String())
}()
f(t)
})
}
func perfWithSize(t *testing.T, dbs, collections, shards, rf, servers int) {
t.Run(fmt.Sprintf("%d/%d/%d/%d/%d", dbs, collections, shards, rf, servers), func(t *testing.T) {
var s State
runWithMeasure(t, "Generate", func(t *testing.T) {
s = generateDatabases(t, dbs, collections, shards, rf, servers)
})
var count int
runWithMeasure(t, "CountShards", func(t *testing.T) {
count = s.CountShards()
t.Logf("Shard: %d", count)
})
runWithMeasure(t, "GetDBS", func(t *testing.T) {
t.Logf("Servers: %d", len(s.PlanServers()))
})
runWithMeasure(t, "Restartable", func(t *testing.T) {
for id := 0; id < servers; id++ {
name := fmt.Sprintf("server-%d", id)
runWithMeasure(t, name, func(t *testing.T) {
require.Len(t, GetDBServerBlockingRestartShards(s, name), 0)
})
}
})
runWithMeasure(t, "NotInSync", func(t *testing.T) {
for id := 0; id < servers; id++ {
name := fmt.Sprintf("server-%d", id)
runWithMeasure(t, name, func(t *testing.T) {
require.Len(t, GetDBServerShardsNotInSync(s, name), 0)
})
}
})
runWithMeasure(t, "GlobalNotInSync", func(t *testing.T) {
require.Len(t, GetDBServerShardsNotInSync(s, "*"), 0)
})
runWithMeasure(t, "All", func(t *testing.T) {
require.Len(t, s.Filter(func(s State, db, col, shard string) bool {
return true
}), count)
})
})
}
func Test_Perf_Calc(t *testing.T) {
perfWithSize(t, 1, 1, 1, 1, 1)
perfWithSize(t, 1, 32, 32, 2, 3)
perfWithSize(t, 32, 32, 32, 3, 32)
perfWithSize(t, 128, 32, 32, 3, 32)
}
func generateDatabases(t *testing.T, dbs, collections, shards, rf, servers int) State {
gens := make([]StateGenerator, dbs)
for id := 0; id < dbs; id++ {
gens[id] = generateCollections(t, NewDatabaseRandomGenerator(), collections, shards, rf, servers).Add()
}
return GenerateState(t, gens...)
}
func generateCollections(t *testing.T, db DatabaseGeneratorInterface, collections, shards, rf, servers int) DatabaseGeneratorInterface {
d := db
for id := 0; id < collections; id++ {
d = generateShards(t, d.RandomCollection(), shards, rf, servers).Add()
}
return d
}
func generateShards(t *testing.T, col CollectionGeneratorInterface, shards, rf, servers int) CollectionGeneratorInterface {
c := col
for id := 0; id < shards; id++ {
l := getServersSublist(t, rf, servers)
c = c.WithShard().WithPlan(l...).WithCurrent(l...).Add()
}
return c
}
func getServersSublist(t *testing.T, rf, servers int) ShardServers {
require.NotEqual(t, 0, rf)
if rf > servers {
require.Fail(t, "Server count is smaller than rf")
}
return generateServersSublist(servers)[0:rf]
}
func generateServersSublist(servers int) ShardServers {
s := make(ShardServers, servers)
for id := range s {
s[id] = fmt.Sprintf("server-%d", id)
}
rand.Shuffle(len(s), func(i, j int) {
s[i], s[j] = s[j], s[i]
})
return s
}

File diff suppressed because one or more lines are too long

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

@ -122,11 +122,6 @@ func (d *Deployment) GetSpec() api.DeploymentSpec {
return d.apiObject.Spec
}
// GetDeploymentHealth returns a copy of the latest known state of cluster health
func (d *Deployment) GetDeploymentHealth() (driver.ClusterHealth, error) {
return d.resources.GetDeploymentHealth()
}
// GetStatus returns the current status of the deployment
// together with the current version of that status.
func (d *Deployment) GetStatus() (api.DeploymentStatus, int32) {
@ -542,16 +537,6 @@ func (d *Deployment) DeleteSecret(secretName string) error {
return nil
}
// GetShardSyncStatus returns true if all shards are in sync
func (d *Deployment) GetShardSyncStatus() bool {
return d.resources.GetShardSyncStatus()
}
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
func (d *Deployment) InvalidateSyncStatus() {
d.resources.InvalidateSyncStatus()
}
func (d *Deployment) DisableScalingCluster(ctx context.Context) error {
return d.clusterScalingIntegration.DisableScalingCluster(ctx)
}

@ -138,7 +138,11 @@ type Deployment struct {
syncClientCache client.ClientCache
haveServiceMonitorCRD bool
- memberState.StateInspector
+ memberState memberState.StateInspector
}
func (d *Deployment) GetMembersState() memberState.StateInspector {
return d.memberState
}
func (d *Deployment) GetAgencyCache() (agency.State, bool) {
@ -216,7 +220,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
agencyCache: agency.NewCache(apiObject.Spec.Mode),
}
- d.StateInspector = memberState.NewStateInspector(d)
+ d.memberState = memberState.NewStateInspector(d)
d.clientCache = deploymentClient.NewClientCache(d, conn.NewFactory(d.getAuth, d.getConnConfig))
@ -241,8 +245,6 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
ci := newClusterScalingIntegration(d)
d.clusterScalingIntegration = ci
go ci.ListenForClusterEvents(d.stopCh)
go d.resources.RunDeploymentHealthLoop(d.stopCh)
go d.resources.RunDeploymentShardSyncLoop(d.stopCh)
}
if config.AllowChaos {
d.chaosMonkey = chaos.NewMonkey(deps.Log, d)

@ -116,8 +116,8 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval
d.apiObject = updated
- d.RefreshState(ctxReconciliation, updated.Status.Members.AsList())
- d.Log(d.deps.Log)
+ d.GetMembersState().RefreshState(ctxReconciliation, updated.Status.Members.AsList())
+ d.GetMembersState().Log(d.deps.Log)
inspectNextInterval, err := d.inspectDeploymentWithError(ctxReconciliation, nextInterval, cachedStatus)
if err != nil {
@ -281,7 +281,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
// Reachable state ensurer
reachableConditionState := status.Conditions.Check(api.ConditionTypeReachable).Exists().IsTrue().Evaluate()
- if d.State().IsReachable() {
+ if d.GetMembersState().State().IsReachable() {
if !reachableConditionState {
if err = d.updateConditionWithHash(ctx, api.ConditionTypeReachable, true, "ArangoDB is reachable", "", ""); err != nil {
return minInspectionInterval, errors.Wrapf(err, "Unable to update Reachable condition")
@ -339,7 +339,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
}
// Inspect deployment for obsolete members
- if err := d.resources.CleanupRemovedMembers(ctx); err != nil {
+ if err := d.resources.CleanupRemovedMembers(ctx, d.GetMembersState().Health()); err != nil {
return minInspectionInterval, errors.Wrapf(err, "Removed member cleanup failed")
}

@ -31,10 +31,16 @@ import (
"github.com/rs/zerolog"
)
type StateInspectorGetter interface {
GetMembersState() StateInspector
}
type StateInspector interface {
RefreshState(ctx context.Context, members api.DeploymentStatusMemberElements)
MemberState(id string) (State, bool)
Health() Health
State() State
Log(logger zerolog.Logger)
@ -53,9 +59,15 @@ type stateInspector struct {
state State
health Health
client reconciler.DeploymentClient
}
func (s *stateInspector) Health() Health {
return s.health
}
func (s *stateInspector) State() State {
return s.state
}
@ -101,6 +113,7 @@ func (s *stateInspector) RefreshState(ctx context.Context, members api.Deploymen
defer cancel()
var cs State
var h Health
c, err := s.client.GetDatabaseClient(ctx)
if err != nil {
@ -114,6 +127,18 @@ func (s *stateInspector) RefreshState(ctx context.Context, members api.Deploymen
}
}
hctx, cancel := globals.GetGlobalTimeouts().ArangoDCheck().WithTimeout(ctx)
defer cancel()
if cluster, err := c.Cluster(hctx); err != nil {
h.Error = err
} else {
if health, err := cluster.Health(hctx); err != nil {
h.Error = err
} else {
h.Members = health.Health
}
}
current := map[string]State{}
for id := range members {
@ -122,6 +147,7 @@ func (s *stateInspector) RefreshState(ctx context.Context, members api.Deploymen
s.members = current
s.state = cs
s.health = h
}
func (s *stateInspector) MemberState(id string) (State, bool) {
@ -137,6 +163,12 @@ func (s *stateInspector) MemberState(id string) (State, bool) {
return v, ok
}
type Health struct {
Members map[driver.ServerID]driver.ServerHealth
Error error
}
type State struct {
Reachable error

@ -35,6 +35,7 @@ import (
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency"
"github.com/arangodb/kube-arangodb/pkg/deployment/member"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
@ -63,6 +64,8 @@ type ActionContext interface {
reconciler.DeploymentClient
reconciler.DeploymentSyncClient
member.StateInspectorGetter
// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
@ -111,12 +114,6 @@ type ActionContext interface {
// SetCurrentImage changes the CurrentImage field in the deployment
// status to the given image.
SetCurrentImage(ctx context.Context, imageInfo api.ImageInfo) error
// GetDeploymentHealth returns a copy of the latest known state of cluster health
GetDeploymentHealth() (driver.ClusterHealth, error)
// GetShardSyncStatus returns true if all shards are in sync
GetShardSyncStatus() bool
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
InvalidateSyncStatus()
// DisableScalingCluster disables scaling DBservers and coordinators
DisableScalingCluster(ctx context.Context) error
// EnableScalingCluster enables scaling DBservers and coordinators
@ -147,6 +144,10 @@ type actionContext struct {
cachedStatus inspectorInterface.Inspector
}
func (ac *actionContext) GetMembersState() member.StateInspector {
return ac.context.GetMembersState()
}
func (ac *actionContext) UpdateStatus(ctx context.Context, status api.DeploymentStatus, lastVersion int32, force ...bool) error {
return ac.context.UpdateStatus(ctx, status, lastVersion, force...)
}
@ -259,10 +260,6 @@ func (ac *actionContext) ArangoMembersModInterface() arangomember.ModInterface {
return ac.context.ArangoMembersModInterface()
}
func (ac *actionContext) GetShardSyncStatus() bool {
return ac.context.GetShardSyncStatus()
}
func (ac *actionContext) UpdateClusterCondition(ctx context.Context, conditionType api.ConditionType, status bool, reason, message string) error {
return ac.context.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
return s.Conditions.Update(conditionType, status, reason, message)
@ -294,11 +291,6 @@ func (ac *actionContext) GetSpec() api.DeploymentSpec {
return ac.context.GetSpec()
}
// GetDeploymentHealth returns a copy of the latest known state of cluster health
func (ac *actionContext) GetDeploymentHealth() (driver.ClusterHealth, error) {
return ac.context.GetDeploymentHealth()
}
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) {
@ -510,11 +502,6 @@ func (ac *actionContext) SetCurrentImage(ctx context.Context, imageInfo api.Imag
}, true)
}
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
func (ac *actionContext) InvalidateSyncStatus() {
ac.context.InvalidateSyncStatus()
}
// DisableScalingCluster disables scaling DBservers and coordinators
func (ac *actionContext) DisableScalingCluster(ctx context.Context) error {
return ac.context.DisableScalingCluster(ctx)

@ -82,12 +82,12 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) {
a.log.Err(err).Str("member-id", m.ID).Msgf("Failed to remove server from cluster")
// ignore this error, maybe all coordinators are failed and no connection to cluster is possible
} else if driver.IsPreconditionFailed(err) {
- health, err := a.actionCtx.GetDeploymentHealth()
- if err != nil {
- return false, errors.WithStack(errors.Wrapf(err, "failed to get cluster health"))
+ health := a.actionCtx.GetMembersState().Health()
+ if health.Error != nil {
+ return false, errors.WithStack(errors.Wrapf(health.Error, "failed to get cluster health"))
}
// We don't care if not found
- if record, ok := health.Health[driver.ServerID(m.ID)]; ok {
+ if record, ok := health.Members[driver.ServerID(m.ID)]; ok {
// Check if the pod is terminating
if m.Conditions.IsTrue(api.ConditionTypeTerminating) {

@ -28,6 +28,7 @@ import (
"github.com/rs/zerolog"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
)
func init() {
@ -93,10 +94,20 @@ func (a *actionWaitForMemberInSync) check() (bool, error) {
}
func (a *actionWaitForMemberInSync) checkCluster() (bool, error) {
- if !a.actionCtx.GetShardSyncStatus() {
- a.log.Info().Str("mode", "cluster").Msgf("Shards are not in sync")
- return false, nil
- }
+ switch a.action.Group {
+ case api.ServerGroupDBServers:
+ agencyState, ok := a.actionCtx.GetAgencyCache()
+ if !ok {
+ a.log.Info().Str("mode", "cluster").Str("member", a.MemberID()).Msgf("AgencyCache is missing")
+ return false, nil
+ }
+ notInSyncShards := agency.GetDBServerShardsNotInSync(agencyState, a.MemberID())
+ if len(notInSyncShards) > 0 {
+ a.log.Info().Str("mode", "cluster").Str("member", a.MemberID()).Int("shard", len(notInSyncShards)).Msgf("DBServer contains not in sync shards")
+ return false, nil
+ }
+ }
return true, nil
}

@ -164,11 +164,11 @@ func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, b
// of a cluster deployment (coordinator/dbserver).
func (a *actionWaitForMemberUp) checkProgressCluster() (bool, bool, error) {
log := a.log
- h, err := a.actionCtx.GetDeploymentHealth()
- if err != nil {
- return false, false, errors.WithStack(errors.Wrapf(err, "failed to get cluster health"))
+ h := a.actionCtx.GetMembersState().Health()
+ if h.Error != nil {
+ return false, false, errors.WithStack(errors.Wrapf(h.Error, "failed to get cluster health"))
}
- sh, found := h.Health[driver.ServerID(a.action.MemberID)]
+ sh, found := h.Members[driver.ServerID(a.action.MemberID)]
if !found {
log.Debug().Msg("Member not yet found in cluster health")
return false, false, nil
@ -188,9 +188,6 @@ func (a *actionWaitForMemberUp) checkProgressCluster() (bool, bool, error) {
return false, false, nil
}
if a.action.Group == api.ServerGroupDBServers {
a.actionCtx.InvalidateSyncStatus()
}
return true, false, nil
}

@ -26,9 +26,9 @@ import (
v1 "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/go-driver"
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/member"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
)
@ -51,6 +51,8 @@ type Context interface {
reconciler.KubernetesEventGenerator
reconciler.DeploymentSyncClient
member.StateInspectorGetter
// CreateMember adds a new member to the given group.
// If ID is non-empty, it will be used, otherwise a new ID is created.
// Returns ID, error
@ -80,12 +82,6 @@ type Context interface {
// DeleteSecret removes the Secret with given name.
// If the secret does not exist, the error is ignored.
DeleteSecret(secretName string) error
// GetDeploymentHealth returns a copy of the latest known state of cluster health
GetDeploymentHealth() (driver.ClusterHealth, error)
// GetShardSyncStatus returns true if all shards are in sync
GetShardSyncStatus() bool
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
InvalidateSyncStatus()
// DisableScalingCluster disables scaling DBservers and coordinators
DisableScalingCluster(ctx context.Context) error
// EnableScalingCluster enables scaling DBservers and coordinators

@ -50,10 +50,6 @@ type PlanBuilderContext interface {
GetTLSKeyfile(group api.ServerGroup, member api.MemberStatus) (string, error)
// GetPvc gets a PVC by the given name, in the namespace of the deployment.
GetPvc(ctx context.Context, pvcName string) (*core.PersistentVolumeClaim, error)
// GetShardSyncStatus returns true if all shards are in sync
GetShardSyncStatus() bool
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
InvalidateSyncStatus()
// GetAuthentication return authentication for members
GetAuthentication() conn.Auth
// GetBackup receives information about a backup resource

@ -29,11 +29,14 @@ import (
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"fmt"
"github.com/arangodb/go-driver"
upgraderules "github.com/arangodb/go-upgrade-rules"
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/actions"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
@ -166,12 +169,13 @@ func createRotateOrUpgradePlanInternal(log zerolog.Logger, apiObject k8sutil.API
if d.updateAllowed {
// We are fine, group is alive so we can proceed
log.Info().Str("member", m.Member.ID).Str("Reason", d.updateMessage).Msg("Upgrade allowed")
return createUpgradeMemberPlan(log, m.Member, m.Group, "Version upgrade", spec, status, !d.upgradeDecision.AutoUpgradeNeeded), false
} else if d.unsafeUpdateAllowed {
log.Info().Str("member", m.Member.ID).Msg("Pod needs upgrade but cluster is not ready. Either some shards are not in sync or some member is not ready, but unsafe upgrade is allowed")
log.Info().Str("member", m.Member.ID).Str("Reason", d.updateMessage).Msg("Pod needs upgrade but cluster is not ready. Either some shards are not in sync or some member is not ready, but unsafe upgrade is allowed")
return createUpgradeMemberPlan(log, m.Member, m.Group, "Version upgrade", spec, status, !d.upgradeDecision.AutoUpgradeNeeded), false
} else {
log.Info().Str("member", m.Member.ID).Msg("Pod needs upgrade but cluster is not ready. Either some shards are not in sync or some member is not ready.")
log.Info().Str("member", m.Member.ID).Str("Reason", d.updateMessage).Msg("Pod needs upgrade but cluster is not ready. Either some shards are not in sync or some member is not ready.")
return nil, true
}
}
@ -189,10 +193,10 @@ func createRotateOrUpgradePlanInternal(log zerolog.Logger, apiObject k8sutil.API
if !d.updateAllowed {
// Update is not allowed due to constraint
if !d.unsafeUpdateAllowed {
log.Info().Str("member", m.Member.ID).Msg("Pod needs restart but cluster is not ready. Either some shards are not in sync or some member is not ready.")
log.Info().Str("member", m.Member.ID).Str("Reason", d.updateMessage).Msg("Pod needs restart but cluster is not ready. Either some shards are not in sync or some member is not ready.")
continue
}
log.Info().Str("member", m.Member.ID).Msg("Pod needs restart but cluster is not ready. Either some shards are not in sync or some member is not ready, but unsafe upgrade is allowed")
log.Info().Str("member", m.Member.ID).Str("Reason", d.updateMessage).Msg("Pod needs restart but cluster is not ready. Either some shards are not in sync or some member is not ready, but unsafe upgrade is allowed")
}
if m.Member.Conditions.IsTrue(api.ConditionTypeRestart) {
@ -376,37 +380,49 @@ func arangoMemberPodTemplateNeedsUpdate(ctx context.Context, log zerolog.Logger,
return "", false
}
- // clusterReadyForUpgrade returns true if the cluster is ready for the next update, that is:
+ // groupReadyForRestart returns true if the cluster is ready for the next update, that is:
// - all shards are in sync
// - all members are ready and fine
- func groupReadyForRestart(context PlanBuilderContext, status api.DeploymentStatus, member api.MemberStatus, group api.ServerGroup) bool {
+ func groupReadyForRestart(context PlanBuilderContext, status api.DeploymentStatus, member api.MemberStatus, group api.ServerGroup) (bool, string) {
if group == api.ServerGroupSingle {
- return true
+ return true, "Restart always in single mode"
}
if !status.Conditions.IsTrue(api.ConditionTypeBootstrapCompleted) {
// Restart is allowed always when bootstrap is not yet completed
- return true
+ return true, "Bootstrap not completed, restart is allowed"
}
// If current member did not become ready even once. Kill it
if !member.Conditions.IsTrue(api.ConditionTypeStarted) {
- return true
+ return true, "Member is not started"
}
// If current core containers are dead kill it.
if !member.Conditions.IsTrue(api.ConditionTypeServing) {
- return true
+ return true, "Member is not serving"
}
+ if !status.Members.MembersOfGroup(group).AllMembersServing() {
+ return false, "Not all members are serving"
+ }
switch group {
case api.ServerGroupDBServers:
- // TODO: Improve shard placement discovery and keep WriteConcern
- return context.GetShardSyncStatus() && status.Members.MembersOfGroup(group).AllMembersServing()
- default:
- // In case of agents we can kill only one agent at same time
- return status.Members.MembersOfGroup(group).AllMembersServing()
+ agencyState, ok := context.GetAgencyCache()
+ if !ok {
+ // Unable to get agency state, do not restart
+ return false, "Unable to get agency cache"
+ }
+ blockingRestartShards := agency.GetDBServerBlockingRestartShards(agencyState, member.ID)
+ if s := len(blockingRestartShards); s > 0 {
+ return false, fmt.Sprintf("There are %d shards which are blocking restart", s)
+ }
}
+ return true, "Restart allowed"
}
// createUpgradeMemberPlan creates a plan to upgrade (stop-recreateWithAutoUpgrade-stop-start) an existing

@ -55,6 +55,7 @@ type updateUpgradeDecision struct {
unsafeUpdateAllowed bool
updateAllowed bool
updateMessage string
update bool
restartRequired bool
}
@ -83,7 +84,7 @@ func createRotateOrUpgradeDecisionMember(log zerolog.Logger, spec api.Deployment
}
}
- d.updateAllowed = groupReadyForRestart(context, status, element.Member, element.Group)
+ d.updateAllowed, d.updateMessage = groupReadyForRestart(context, status, element.Member, element.Group)
d.unsafeUpdateAllowed = util.BoolOrDefault(spec.AllowUnsafeUpgrade, false)
if rotation.CheckPossible(element.Member) {

@ -46,6 +46,7 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/actions"
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency"
"github.com/arangodb/kube-arangodb/pkg/deployment/member"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
pod2 "github.com/arangodb/kube-arangodb/pkg/deployment/pod"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
@ -83,6 +84,11 @@ type testContext struct {
Inspector inspectorInterface.Inspector
}
func (c *testContext) GetMembersState() member.StateInspector {
//TODO implement me
panic("implement me")
}
func (c *testContext) GetMode() api.DeploymentMode {
//TODO implement me
panic("implement me")
@ -383,14 +389,6 @@ func (c *testContext) GetExpectedPodArguments(apiObject meta.Object, deplSpec ap
return nil // not implemented
}
// GetShardSyncStatus returns true if all shards are in sync
func (c *testContext) GetShardSyncStatus() bool {
return true
}
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
func (c *testContext) InvalidateSyncStatus() {}
// GetStatus returns the current status of the deployment
func (c *testContext) GetStatus() (api.DeploymentStatus, int32) {
return c.ArangoDeployment.Status, 0

@ -51,7 +51,7 @@ type Context interface {
reconciler.DeploymentSyncClient
reconciler.KubernetesEventGenerator
- member.StateInspector
+ member.StateInspectorGetter
// GetServerGroupIterator returns the deployment as ServerGroupIterator.
GetServerGroupIterator() reconciler.ServerGroupIterator

@ -1,207 +0,0 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package resources
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
driver "github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/metrics"
)
var (
deploymentHealthFetchesCounters = metrics.MustRegisterCounterVec(metricsComponent, "deployment_health_fetches", "Number of times the health of the deployment was fetched", metrics.DeploymentName, metrics.Result)
deploymentSyncFetchesCounters = metrics.MustRegisterCounterVec(metricsComponent, "deployment_sync_fetches", "Number of times the sync status of shards of the deployment was fetched", metrics.DeploymentName, metrics.Result)
)
// RunDeploymentHealthLoop creates a loop to fetch the health of the deployment.
// The loop ends when the given channel is closed.
func (r *Resources) RunDeploymentHealthLoop(stopCh <-chan struct{}) {
log := r.log
deploymentName := r.context.GetAPIObject().GetName()
if r.context.GetSpec().GetMode() != api.DeploymentModeCluster {
// Deployment health is currently only applicable for clusters
return
}
for {
if err := r.fetchDeploymentHealth(); err != nil {
log.Debug().Err(err).Msg("Failed to fetch deployment health")
deploymentHealthFetchesCounters.WithLabelValues(deploymentName, metrics.Failed).Inc()
} else {
deploymentHealthFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
}
select {
case <-r.shardSync.triggerSyncInspection.Done():
case <-time.After(time.Second * 5):
// Continue
case <-stopCh:
// We're done
return
}
}
}
// fetchDeploymentHealth performs a single fetch of cluster-health
// and stores it in-memory.
func (r *Resources) fetchDeploymentHealth() error {
// Ask cluster for its health
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
client, err := r.context.GetDatabaseClient(ctx)
if err != nil {
return errors.WithStack(err)
}
c, err := client.Cluster(ctx)
if err != nil {
return errors.WithStack(err)
}
h, err := c.Health(ctx)
if err != nil {
return errors.WithStack(err)
}
// Save cluster health
r.health.mutex.Lock()
defer r.health.mutex.Unlock()
r.health.clusterHealth = h
r.health.timestamp = time.Now()
return nil
}
// GetDeploymentHealth returns a copy of the latest known state of cluster health
func (r *Resources) GetDeploymentHealth() (driver.ClusterHealth, error) {
r.health.mutex.Lock()
defer r.health.mutex.Unlock()
if r.health.timestamp.IsZero() {
return driver.ClusterHealth{}, errors.Newf("No cluster health available")
}
newhealth := r.health.clusterHealth
newhealth.Health = make(map[driver.ServerID]driver.ServerHealth)
for k, v := range r.health.clusterHealth.Health {
newhealth.Health[k] = v
}
return newhealth, nil
}
// RunDeploymentShardSyncLoop creates a loop to fetch the sync status of shards of the deployment.
// The loop ends when the given channel is closed.
func (r *Resources) RunDeploymentShardSyncLoop(stopCh <-chan struct{}) {
log := r.log
deploymentName := r.context.GetAPIObject().GetName()
if r.context.GetSpec().GetMode() != api.DeploymentModeCluster {
// Deployment health is currently only applicable for clusters
return
}
for {
if err := r.fetchClusterShardSyncState(); err != nil {
log.Debug().Err(err).Msg("Failed to fetch deployment shard sync state")
deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Failed).Inc()
} else {
deploymentSyncFetchesCounters.WithLabelValues(deploymentName, metrics.Success).Inc()
}
select {
case <-time.After(time.Second * 30):
// Continue
case <-stopCh:
// We're done
return
}
}
}
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
func (r *Resources) InvalidateSyncStatus() {
r.log.Debug().Msg("Invalidating sync status due to previous events")
r.shardSync.mutex.Lock()
defer r.shardSync.mutex.Unlock()
r.shardSync.allInSync = false
r.shardSync.triggerSyncInspection.Trigger()
}
// fetchClusterShardSyncState performs a single fetch of the cluster inventory and
// checks if all shards are in sync
func (r *Resources) fetchClusterShardSyncState() error {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute*10)
defer cancel()
c, err := r.context.GetDatabaseClient(ctx)
if err != nil {
return err
}
cluster, err := c.Cluster(ctx)
if err != nil {
return err
}
dbs, err := c.Databases(ctx)
if err != nil {
return err
}
allInSync := true
dbloop:
for _, db := range dbs {
inv, err := cluster.DatabaseInventory(ctx, db)
if err != nil {
return err
}
for _, col := range inv.Collections {
if !col.AllInSync {
r.log.Debug().Str("db", db.Name()).Str("col", col.Parameters.Name).Msg("Collection not in sync")
allInSync = false
break dbloop
}
}
}
r.shardSync.mutex.Lock()
oldSyncState := r.shardSync.allInSync
r.shardSync.allInSync = allInSync
r.shardSync.timestamp = time.Now()
r.shardSync.mutex.Unlock()
if !oldSyncState && allInSync {
r.log.Debug().Msg("Everything is in sync by now")
}
return nil
}
// GetShardSyncStatus returns true if all shards are in sync
func (r *Resources) GetShardSyncStatus() bool {
if r.context.GetSpec().GetMode() != api.DeploymentModeCluster {
// Shard sync status is only applicable for clusters
return true
}
r.shardSync.mutex.Lock()
defer r.shardSync.mutex.Unlock()
return r.shardSync.allInSync
}

@ -35,6 +35,7 @@ import (
driver "github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
memberState "github.com/arangodb/kube-arangodb/pkg/deployment/member"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
@ -42,8 +43,7 @@ import (
const (
// minMemberAge is the minimum duration we expect a member to be created before we remove it because
// it is not part of a deployment.
- minMemberAge = time.Minute * 10
- maxClusterHealthAge = time.Second * 20
+ minMemberAge = time.Minute * 10
)
var (
@ -51,12 +51,12 @@ var (
)
// CleanupRemovedMembers removes all arangod members that are no longer part of ArangoDB deployment.
- func (r *Resources) CleanupRemovedMembers(ctx context.Context) error {
+ func (r *Resources) CleanupRemovedMembers(ctx context.Context, health memberState.Health) error {
// Decide what to do depending on cluster mode
switch r.context.GetSpec().GetMode() {
case api.DeploymentModeCluster:
deploymentName := r.context.GetAPIObject().GetName()
- if err := r.cleanupRemovedClusterMembers(ctx); err != nil {
+ if err := r.cleanupRemovedClusterMembers(ctx, health); err != nil {
cleanupRemovedMembersCounters.WithLabelValues(deploymentName, metrics.Failed).Inc()
return errors.WithStack(err)
}
@ -69,25 +69,16 @@ func (r *Resources) CleanupRemovedMembers(ctx context.Context) error {
}
// cleanupRemovedClusterMembers removes all arangod members that are no longer part of the cluster.
- func (r *Resources) cleanupRemovedClusterMembers(ctx context.Context) error {
+ func (r *Resources) cleanupRemovedClusterMembers(ctx context.Context, health memberState.Health) error {
log := r.log
- // Fetch recent cluster health
- r.health.mutex.Lock()
- h := r.health.clusterHealth
- ts := r.health.timestamp
- r.health.mutex.Unlock()
- // Only accept recent cluster health values
- healthAge := time.Since(ts)
- if healthAge > maxClusterHealthAge {
- log.Info().Dur("age", healthAge).Msg("Cleanup longer than max cluster health. Exiting")
+ if health.Error != nil {
+ log.Info().Err(health.Error).Msg("Health of the cluster is missing")
return nil
}
serverFound := func(id string) bool {
- _, found := h.Health[driver.ServerID(id)]
+ _, found := health.Members[driver.ServerID(id)]
return found
}

@ -111,7 +111,6 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
// Record termination time
now := metav1.Now()
memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
r.InvalidateSyncStatus()
}
}
} else if k8sutil.IsPodFailed(pod, coreContainers) {
@ -177,7 +176,6 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
// Record termination time
now := metav1.Now()
memberStatus.RecentTerminations = append(memberStatus.RecentTerminations, now)
r.InvalidateSyncStatus()
}
}
}
@ -219,7 +217,7 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
// End of Topology labels
// Reachable state
- if state, ok := r.context.MemberState(memberStatus.ID); ok {
+ if state, ok := r.context.GetMembersState().MemberState(memberStatus.ID); ok {
if state.IsReachable() {
if memberStatus.Conditions.Update(api.ConditionTypeReachable, true, "ArangoDB is reachable", "") {
updateMemberStatusNeeded = true

@ -21,11 +21,6 @@
package resources
import (
"sync"
"time"
driver "github.com/arangodb/go-driver"
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
"github.com/rs/zerolog"
)
@ -34,17 +29,6 @@ import (
type Resources struct {
log zerolog.Logger
context Context
health struct {
clusterHealth driver.ClusterHealth // Last fetched cluster health
timestamp time.Time // Timestamp of last fetch of cluster health
mutex sync.Mutex // Mutex guarding fields in this struct
}
shardSync struct {
allInSync bool
timestamp time.Time
mutex sync.Mutex
triggerSyncInspection trigger.Trigger
}
}
// NewResources creates a new Resources service, used to