mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
[Feature] Use only connections for healthy members (#1052)
This commit is contained in:
parent
a8002e1e2b
commit
17542604d6
25 changed files with 281 additions and 169 deletions
|
@ -6,6 +6,7 @@
|
|||
- (Refactor) Deprecate ForeachServerGroup, ForeachServerInGroups and ForServerGroup functions and refactor code accordingly
|
||||
- (Bugfix) Memory leaks due to incorrect time.After function usage
|
||||
- (Feature) Add startup probe for coordinators
|
||||
- (Feature) Use only connections for healthy members
|
||||
|
||||
## [1.2.15](https://github.com/arangodb/kube-arangodb/tree/1.2.15) (2022-07-20)
|
||||
- (Bugfix) Ensure pod names not too long
|
||||
|
|
|
@ -38,8 +38,6 @@ import (
|
|||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
|
||||
)
|
||||
|
||||
type ConnectionWrap func(c driver.Connection) driver.Connection
|
||||
|
||||
type Cache interface {
|
||||
GetAuth() conn.Auth
|
||||
|
||||
|
@ -47,7 +45,6 @@ type Cache interface {
|
|||
|
||||
Get(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
|
||||
GetDatabase(ctx context.Context) (driver.Client, error)
|
||||
GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error)
|
||||
GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error)
|
||||
}
|
||||
|
||||
|
@ -149,25 +146,6 @@ func (cc *cache) GetDatabase(ctx context.Context) (driver.Client, error) {
|
|||
}
|
||||
}
|
||||
|
||||
func (cc *cache) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error) {
|
||||
c, err := cc.getDatabaseClient()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn := c.Connection()
|
||||
|
||||
for _, w := range wraps {
|
||||
if w != nil {
|
||||
conn = w(conn)
|
||||
}
|
||||
}
|
||||
|
||||
return driver.NewClient(driver.ClientConfig{
|
||||
Connection: conn,
|
||||
})
|
||||
}
|
||||
|
||||
// GetAgency returns a cached client for the agency
|
||||
func (cc *cache) GetAgency(_ context.Context, agencyIDs ...string) (agency.Agency, error) {
|
||||
cc.mutex.Lock()
|
||||
|
|
|
@ -171,18 +171,29 @@ func (d *Deployment) UpdateMember(ctx context.Context, member api.MemberStatus)
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
|
||||
// creating one if needed.
|
||||
func (d *Deployment) GetDatabaseClient(ctx context.Context) (driver.Client, error) {
|
||||
c, err := d.clientCache.GetDatabase(ctx)
|
||||
// GetDatabaseWithWrap wraps client to the database with provided connection.
|
||||
func (d *Deployment) GetDatabaseWithWrap(wrappers ...conn.ConnectionWrap) (driver.Client, error) {
|
||||
c, err := d.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
return c, nil
|
||||
|
||||
conn := c.Connection()
|
||||
|
||||
for _, w := range wrappers {
|
||||
if w != nil {
|
||||
conn = w(conn)
|
||||
}
|
||||
}
|
||||
|
||||
return driver.NewClient(driver.ClientConfig{
|
||||
Connection: conn,
|
||||
})
|
||||
}
|
||||
|
||||
// GetDatabaseAsyncClient returns asynchronous client to the database.
|
||||
func (d *Deployment) GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error) {
|
||||
c, err := d.clientCache.GetDatabaseWithWrap(ctx, conn.NewAsyncConnection)
|
||||
c, err := d.GetDatabaseWithWrap(conn.NewAsyncConnection)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
|
|
@ -194,9 +194,7 @@ func (d *Deployment) SetAgencyMaintenanceMode(ctx context.Context, enabled bool)
|
|||
return nil
|
||||
}
|
||||
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
client, err := d.GetDatabaseClient(ctxChild)
|
||||
client, err := d.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -336,8 +336,10 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
|
|||
}
|
||||
|
||||
// Inspect deployment for synced members
|
||||
if err := d.resources.SyncMembersInCluster(ctx, d.GetMembersState().Health()); err != nil {
|
||||
return minInspectionInterval, errors.Wrapf(err, "Removed member cleanup failed")
|
||||
if health, ok := d.GetMembersState().Health(); ok {
|
||||
if err := d.resources.SyncMembersInCluster(ctx, health); err != nil {
|
||||
return minInspectionInterval, errors.Wrapf(err, "Removed member cleanup failed")
|
||||
}
|
||||
}
|
||||
|
||||
// At the end of the inspect, we cleanup terminated pods.
|
||||
|
|
|
@ -22,6 +22,7 @@ package member
|
|||
|
||||
import (
|
||||
"context"
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
|
@ -32,6 +33,7 @@ import (
|
|||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
|
||||
"github.com/arangodb/kube-arangodb/pkg/logging"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/globals"
|
||||
)
|
||||
|
@ -51,33 +53,44 @@ type StateInspector interface {
|
|||
|
||||
MemberState(id string) (State, bool)
|
||||
|
||||
Health() Health
|
||||
// Health returns health of members and boolean value which describes if it was possible to fetch health.
|
||||
Health() (Health, bool)
|
||||
|
||||
State() State
|
||||
|
||||
Log(logger logging.Logger)
|
||||
}
|
||||
|
||||
func NewStateInspector(client reconciler.DeploymentClient) StateInspector {
|
||||
// NewStateInspector creates a new deployment inspector.
|
||||
func NewStateInspector(deployment reconciler.DeploymentGetter) StateInspector {
|
||||
return &stateInspector{
|
||||
client: client,
|
||||
deployment: deployment,
|
||||
}
|
||||
}
|
||||
|
||||
// stateInspector provides cache for a deployment.
|
||||
type stateInspector struct {
|
||||
lock sync.Mutex
|
||||
|
||||
// lock protects internal fields of this structure.
|
||||
lock sync.RWMutex
|
||||
// members stores information about specific members of a deployment.
|
||||
members map[string]State
|
||||
|
||||
// state stores information about a deployment.
|
||||
state State
|
||||
|
||||
// health stores information about healthiness of a deployment.
|
||||
health Health
|
||||
|
||||
client reconciler.DeploymentClient
|
||||
// deployment provides a deployment resources.
|
||||
deployment reconciler.DeploymentGetter
|
||||
}
|
||||
|
||||
func (s *stateInspector) Health() Health {
|
||||
return s.health
|
||||
// Health returns health of members and true or, it returns false when fetching cluster health
|
||||
// is not possible (fail-over, single).
|
||||
func (s *stateInspector) Health() (Health, bool) {
|
||||
if s.health.Error == nil && s.health.Members == nil {
|
||||
// The health is not ready in the cluster mode, or it will never be ready in fail-over or single mode.
|
||||
return Health{}, false
|
||||
}
|
||||
|
||||
return s.health, true
|
||||
}
|
||||
|
||||
func (s *stateInspector) State() State {
|
||||
|
@ -85,8 +98,8 @@ func (s *stateInspector) State() State {
|
|||
}
|
||||
|
||||
func (s *stateInspector) Log(log logging.Logger) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
for m, s := range s.members {
|
||||
if !s.IsReachable() {
|
||||
|
@ -99,49 +112,107 @@ func (s *stateInspector) RefreshState(ctx context.Context, members api.Deploymen
|
|||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
results := make([]State, len(members))
|
||||
|
||||
nctx, cancel := globals.GetGlobalTimeouts().ArangoDCheck().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
|
||||
members.ForEach(func(id int) {
|
||||
if members[id].Group.IsArangosync() {
|
||||
results[id] = s.fetchArangosyncMemberState(nctx, members[id])
|
||||
} else {
|
||||
results[id] = s.fetchServerMemberState(nctx, members[id])
|
||||
}
|
||||
})
|
||||
|
||||
gctx, cancel := globals.GetGlobalTimeouts().ArangoDCheck().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
|
||||
var cs State
|
||||
var h Health
|
||||
|
||||
c, err := s.client.GetDatabaseClient(ctx)
|
||||
if err != nil {
|
||||
cs.NotReachableErr = err
|
||||
} else {
|
||||
v, err := c.Version(gctx)
|
||||
if err != nil {
|
||||
cs.NotReachableErr = err
|
||||
} else {
|
||||
cs.Version = v
|
||||
results := make([]State, len(members))
|
||||
clients := make([]driver.Client, 0, 3)
|
||||
mode := s.deployment.GetMode()
|
||||
servingGroup := mode.ServingGroup()
|
||||
members.ForEach(func(id int) {
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoDCheck().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
|
||||
if members[id].Group.IsArangosync() {
|
||||
results[id] = s.fetchArangosyncMemberState(ctxChild, members[id])
|
||||
return
|
||||
}
|
||||
|
||||
hctx, cancel := globals.GetGlobalTimeouts().ArangoDCheck().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
if cluster, err := c.Cluster(hctx); err != nil {
|
||||
h.Error = err
|
||||
} else {
|
||||
if health, err := cluster.Health(hctx); err != nil {
|
||||
h.Error = err
|
||||
state := s.fetchServerMemberState(ctxChild, members[id])
|
||||
if state.IsReachable() && members[id].Group == servingGroup &&
|
||||
members[id].Member.Conditions.IsTrue(api.ConditionTypeServing) &&
|
||||
!members[id].Member.Conditions.IsTrue(api.ConditionTypeTerminating) {
|
||||
// Create slice with reachable clients (it does not mean that they are healthy).
|
||||
// In the cluster mode it will be checked later which client is healthy.
|
||||
if mode == api.DeploymentModeActiveFailover {
|
||||
globals.GetGlobalTimeouts().ArangoDCheck().RunWithTimeout(ctx, func(ctxChild context.Context) error {
|
||||
if found, _ := arangod.IsServerAvailable(ctxChild, state.client); found {
|
||||
// Don't check error.
|
||||
// If error occurs then `clients` slice will be empty and the error `ArangoDB is not reachable`
|
||||
// will be returned.
|
||||
clients = append(clients, state.client)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
} else {
|
||||
clients = append(clients, state.client)
|
||||
}
|
||||
cs.Version = state.Version
|
||||
}
|
||||
|
||||
results[id] = state
|
||||
})
|
||||
|
||||
if len(clients) > 0 && mode.IsCluster() {
|
||||
// Get random reachable client.
|
||||
cli := clients[rand.Intn(len(clients))]
|
||||
// Clean all clients and rebuild it only with healthy clients.
|
||||
clients = clients[:0]
|
||||
|
||||
// Fetch health only in cluster mode.
|
||||
h.Error = globals.GetGlobalTimeouts().ArangoDCheck().RunWithTimeout(ctx, func(ctxChild context.Context) error {
|
||||
if cluster, err := cli.Cluster(ctxChild); err != nil {
|
||||
return err
|
||||
} else if health, err := cluster.Health(ctxChild); err != nil {
|
||||
return err
|
||||
} else {
|
||||
h.Members = health.Health
|
||||
}
|
||||
|
||||
// Find ArangoDB (not ArangoSync) members which are not healthy and mark them accordingly.
|
||||
for i, m := range members {
|
||||
health, ok := h.Members[driver.ServerID(m.Member.ID)]
|
||||
if ok && health.SyncStatus == driver.ServerSyncStatusServing && health.Status == driver.ServerStatusGood {
|
||||
if m.Group == servingGroup {
|
||||
clients = append(clients, results[i].client)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
if results[i].NotReachableErr != nil {
|
||||
if ok {
|
||||
results[i].NotReachableErr = errors.Newf("member is not healthy "+
|
||||
"because syncStatus is %s and status is %s", health.SyncStatus, health.Status)
|
||||
} else {
|
||||
results[i].NotReachableErr = errors.Newf("member is unknown in ArangoDB healthy status")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if h.Error != nil {
|
||||
for i := range results {
|
||||
if results[i].NotReachableErr != nil {
|
||||
// A member already encountered an error.
|
||||
continue
|
||||
}
|
||||
if results[i].syncClient != nil {
|
||||
// ArangoSync Member is considered as healthy when version can be fetched.
|
||||
continue
|
||||
}
|
||||
results[i].NotReachableErr = errors.Wrapf(h.Error, "cluster healthy is unknown")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(clients) > 0 {
|
||||
cs.client = clients[rand.Intn(len(clients))]
|
||||
} else {
|
||||
cs.NotReachableErr = errors.New("ArangoDB is not reachable")
|
||||
}
|
||||
|
||||
current := map[string]State{}
|
||||
|
||||
for id := range members {
|
||||
|
@ -155,7 +226,7 @@ func (s *stateInspector) RefreshState(ctx context.Context, members api.Deploymen
|
|||
|
||||
func (s *stateInspector) fetchArangosyncMemberState(ctx context.Context, m api.DeploymentStatusMemberElement) State {
|
||||
var state State
|
||||
c, err := s.client.GetSyncServerClient(ctx, m.Group, m.Member.ID)
|
||||
c, err := s.deployment.GetSyncServerClient(ctx, m.Group, m.Member.ID)
|
||||
if err != nil {
|
||||
state.NotReachableErr = err
|
||||
return state
|
||||
|
@ -180,7 +251,7 @@ func (s *stateInspector) fetchArangosyncMemberState(ctx context.Context, m api.D
|
|||
|
||||
func (s *stateInspector) fetchServerMemberState(ctx context.Context, m api.DeploymentStatusMemberElement) State {
|
||||
var state State
|
||||
c, err := s.client.GetServerClient(ctx, m.Group, m.Member.ID)
|
||||
c, err := s.deployment.GetServerClient(ctx, m.Group, m.Member.ID)
|
||||
if err != nil {
|
||||
state.NotReachableErr = err
|
||||
return state
|
||||
|
@ -228,8 +299,8 @@ func (s *stateInspector) GetMemberSyncClient(id string) (client.API, error) {
|
|||
}
|
||||
|
||||
func (s *stateInspector) MemberState(id string) (State, bool) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
s.lock.RLock()
|
||||
defer s.lock.RUnlock()
|
||||
|
||||
if s.members == nil {
|
||||
return State{}, false
|
||||
|
@ -240,9 +311,12 @@ func (s *stateInspector) MemberState(id string) (State, bool) {
|
|||
return v, ok
|
||||
}
|
||||
|
||||
// Health describes a cluster health. In the fail-over or single mode fields members and error will be nil.
|
||||
// In the cluster mode only one field should be set: error or members.
|
||||
type Health struct {
|
||||
// Members is a map of members of the cluster.
|
||||
Members map[driver.ServerID]driver.ServerHealth
|
||||
|
||||
// Errors is set when it is not possible to fetch a cluster info.
|
||||
Error error
|
||||
}
|
||||
|
||||
|
@ -258,6 +332,19 @@ type State struct {
|
|||
syncClient client.API
|
||||
}
|
||||
|
||||
// GetDatabaseClient returns client to the database.
|
||||
func (s State) GetDatabaseClient() (driver.Client, error) {
|
||||
if s.client != nil {
|
||||
return s.client, nil
|
||||
}
|
||||
|
||||
if s.NotReachableErr != nil {
|
||||
return nil, s.NotReachableErr
|
||||
}
|
||||
|
||||
return nil, errors.Newf("ArangoDB is not reachable")
|
||||
}
|
||||
|
||||
func (s State) IsReachable() bool {
|
||||
return s.NotReachableErr == nil
|
||||
}
|
||||
|
|
21
pkg/deployment/member/state_test.go
Normal file
21
pkg/deployment/member/state_test.go
Normal file
|
@ -0,0 +1,21 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package member
|
|
@ -132,9 +132,7 @@ func (a actionBackupRestore) restoreAsync(ctx context.Context, backup *backupApi
|
|||
}
|
||||
|
||||
func (a actionBackupRestore) restoreSync(ctx context.Context, backup *backupApi.ArangoBackup) (bool, error) {
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
dbc, err := a.actionCtx.GetDatabaseClient(ctxChild)
|
||||
dbc, err := a.actionCtx.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
a.log.Err(err).Debug("Failed to create database client")
|
||||
return false, nil
|
||||
|
|
|
@ -98,9 +98,7 @@ func (a actionBootstrapSetPassword) Start(ctx context.Context) (bool, error) {
|
|||
func (a actionBootstrapSetPassword) setUserPassword(ctx context.Context, user, secret string) (string, error) {
|
||||
a.log.Debug("Bootstrapping user %s, secret %s", user, secret)
|
||||
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
client, err := a.actionCtx.GetDatabaseClient(ctxChild)
|
||||
client, err := a.actionCtx.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
return "", errors.WithStack(err)
|
||||
}
|
||||
|
@ -111,7 +109,7 @@ func (a actionBootstrapSetPassword) setUserPassword(ctx context.Context, user, s
|
|||
}
|
||||
|
||||
// Obtain the user
|
||||
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
if u, err := client.User(ctxChild, user); err != nil {
|
||||
if !driver.IsNotFound(err) {
|
||||
|
|
|
@ -66,15 +66,13 @@ func (a *actionCleanoutMember) Start(ctx context.Context) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
c, err := a.actionCtx.GetDatabaseClient(ctxChild)
|
||||
c, err := a.actionCtx.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
a.log.Err(err).Debug("Failed to create member client")
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
cluster, err := c.Cluster(ctxChild)
|
||||
if err != nil {
|
||||
|
|
|
@ -66,14 +66,12 @@ func (a *actionClusterMemberCleanup) Start(ctx context.Context) (bool, error) {
|
|||
func (a *actionClusterMemberCleanup) start(ctx context.Context) error {
|
||||
id := driver.ServerID(a.MemberID())
|
||||
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
c, err := a.actionCtx.GetDatabaseClient(ctxChild)
|
||||
c, err := a.actionCtx.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
cluster, err := c.Cluster(ctxChild)
|
||||
if err != nil {
|
||||
|
|
|
@ -277,7 +277,7 @@ func (ac *actionContext) CreateEvent(evt *k8sutil.Event) {
|
|||
ac.context.CreateEvent(evt)
|
||||
}
|
||||
|
||||
// Gets the specified mode of deployment
|
||||
// GetMode gets the specified mode of deployment.
|
||||
func (ac *actionContext) GetMode() api.DeploymentMode {
|
||||
return ac.context.GetSpec().GetMode()
|
||||
}
|
||||
|
@ -286,16 +286,6 @@ func (ac *actionContext) GetSpec() api.DeploymentSpec {
|
|||
return ac.context.GetSpec()
|
||||
}
|
||||
|
||||
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
|
||||
// creating one if needed.
|
||||
func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) {
|
||||
c, err := ac.context.GetDatabaseClient(ctx)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// GetMemberStatusByID returns the current member status
|
||||
// for the member with given id.
|
||||
// Returns member status, true when found, or false
|
||||
|
|
|
@ -74,21 +74,19 @@ func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) {
|
|||
|
||||
// For safety, remove from cluster
|
||||
if a.action.Group == api.ServerGroupCoordinators || a.action.Group == api.ServerGroupDBServers {
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
client, err := a.actionCtx.GetDatabaseClient(ctxChild)
|
||||
client, err := a.actionCtx.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
if err := arangod.RemoveServerFromCluster(ctxChild, client.Connection(), driver.ServerID(m.ID)); err != nil {
|
||||
if !driver.IsNotFound(err) && !driver.IsPreconditionFailed(err) {
|
||||
a.log.Err(err).Str("member-id", m.ID).Error("Failed to remove server from cluster")
|
||||
// ignore this error, maybe all coordinators are failed and no connection to cluster is possible
|
||||
} else if driver.IsPreconditionFailed(err) {
|
||||
health := a.actionCtx.GetMembersState().Health()
|
||||
health, _ := a.actionCtx.GetMembersState().Health()
|
||||
if health.Error != nil {
|
||||
a.log.Err(err).Str("member-id", m.ID).Error("Failed get cluster health")
|
||||
}
|
||||
|
|
|
@ -64,9 +64,7 @@ func (a *actionResignLeadership) Start(ctx context.Context) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
client, err := a.actionCtx.GetDatabaseClient(ctxChild)
|
||||
client, err := a.actionCtx.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
a.log.Err(err).Error("Unable to get client")
|
||||
return true, errors.WithStack(err)
|
||||
|
@ -83,7 +81,7 @@ func (a *actionResignLeadership) Start(ctx context.Context) (bool, error) {
|
|||
return true, nil
|
||||
}
|
||||
|
||||
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
cluster, err := client.Cluster(ctxChild)
|
||||
if err != nil {
|
||||
|
|
|
@ -95,7 +95,7 @@ func (a *actionWaitForMemberUp) CheckProgress(ctx context.Context) (bool, bool,
|
|||
// checkProgressSingle checks the progress of the action in the case
|
||||
// of a single server.
|
||||
func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool, bool, error) {
|
||||
c, err := a.actionCtx.GetDatabaseClient(ctx)
|
||||
c, err := a.actionCtx.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
a.log.Err(err).Debug("Failed to create database client")
|
||||
return false, false, nil
|
||||
|
@ -143,7 +143,7 @@ func (a *actionWaitForMemberUp) checkProgressAgent() (bool, bool, error) {
|
|||
// checkProgressCluster checks the progress of the action in the case
|
||||
// of a cluster deployment (coordinator/dbserver).
|
||||
func (a *actionWaitForMemberUp) checkProgressCluster() (bool, bool, error) {
|
||||
h := a.actionCtx.GetMembersState().Health()
|
||||
h, _ := a.actionCtx.GetMembersState().Health()
|
||||
if h.Error != nil {
|
||||
a.log.Err(h.Error).Debug("Cluster health is missing")
|
||||
return false, false, nil
|
||||
|
|
|
@ -42,14 +42,12 @@ func (r *Reconciler) createClusterOperationPlan(ctx context.Context, apiObject k
|
|||
return nil
|
||||
}
|
||||
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
c, err := planCtx.GetDatabaseClient(ctxChild)
|
||||
c, err := planCtx.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
cluster, err := c.Cluster(ctxChild)
|
||||
if err != nil {
|
||||
|
|
|
@ -35,6 +35,7 @@ import (
|
|||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
"github.com/arangodb/arangosync-client/client"
|
||||
"github.com/arangodb/go-driver"
|
||||
"github.com/arangodb/go-driver/agency"
|
||||
|
||||
|
@ -82,6 +83,7 @@ type testContext struct {
|
|||
RecordedEvent *k8sutil.Event
|
||||
|
||||
Inspector inspectorInterface.Inspector
|
||||
state member.StateInspector
|
||||
}
|
||||
|
||||
func (c *testContext) GetAgencyHealth() (agencyCache.Health, bool) {
|
||||
|
@ -116,8 +118,7 @@ func (c *testContext) WithCurrentArangoMember(name string) reconciler.ArangoMemb
|
|||
}
|
||||
|
||||
func (c *testContext) GetMembersState() member.StateInspector {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
return c.state
|
||||
}
|
||||
|
||||
func (c *testContext) GetMode() api.DeploymentMode {
|
||||
|
@ -284,10 +285,6 @@ func (c *testContext) UpdateMember(_ context.Context, member api.MemberStatus) e
|
|||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *testContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) {
|
||||
return nil, errors.Newf("Client Not Found")
|
||||
}
|
||||
|
||||
func (c *testContext) GetAgency(_ context.Context, _ ...string) (agency.Agency, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
@ -666,7 +663,6 @@ type testCase struct {
|
|||
ExpectedEvent *k8sutil.Event
|
||||
|
||||
kclient.FakeDataInput
|
||||
|
||||
Extender func(t *testing.T, r *Reconciler, c *testCase)
|
||||
}
|
||||
|
||||
|
@ -1160,6 +1156,11 @@ func TestCreatePlan(t *testing.T) {
|
|||
i := testCase.Inspector(t)
|
||||
|
||||
testCase.context.Inspector = i
|
||||
testCase.context.state = &FakeStateInspector{
|
||||
state: member.State{
|
||||
NotReachableErr: errors.New("Client Not Found"),
|
||||
},
|
||||
}
|
||||
|
||||
h := &LastLogRecord{t: t}
|
||||
logger := logging.NewFactory(zerolog.New(ioutil.Discard).Hook(h)).RegisterAndGetLogger("test", logging.Debug)
|
||||
|
@ -1220,3 +1221,41 @@ func TestCreatePlan(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
type FakeStateInspector struct {
|
||||
state member.State
|
||||
}
|
||||
|
||||
func (FakeStateInspector) RefreshState(_ context.Context, _ api.DeploymentStatusMemberElements) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (FakeStateInspector) GetMemberClient(_ string) (driver.Client, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (FakeStateInspector) GetMemberSyncClient(_ string) (client.API, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (FakeStateInspector) MemberState(_ string) (member.State, bool) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (FakeStateInspector) Health() (member.Health, bool) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (f FakeStateInspector) State() member.State {
|
||||
return f.state
|
||||
}
|
||||
|
||||
func (FakeStateInspector) Log(_ logging.Logger) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
|
|
@ -54,14 +54,12 @@ func secretKeysToList(s *core.Secret) []string {
|
|||
|
||||
// getCluster returns the cluster connection.
|
||||
func getCluster(ctx context.Context, planCtx PlanBuilderContext) (driver.Cluster, error) {
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
c, err := planCtx.GetDatabaseClient(ctxChild)
|
||||
c, err := planCtx.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(errors.Wrapf(err, "Unable to get database client"))
|
||||
}
|
||||
|
||||
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
cluster, err := c.Cluster(ctxChild)
|
||||
if err != nil {
|
||||
|
|
|
@ -121,10 +121,6 @@ type ArangoApplier interface {
|
|||
}
|
||||
|
||||
type DeploymentDatabaseClient interface {
|
||||
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
|
||||
// creating one if needed.
|
||||
GetDatabaseClient(ctx context.Context) (driver.Client, error)
|
||||
|
||||
// GetDatabaseAsyncClient returns a cached client for the entire database (cluster coordinators or single server),
|
||||
// creating one if needed. Only in AsyncMode
|
||||
GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error)
|
||||
|
@ -146,8 +142,15 @@ type KubernetesEventGenerator interface {
|
|||
CreateEvent(evt *k8sutil.Event)
|
||||
}
|
||||
|
||||
// DeploymentClient provides functionalities to get deployment's clients.
|
||||
type DeploymentClient interface {
|
||||
DeploymentDatabaseClient
|
||||
DeploymentMemberClient
|
||||
DeploymentSyncClient
|
||||
}
|
||||
|
||||
// DeploymentGetter provides functionalities to get deployment resources.
|
||||
type DeploymentGetter interface {
|
||||
DeploymentClient
|
||||
DeploymentInfoGetter
|
||||
}
|
||||
|
|
|
@ -43,8 +43,7 @@ type Context interface {
|
|||
reconciler.DeploymentImageManager
|
||||
reconciler.ArangoAgency
|
||||
reconciler.ArangoApplier
|
||||
reconciler.DeploymentInfoGetter
|
||||
reconciler.DeploymentClient
|
||||
reconciler.DeploymentGetter
|
||||
reconciler.KubernetesEventGenerator
|
||||
|
||||
member.StateInspectorGetter
|
||||
|
|
|
@ -22,18 +22,16 @@ package resources
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
||||
"github.com/arangodb/go-driver"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/globals"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
|
||||
|
@ -170,7 +168,7 @@ func (r *Resources) getSingleServerLeaderID(ctx context.Context) (string, error)
|
|||
return err
|
||||
}
|
||||
|
||||
if available, err := isServerAvailable(ctxChild, c); err != nil {
|
||||
if available, err := arangod.IsServerAvailable(ctxChild, c); err != nil {
|
||||
return err
|
||||
} else if !available {
|
||||
return errors.New("not available")
|
||||
|
@ -311,23 +309,3 @@ func (r *Resources) ensureSingleServerLeaderServices(ctx context.Context, cached
|
|||
|
||||
return nil
|
||||
}
|
||||
|
||||
// isServerAvailable returns true when server is available.
|
||||
// In active fail-over mode one of the server should be available.
|
||||
func isServerAvailable(ctx context.Context, c driver.Client) (bool, error) {
|
||||
req, err := c.Connection().NewRequest("GET", "_admin/server/availability")
|
||||
if err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
resp, err := c.Connection().Do(ctx, req)
|
||||
if err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := resp.CheckStatus(http.StatusOK, http.StatusServiceUnavailable); err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return resp.StatusCode() == http.StatusOK, nil
|
||||
}
|
||||
|
|
|
@ -162,9 +162,7 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, p *core.P
|
|||
}
|
||||
|
||||
// Inspect cleaned out state
|
||||
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
c, err := r.context.GetDatabaseClient(ctxChild)
|
||||
c, err := r.context.GetMembersState().State().GetDatabaseClient()
|
||||
if err != nil {
|
||||
log.Err(err).Debug("Failed to create member client")
|
||||
return errors.WithStack(err)
|
||||
|
|
|
@ -31,6 +31,8 @@ import (
|
|||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
)
|
||||
|
||||
type ConnectionWrap func(c driver.Connection) driver.Connection
|
||||
|
||||
func NewAsyncConnection(c driver.Connection) driver.Connection {
|
||||
return async{
|
||||
connectionPass: connectionPass{
|
||||
|
|
|
@ -22,6 +22,7 @@ package arangod
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
driver "github.com/arangodb/go-driver"
|
||||
|
||||
|
@ -76,3 +77,23 @@ func IsDBServerEmpty(ctx context.Context, id string, client driver.Client) error
|
|||
// DBServer is not used in any shard of any database
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsServerAvailable returns true when server is available.
|
||||
// In active fail-over mode one of the server should be available.
|
||||
func IsServerAvailable(ctx context.Context, c driver.Client) (bool, error) {
|
||||
req, err := c.Connection().NewRequest("GET", "_admin/server/availability")
|
||||
if err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
resp, err := c.Connection().Do(ctx, req)
|
||||
if err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := resp.CheckStatus(http.StatusOK, http.StatusServiceUnavailable); err != nil {
|
||||
return false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return resp.StatusCode() == http.StatusOK, nil
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ import "time"
|
|||
|
||||
const (
|
||||
DefaultKubernetesTimeout = 2 * time.Second
|
||||
DefaultArangoDTimeout = time.Second * 10
|
||||
DefaultArangoDTimeout = time.Second * 5
|
||||
DefaultArangoDAgencyTimeout = time.Second * 10
|
||||
DefaultArangoDCheckTimeout = time.Second * 2
|
||||
DefaultReconciliationTimeout = time.Minute
|
||||
|
|
Loading…
Reference in a new issue