mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
[Bugfix] Extend Agency HealthCheck for replace (#1030)
This commit is contained in:
parent
92c2528aad
commit
54de5c6242
26 changed files with 605 additions and 49 deletions
|
@ -25,6 +25,7 @@
|
|||
- (Feature) Set Logger format
|
||||
- (Bugfix) Ensure Wait actions to be present after AddMember
|
||||
- (Documentation) Refactor metrics (Part 1)
|
||||
- (Bugfix) Extend Agency HealthCheck for replace
|
||||
|
||||
## [1.2.13](https://github.com/arangodb/kube-arangodb/tree/1.2.13) (2022-06-07)
|
||||
- (Bugfix) Fix arangosync members state inspection
|
||||
|
|
|
@ -3,7 +3,14 @@
|
|||
## List
|
||||
|
||||
| Name | Namespace | Group | Type | Description |
|
||||
|:-------------------------------------------------------------------------:|:-----------------:|:------:|:-----:|:-------------------------------------------|
|
||||
|:---------------------------------------------------------------------------------------------------------------:|:-----------------:|:------------:|:-----:|:---------------------------------------------------|
|
||||
| [arangodb_operator_agency_errors](./arangodb_operator_agency_errors.md) | arangodb_operator | agency | Count | Current count of agency cache fetch errors |
|
||||
| [arangodb_operator_agency_fetches](./arangodb_operator_agency_fetches.md) | arangodb_operator | agency | Count | Current count of agency cache fetches |
|
||||
| [arangodb_operator_agency_index](./arangodb_operator_agency_index.md) | arangodb_operator | agency | Gauge | Current index of the agency cache |
|
||||
| [arangodb_operator_agency_cache_health_present](./arangodb_operator_agency_cache_health_present.md) | arangodb_operator | agency_cache | Gauge | Determines if local agency cache health is present |
|
||||
| [arangodb_operator_agency_cache_healthy](./arangodb_operator_agency_cache_healthy.md) | arangodb_operator | agency_cache | Gauge | Determines if agency is healthy |
|
||||
| [arangodb_operator_agency_cache_leaders](./arangodb_operator_agency_cache_leaders.md) | arangodb_operator | agency_cache | Gauge | Determines agency leader vote count |
|
||||
| [arangodb_operator_agency_cache_member_commit_offset](./arangodb_operator_agency_cache_member_commit_offset.md) | arangodb_operator | agency_cache | Gauge | Determines agency member commit offset |
|
||||
| [arangodb_operator_agency_cache_member_serving](./arangodb_operator_agency_cache_member_serving.md) | arangodb_operator | agency_cache | Gauge | Determines if agency member is reachable |
|
||||
| [arangodb_operator_agency_cache_present](./arangodb_operator_agency_cache_present.md) | arangodb_operator | agency_cache | Gauge | Determines if local agency cache is present |
|
||||
| [arangodb_operator_agency_cache_serving](./arangodb_operator_agency_cache_serving.md) | arangodb_operator | agency_cache | Gauge | Determines if agency is serving |
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
# arangodb_operator_agency_cache_health_present (Gauge)
|
||||
|
||||
## Description
|
||||
|
||||
Determines if local agency cache health is present
|
||||
|
||||
## Labels
|
||||
|
||||
| Label | Description |
|
||||
|:---------:|:---------------------|
|
||||
| namespace | Deployment Namespace |
|
||||
| name | Deployment Name |
|
|
@ -0,0 +1,12 @@
|
|||
# arangodb_operator_agency_cache_healthy (Gauge)
|
||||
|
||||
## Description
|
||||
|
||||
Determines if agency is healthy
|
||||
|
||||
## Labels
|
||||
|
||||
| Label | Description |
|
||||
|:---------:|:---------------------|
|
||||
| namespace | Deployment Namespace |
|
||||
| name | Deployment Name |
|
|
@ -0,0 +1,13 @@
|
|||
# arangodb_operator_agency_cache_leaders (Gauge)
|
||||
|
||||
## Description
|
||||
|
||||
Determines agency leader vote count. Should be always one
|
||||
|
||||
## Labels
|
||||
|
||||
| Label | Description |
|
||||
|:---------:|:---------------------|
|
||||
| namespace | Deployment Namespace |
|
||||
| name | Deployment Name |
|
||||
| agent | Agent ID |
|
|
@ -0,0 +1,13 @@
|
|||
# arangodb_operator_agency_cache_member_commit_offset (Gauge)
|
||||
|
||||
## Description
|
||||
|
||||
Determines agency member commit offset. Set to -1 if Agent is not reachable
|
||||
|
||||
## Labels
|
||||
|
||||
| Label | Description |
|
||||
|:---------:|:---------------------|
|
||||
| namespace | Deployment Namespace |
|
||||
| name | Deployment Name |
|
||||
| agent | Agent ID |
|
|
@ -0,0 +1,13 @@
|
|||
# arangodb_operator_agency_cache_member_serving (Gauge)
|
||||
|
||||
## Description
|
||||
|
||||
Determines if agency member is reachable
|
||||
|
||||
## Labels
|
||||
|
||||
| Label | Description |
|
||||
|:---------:|:---------------------|
|
||||
| namespace | Deployment Namespace |
|
||||
| name | Deployment Name |
|
||||
| agent | Agent ID |
|
|
@ -0,0 +1,12 @@
|
|||
# arangodb_operator_agency_cache_present (Gauge)
|
||||
|
||||
## Description
|
||||
|
||||
Determines if local agency cache is present
|
||||
|
||||
## Labels
|
||||
|
||||
| Label | Description |
|
||||
|:---------:|:---------------------|
|
||||
| namespace | Deployment Namespace |
|
||||
| name | Deployment Name |
|
|
@ -0,0 +1,12 @@
|
|||
# arangodb_operator_agency_cache_serving (Gauge)
|
||||
|
||||
## Description
|
||||
|
||||
Determines if agency is serving
|
||||
|
||||
## Labels
|
||||
|
||||
| Label | Description |
|
||||
|:---------:|:---------------------|
|
||||
| namespace | Deployment Namespace |
|
||||
| name | Deployment Name |
|
1
go.mod
1
go.mod
|
@ -28,6 +28,7 @@ require (
|
|||
github.com/arangodb/go-driver v1.2.1
|
||||
github.com/arangodb/go-driver/v2 v2.0.0-20211021031401-d92dcd5a4c83
|
||||
github.com/arangodb/go-upgrade-rules v0.0.0-20180809110947-031b4774ff21
|
||||
github.com/arangodb/rebalancer v0.1.1
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible
|
||||
github.com/dchest/uniuri v0.0.0-20160212164326-8902c56451e9
|
||||
github.com/gin-gonic/gin v1.7.2
|
||||
|
|
|
@ -5,6 +5,76 @@ destination: pkg/generated/metric_descriptions
|
|||
|
||||
namespaces:
|
||||
arangodb_operator:
|
||||
agency_cache:
|
||||
present:
|
||||
shortDescription: "Determines if local agency cache is present"
|
||||
description: "Determines if local agency cache is present"
|
||||
type: "Gauge"
|
||||
labels:
|
||||
- key: namespace
|
||||
description: "Deployment Namespace"
|
||||
- key: name
|
||||
description: "Deployment Name"
|
||||
health_present:
|
||||
shortDescription: "Determines if local agency cache health is present"
|
||||
description: "Determines if local agency cache health is present"
|
||||
type: "Gauge"
|
||||
labels:
|
||||
- key: namespace
|
||||
description: "Deployment Namespace"
|
||||
- key: name
|
||||
description: "Deployment Name"
|
||||
serving:
|
||||
shortDescription: "Determines if agency is serving"
|
||||
description: "Determines if agency is serving"
|
||||
type: "Gauge"
|
||||
labels:
|
||||
- key: namespace
|
||||
description: "Deployment Namespace"
|
||||
- key: name
|
||||
description: "Deployment Name"
|
||||
healthy:
|
||||
shortDescription: "Determines if agency is healthy"
|
||||
description: "Determines if agency is healthy"
|
||||
type: "Gauge"
|
||||
labels:
|
||||
- key: namespace
|
||||
description: "Deployment Namespace"
|
||||
- key: name
|
||||
description: "Deployment Name"
|
||||
member_serving:
|
||||
shortDescription: "Determines if agency member is reachable"
|
||||
description: "Determines if agency member is reachable"
|
||||
type: "Gauge"
|
||||
labels:
|
||||
- key: namespace
|
||||
description: "Deployment Namespace"
|
||||
- key: name
|
||||
description: "Deployment Name"
|
||||
- key: agent
|
||||
description: "Agent ID"
|
||||
member_commit_offset:
|
||||
shortDescription: "Determines agency member commit offset"
|
||||
description: "Determines agency member commit offset. Set to -1 if Agent is not reachable"
|
||||
type: "Gauge"
|
||||
labels:
|
||||
- key: namespace
|
||||
description: "Deployment Namespace"
|
||||
- key: name
|
||||
description: "Deployment Name"
|
||||
- key: agent
|
||||
description: "Agent ID"
|
||||
leaders:
|
||||
shortDescription: "Determines agency leader vote count"
|
||||
description: "Determines agency leader vote count. Should be always one"
|
||||
type: "Gauge"
|
||||
labels:
|
||||
- key: namespace
|
||||
description: "Deployment Namespace"
|
||||
- key: name
|
||||
description: "Deployment Name"
|
||||
- key: agent
|
||||
description: "Agent ID"
|
||||
agency:
|
||||
index:
|
||||
shortDescription: "Current index of the agency cache"
|
||||
|
|
|
@ -24,14 +24,20 @@ import (
|
|||
"context"
|
||||
"sync"
|
||||
|
||||
"github.com/arangodb/go-driver"
|
||||
"github.com/arangodb/go-driver/agency"
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/generated/metric_descriptions"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/globals"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/metrics"
|
||||
)
|
||||
|
||||
type health struct {
|
||||
namespace, name string
|
||||
|
||||
leaderID string
|
||||
leader driver.Connection
|
||||
|
||||
agencySize int
|
||||
|
||||
|
@ -41,6 +47,42 @@ type health struct {
|
|||
election map[string]int
|
||||
}
|
||||
|
||||
func (h health) Leader() (driver.Connection, bool) {
|
||||
if l := h.leader; l != nil {
|
||||
return l, true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func (h health) CollectMetrics(m metrics.PushMetric) {
|
||||
if err := h.Serving(); err == nil {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheServing().Gauge(1, h.namespace, h.name))
|
||||
} else {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheServing().Gauge(0, h.namespace, h.name))
|
||||
}
|
||||
|
||||
if err := h.Healthy(); err == nil {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheHealthy().Gauge(1, h.namespace, h.name))
|
||||
} else {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheHealthy().Gauge(0, h.namespace, h.name))
|
||||
}
|
||||
|
||||
for _, name := range h.names {
|
||||
if i, ok := h.commitIndexes[name]; ok {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheMemberServing().Gauge(1, h.namespace, h.name, name),
|
||||
metric_descriptions.ArangodbOperatorAgencyCacheMemberCommitOffset().Gauge(float64(i), h.namespace, h.name, name))
|
||||
} else {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheMemberServing().Gauge(0, h.namespace, h.name, name),
|
||||
metric_descriptions.ArangodbOperatorAgencyCacheMemberCommitOffset().Gauge(-1, h.namespace, h.name, name))
|
||||
}
|
||||
}
|
||||
|
||||
for k, l := range h.election {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheLeaders().Gauge(float64(l), h.namespace, h.name, k))
|
||||
}
|
||||
}
|
||||
|
||||
func (h health) LeaderID() string {
|
||||
return h.leaderID
|
||||
}
|
||||
|
@ -97,26 +139,34 @@ type Health interface {
|
|||
|
||||
// LeaderID returns a leader ID or empty string if a leader is not known.
|
||||
LeaderID() string
|
||||
|
||||
// Leader returns connection to the Agency leader
|
||||
Leader() (driver.Connection, bool)
|
||||
|
||||
CollectMetrics(m metrics.PushMetric)
|
||||
}
|
||||
|
||||
type Cache interface {
|
||||
Reload(ctx context.Context, size int, clients []agency.Agency) (uint64, error)
|
||||
Reload(ctx context.Context, size int, clients map[string]agency.Agency) (uint64, error)
|
||||
Data() (State, bool)
|
||||
CommitIndex() uint64
|
||||
// Health returns true when healthy object is available.
|
||||
Health() (Health, bool)
|
||||
}
|
||||
|
||||
func NewCache(mode *api.DeploymentMode) Cache {
|
||||
func NewCache(namespace, name string, mode *api.DeploymentMode) Cache {
|
||||
if mode.Get() == api.DeploymentModeSingle {
|
||||
return NewSingleCache()
|
||||
}
|
||||
|
||||
return NewAgencyCache()
|
||||
return NewAgencyCache(namespace, name)
|
||||
}
|
||||
|
||||
func NewAgencyCache() Cache {
|
||||
return &cache{}
|
||||
func NewAgencyCache(namespace, name string) Cache {
|
||||
return &cache{
|
||||
namespace: namespace,
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
func NewSingleCache() Cache {
|
||||
|
@ -135,7 +185,7 @@ func (c cacheSingle) Health() (Health, bool) {
|
|||
return nil, false
|
||||
}
|
||||
|
||||
func (c cacheSingle) Reload(_ context.Context, _ int, _ []agency.Agency) (uint64, error) {
|
||||
func (c cacheSingle) Reload(_ context.Context, _ int, _ map[string]agency.Agency) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
|
@ -144,6 +194,8 @@ func (c cacheSingle) Data() (State, bool) {
|
|||
}
|
||||
|
||||
type cache struct {
|
||||
namespace, name string
|
||||
|
||||
lock sync.RWMutex
|
||||
|
||||
valid bool
|
||||
|
@ -181,7 +233,7 @@ func (c *cache) Health() (Health, bool) {
|
|||
return nil, false
|
||||
}
|
||||
|
||||
func (c *cache) Reload(ctx context.Context, size int, clients []agency.Agency) (uint64, error) {
|
||||
func (c *cache) Reload(ctx context.Context, size int, clients map[string]agency.Agency) (uint64, error) {
|
||||
c.lock.Lock()
|
||||
defer c.lock.Unlock()
|
||||
|
||||
|
@ -195,6 +247,9 @@ func (c *cache) Reload(ctx context.Context, size int, clients []agency.Agency) (
|
|||
return 0, err
|
||||
}
|
||||
|
||||
health.namespace = c.namespace
|
||||
health.name = c.name
|
||||
|
||||
c.health = health
|
||||
if leaderConfig.CommitIndex == c.commitIndex && c.valid {
|
||||
// We are on same index, nothing to do
|
||||
|
@ -215,21 +270,25 @@ func (c *cache) Reload(ctx context.Context, size int, clients []agency.Agency) (
|
|||
|
||||
// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
|
||||
// If there is no quorum for the leader then error is returned.
|
||||
func getLeader(ctx context.Context, size int, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
|
||||
func getLeader(ctx context.Context, size int, clients map[string]agency.Agency) (agency.Agency, *Config, health, error) {
|
||||
configs := make([]*Config, len(clients))
|
||||
errs := make([]error, len(clients))
|
||||
names := make([]string, 0, len(clients))
|
||||
for k := range clients {
|
||||
names = append(names, k)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Fetch Agency config
|
||||
for i := range clients {
|
||||
for i := range names {
|
||||
wg.Add(1)
|
||||
go func(id int) {
|
||||
defer wg.Done()
|
||||
|
||||
ctxLocal, cancel := globals.GetGlobals().Timeouts().Agency().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
config, err := GetAgencyConfig(ctxLocal, clients[id])
|
||||
config, err := GetAgencyConfig(ctxLocal, clients[names[id]])
|
||||
|
||||
if err != nil {
|
||||
errs[id] = err
|
||||
|
@ -243,8 +302,9 @@ func getLeader(ctx context.Context, size int, clients []agency.Agency) (agency.A
|
|||
wg.Wait()
|
||||
|
||||
var h health
|
||||
|
||||
h.agencySize = size
|
||||
h.names = make([]string, len(clients))
|
||||
h.names = names
|
||||
h.commitIndexes = make(map[string]uint64, len(clients))
|
||||
h.leaders = make(map[string]string, len(clients))
|
||||
h.election = make(map[string]int, len(clients))
|
||||
|
@ -252,7 +312,7 @@ func getLeader(ctx context.Context, size int, clients []agency.Agency) (agency.A
|
|||
for id := range configs {
|
||||
if config := configs[id]; config != nil {
|
||||
name := config.Configuration.ID
|
||||
h.names[id] = name
|
||||
if name == h.names[id] {
|
||||
h.commitIndexes[name] = config.CommitIndex
|
||||
if config.LeaderId != "" {
|
||||
h.leaders[name] = config.LeaderId
|
||||
|
@ -261,16 +321,21 @@ func getLeader(ctx context.Context, size int, clients []agency.Agency) (agency.A
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
if err := h.Serving(); err != nil {
|
||||
return nil, nil, nil, err
|
||||
logger.Err(err).Warn("Agency Not serving")
|
||||
return nil, nil, h, err
|
||||
}
|
||||
if err := h.Healthy(); err != nil {
|
||||
logger.Err(err).Warn("Agency Not healthy")
|
||||
}
|
||||
|
||||
for id := range clients {
|
||||
for id := range names {
|
||||
if h.leaderID == h.names[id] {
|
||||
return clients[id], configs[id], h, nil
|
||||
h.leader = clients[names[id]].Connection()
|
||||
return clients[names[id]], configs[id], h, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, nil, nil, errors.Newf("Unable to find agent")
|
||||
return nil, nil, h, errors.Newf("Unable to find agent")
|
||||
}
|
||||
|
|
25
pkg/deployment/agency/logger.go
Normal file
25
pkg/deployment/agency/logger.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package agency
|
||||
|
||||
import "github.com/arangodb/kube-arangodb/pkg/logging"
|
||||
|
||||
var logger = logging.Global().RegisterAndGetLogger("agency", logging.Info)
|
|
@ -176,17 +176,19 @@ func (d *Deployment) RefreshAgencyCache(ctx context.Context) (uint64, error) {
|
|||
lCtx, c := globals.GetGlobalTimeouts().Agency().WithTimeout(ctx)
|
||||
defer c()
|
||||
|
||||
var clients []agencydriver.Agency
|
||||
rsize := int(*size)
|
||||
|
||||
clients := make(map[string]agencydriver.Agency)
|
||||
for _, m := range d.GetStatusSnapshot().Members.Agents {
|
||||
a, err := d.GetAgency(lCtx, m.ID)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
||||
clients = append(clients, a)
|
||||
clients[m.ID] = a
|
||||
}
|
||||
|
||||
return d.agencyCache.Reload(lCtx, int(*size), clients)
|
||||
return d.agencyCache.Reload(lCtx, rsize, clients)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -248,7 +250,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
|
|||
deps: deps,
|
||||
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
|
||||
stopCh: make(chan struct{}),
|
||||
agencyCache: agency.NewCache(apiObject.Spec.Mode),
|
||||
agencyCache: agency.NewCache(apiObject.GetNamespace(), apiObject.GetName(), apiObject.Spec.Mode),
|
||||
acs: acs.NewACS(apiObject.GetUID(), i),
|
||||
}
|
||||
|
||||
|
|
|
@ -70,7 +70,7 @@ func (i *inventory) Describe(descs chan<- *prometheus.Desc) {
|
|||
pd := metrics.NewPushDescription(descs)
|
||||
pd.Push(i.deploymentsMetric, i.deploymentMetricsMembersMetric, i.deploymentAgencyStateMetric, i.deploymentShardLeadersMetric, i.deploymentShardsMetric, i.operatorStateRefreshMetric)
|
||||
|
||||
pd.Push(metric_descriptions.ArangodbOperatorAgencyErrors(), metric_descriptions.ArangodbOperatorAgencyFetches(), metric_descriptions.ArangodbOperatorAgencyIndex())
|
||||
metric_descriptions.Descriptions(pd)
|
||||
}
|
||||
|
||||
func (i *inventory) Collect(m chan<- prometheus.Metric) {
|
||||
|
@ -160,4 +160,17 @@ func (d *Deployment) CollectMetrics(m metrics.PushMetric) {
|
|||
m.Push(metric_descriptions.ArangodbOperatorAgencyErrors().Gauge(float64(d.metrics.agency.errors), d.namespace, d.name))
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyFetches().Gauge(float64(d.metrics.agency.fetches), d.namespace, d.name))
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyIndex().Gauge(float64(d.metrics.agency.index), d.namespace, d.name))
|
||||
|
||||
if c := d.agencyCache; c != nil {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCachePresent().Gauge(1, d.namespace, d.name))
|
||||
if h, ok := c.Health(); ok {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheHealthPresent().Gauge(1, d.namespace, d.name))
|
||||
|
||||
h.CollectMetrics(m)
|
||||
} else {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCacheHealthPresent().Gauge(0, d.namespace, d.name))
|
||||
}
|
||||
} else {
|
||||
m.Push(metric_descriptions.ArangodbOperatorAgencyCachePresent().Gauge(0, d.namespace, d.name))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,6 +106,16 @@ func (a *actionWaitForMemberInSync) checkCluster() (bool, error) {
|
|||
a.log.Str("mode", "cluster").Str("member", a.MemberID()).Int("shard", len(notInSyncShards)).Info("DBServer contains not in sync shards")
|
||||
return false, nil
|
||||
}
|
||||
case api.ServerGroupAgents:
|
||||
agencyHealth, ok := a.actionCtx.GetAgencyHealth()
|
||||
if !ok {
|
||||
a.log.Str("mode", "cluster").Str("member", a.MemberID()).Info("AgencyHealth is missing")
|
||||
return false, nil
|
||||
}
|
||||
if err := agencyHealth.Healthy(); err != nil {
|
||||
a.log.Str("mode", "cluster").Str("member", a.MemberID()).Err(err).Info("Agency is not yet synchronized")
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
|
|
|
@ -477,6 +477,16 @@ func groupReadyForRestart(context PlanBuilderContext, status api.DeploymentStatu
|
|||
if s := len(blockingRestartShards); s > 0 {
|
||||
return false, fmt.Sprintf("There are %d shards which are blocking restart", s)
|
||||
}
|
||||
case api.ServerGroupAgents:
|
||||
agencyHealth, ok := context.GetAgencyHealth()
|
||||
if !ok {
|
||||
// Unable to get agency state, do not restart
|
||||
return false, "Unable to get agency cache"
|
||||
}
|
||||
|
||||
if err := agencyHealth.Healthy(); err != nil {
|
||||
return false, fmt.Sprintf("Restart of agent is not allowed due to: %s", err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
return true, "Restart allowed"
|
||||
|
|
|
@ -43,32 +43,32 @@ func (r *Reconciler) createScaleMemberPlan(ctx context.Context, apiObject k8suti
|
|||
switch spec.GetMode() {
|
||||
case api.DeploymentModeSingle:
|
||||
// Never scale down
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Single, api.ServerGroupSingle, 1).Filter(filterScaleUP)...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Single, api.ServerGroupSingle, 1, context).Filter(filterScaleUP)...)
|
||||
case api.DeploymentModeActiveFailover:
|
||||
// Only scale agents & singles
|
||||
if a := status.Agency; a != nil && a.Size != nil {
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Agents, api.ServerGroupAgents, int(*a.Size)).Filter(filterScaleUP)...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Agents, api.ServerGroupAgents, int(*a.Size), context).Filter(filterScaleUP)...)
|
||||
}
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Single, api.ServerGroupSingle, spec.Single.GetCount())...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Single, api.ServerGroupSingle, spec.Single.GetCount(), context)...)
|
||||
case api.DeploymentModeCluster:
|
||||
// Scale agents, dbservers, coordinators
|
||||
if a := status.Agency; a != nil && a.Size != nil {
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Agents, api.ServerGroupAgents, int(*a.Size)).Filter(filterScaleUP)...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Agents, api.ServerGroupAgents, int(*a.Size), context).Filter(filterScaleUP)...)
|
||||
}
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.DBServers, api.ServerGroupDBServers, spec.DBServers.GetCount())...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Coordinators, api.ServerGroupCoordinators, spec.Coordinators.GetCount())...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.DBServers, api.ServerGroupDBServers, spec.DBServers.GetCount(), context)...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.Coordinators, api.ServerGroupCoordinators, spec.Coordinators.GetCount(), context)...)
|
||||
}
|
||||
if spec.GetMode().SupportsSync() {
|
||||
// Scale syncmasters & syncworkers
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.SyncMasters, api.ServerGroupSyncMasters, spec.SyncMasters.GetCount())...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.GetCount())...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.SyncMasters, api.ServerGroupSyncMasters, spec.SyncMasters.GetCount(), context)...)
|
||||
plan = append(plan, r.createScalePlan(status, status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.GetCount(), context)...)
|
||||
}
|
||||
|
||||
return plan
|
||||
}
|
||||
|
||||
// createScalePlan creates a scaling plan for a single server group
|
||||
func (r *Reconciler) createScalePlan(status api.DeploymentStatus, members api.MemberStatusList, group api.ServerGroup, count int) api.Plan {
|
||||
func (r *Reconciler) createScalePlan(status api.DeploymentStatus, members api.MemberStatusList, group api.ServerGroup, count int, context PlanBuilderContext) api.Plan {
|
||||
var plan api.Plan
|
||||
if len(members) < count {
|
||||
// Scale up
|
||||
|
@ -87,6 +87,11 @@ func (r *Reconciler) createScalePlan(status api.DeploymentStatus, members api.Me
|
|||
if m, err := members.SelectMemberToRemove(topologyMissingMemberToRemoveSelector(status.Topology), topologyAwarenessMemberToRemoveSelector(group, status.Topology)); err != nil {
|
||||
r.planLogger.Err(err).Str("role", group.AsRole()).Warn("Failed to select member to remove")
|
||||
} else {
|
||||
ready, message := groupReadyForRestart(context, status, m, group)
|
||||
if !ready {
|
||||
r.planLogger.Str("member", m.ID).Str("role", group.AsRole()).Str("message", message).Warn("Unable to ScaleDown member")
|
||||
return nil
|
||||
}
|
||||
|
||||
r.planLogger.
|
||||
Str("member-id", m.ID).
|
||||
|
@ -117,6 +122,12 @@ func (r *Reconciler) createReplaceMemberPlan(ctx context.Context, apiObject k8su
|
|||
return nil
|
||||
}
|
||||
if member.Conditions.IsTrue(api.ConditionTypeMarkedToRemove) {
|
||||
ready, message := groupReadyForRestart(context, status, member, group)
|
||||
if !ready {
|
||||
r.planLogger.Str("member", member.ID).Str("role", group.AsRole()).Str("message", message).Warn("Unable to recreate member")
|
||||
continue
|
||||
}
|
||||
|
||||
switch group {
|
||||
case api.ServerGroupDBServers:
|
||||
plan = append(plan, actions.NewAction(api.ActionTypeAddMember, group, withPredefinedMember("")))
|
||||
|
|
|
@ -239,7 +239,6 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl
|
|||
|
||||
if newPlan, changed := getActionPlanAppender(action, plan); changed {
|
||||
// Our actions have been added to the end of plan
|
||||
d.planLogger.Info("Appending new plan items")
|
||||
return newPlan, true, nil
|
||||
}
|
||||
|
||||
|
@ -265,35 +264,45 @@ func (d *Reconciler) executePlan(ctx context.Context, statusPlan api.Plan, pg pl
|
|||
}
|
||||
|
||||
func (d *Reconciler) executeAction(ctx context.Context, planAction api.Action, action Action) (done, abort, callAgain, retry bool, err error) {
|
||||
log := d.planLogger.Str("action", string(planAction.Type)).Str("member", planAction.MemberID)
|
||||
|
||||
if !planAction.IsStarted() {
|
||||
// Not started yet
|
||||
ready, err := action.Start(ctx)
|
||||
if err != nil {
|
||||
if g := getStartFailureGracePeriod(action); g > 0 && !planAction.CreationTime.IsZero() {
|
||||
if time.Since(planAction.CreationTime.Time) < g {
|
||||
d.planLogger.Err(err).Error("Failed to start action, but still in grace period")
|
||||
log.Err(err).Error("Failed to start action, but still in grace period")
|
||||
return false, false, false, true, errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
d.planLogger.Err(err).Error("Failed to start action")
|
||||
log.Err(err).Error("Failed to start action")
|
||||
return false, false, false, false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
if ready {
|
||||
d.planLogger.Bool("ready", ready).Debug("Action Start completed")
|
||||
log.Bool("ready", ready).Info("Action Start completed")
|
||||
return true, false, false, false, nil
|
||||
}
|
||||
log.Bool("ready", ready).Info("Action Started")
|
||||
|
||||
return false, false, true, false, nil
|
||||
}
|
||||
|
||||
if t := planAction.StartTime; t != nil {
|
||||
if tm := t.Time; !tm.IsZero() {
|
||||
log = log.SinceStart("duration", tm)
|
||||
}
|
||||
}
|
||||
|
||||
// First action of plan has been started, check its progress
|
||||
ready, abort, err := action.CheckProgress(ctx)
|
||||
if err != nil {
|
||||
d.planLogger.Err(err).Debug("Failed to check action progress")
|
||||
log.Err(err).Debug("Failed to check action progress")
|
||||
return false, false, false, false, errors.WithStack(err)
|
||||
}
|
||||
|
||||
d.planLogger.
|
||||
log.
|
||||
Bool("abort", abort).
|
||||
Bool("ready", ready).
|
||||
Debug("Action CheckProgress completed")
|
||||
|
@ -303,11 +312,11 @@ func (d *Reconciler) executeAction(ctx context.Context, planAction api.Action, a
|
|||
}
|
||||
|
||||
if abort {
|
||||
d.planLogger.Warn("Action aborted. Removing the entire plan")
|
||||
log.Warn("Action aborted. Removing the entire plan")
|
||||
d.context.CreateEvent(k8sutil.NewPlanAbortedEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole()))
|
||||
return false, true, false, false, nil
|
||||
} else if time.Now().After(planAction.CreationTime.Add(GetActionTimeout(d.context.GetSpec(), planAction.Type))) {
|
||||
d.planLogger.Warn("Action not finished in time. Removing the entire plan")
|
||||
log.Warn("Action not finished in time. Removing the entire plan")
|
||||
d.context.CreateEvent(k8sutil.NewPlanTimeoutEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole()))
|
||||
return false, true, false, false, nil
|
||||
}
|
||||
|
|
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_health_present.go
generated
Normal file
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_health_present.go
generated
Normal file
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package metric_descriptions
|
||||
|
||||
import "github.com/arangodb/kube-arangodb/pkg/util/metrics"
|
||||
|
||||
var (
|
||||
arangodbOperatorAgencyCacheHealthPresent = metrics.NewDescription("arangodb_operator_agency_cache_health_present", "Determines if local agency cache health is present", []string{`namespace`, `name`}, nil)
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerDescription(arangodbOperatorAgencyCacheHealthPresent)
|
||||
}
|
||||
|
||||
func ArangodbOperatorAgencyCacheHealthPresent() metrics.Description {
|
||||
return arangodbOperatorAgencyCacheHealthPresent
|
||||
}
|
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_healthy.go
generated
Normal file
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_healthy.go
generated
Normal file
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package metric_descriptions
|
||||
|
||||
import "github.com/arangodb/kube-arangodb/pkg/util/metrics"
|
||||
|
||||
var (
|
||||
arangodbOperatorAgencyCacheHealthy = metrics.NewDescription("arangodb_operator_agency_cache_healthy", "Determines if agency is healthy", []string{`namespace`, `name`}, nil)
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerDescription(arangodbOperatorAgencyCacheHealthy)
|
||||
}
|
||||
|
||||
func ArangodbOperatorAgencyCacheHealthy() metrics.Description {
|
||||
return arangodbOperatorAgencyCacheHealthy
|
||||
}
|
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_leaders.go
generated
Normal file
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_leaders.go
generated
Normal file
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package metric_descriptions
|
||||
|
||||
import "github.com/arangodb/kube-arangodb/pkg/util/metrics"
|
||||
|
||||
var (
|
||||
arangodbOperatorAgencyCacheLeaders = metrics.NewDescription("arangodb_operator_agency_cache_leaders", "Determines agency leader vote count", []string{`namespace`, `name`, `agent`}, nil)
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerDescription(arangodbOperatorAgencyCacheLeaders)
|
||||
}
|
||||
|
||||
func ArangodbOperatorAgencyCacheLeaders() metrics.Description {
|
||||
return arangodbOperatorAgencyCacheLeaders
|
||||
}
|
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_member_commit_offset.go
generated
Normal file
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_member_commit_offset.go
generated
Normal file
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package metric_descriptions
|
||||
|
||||
import "github.com/arangodb/kube-arangodb/pkg/util/metrics"
|
||||
|
||||
var (
|
||||
arangodbOperatorAgencyCacheMemberCommitOffset = metrics.NewDescription("arangodb_operator_agency_cache_member_commit_offset", "Determines agency member commit offset", []string{`namespace`, `name`, `agent`}, nil)
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerDescription(arangodbOperatorAgencyCacheMemberCommitOffset)
|
||||
}
|
||||
|
||||
func ArangodbOperatorAgencyCacheMemberCommitOffset() metrics.Description {
|
||||
return arangodbOperatorAgencyCacheMemberCommitOffset
|
||||
}
|
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_member_serving.go
generated
Normal file
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_member_serving.go
generated
Normal file
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package metric_descriptions
|
||||
|
||||
import "github.com/arangodb/kube-arangodb/pkg/util/metrics"
|
||||
|
||||
var (
|
||||
arangodbOperatorAgencyCacheMemberServing = metrics.NewDescription("arangodb_operator_agency_cache_member_serving", "Determines if agency member is reachable", []string{`namespace`, `name`, `agent`}, nil)
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerDescription(arangodbOperatorAgencyCacheMemberServing)
|
||||
}
|
||||
|
||||
func ArangodbOperatorAgencyCacheMemberServing() metrics.Description {
|
||||
return arangodbOperatorAgencyCacheMemberServing
|
||||
}
|
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_present.go
generated
Normal file
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_present.go
generated
Normal file
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package metric_descriptions
|
||||
|
||||
import "github.com/arangodb/kube-arangodb/pkg/util/metrics"
|
||||
|
||||
var (
|
||||
arangodbOperatorAgencyCachePresent = metrics.NewDescription("arangodb_operator_agency_cache_present", "Determines if local agency cache is present", []string{`namespace`, `name`}, nil)
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerDescription(arangodbOperatorAgencyCachePresent)
|
||||
}
|
||||
|
||||
func ArangodbOperatorAgencyCachePresent() metrics.Description {
|
||||
return arangodbOperatorAgencyCachePresent
|
||||
}
|
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_serving.go
generated
Normal file
35
pkg/generated/metric_descriptions/arangodb_operator_agency_cache_serving.go
generated
Normal file
|
@ -0,0 +1,35 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package metric_descriptions
|
||||
|
||||
import "github.com/arangodb/kube-arangodb/pkg/util/metrics"
|
||||
|
||||
var (
|
||||
arangodbOperatorAgencyCacheServing = metrics.NewDescription("arangodb_operator_agency_cache_serving", "Determines if agency is serving", []string{`namespace`, `name`}, nil)
|
||||
)
|
||||
|
||||
func init() {
|
||||
registerDescription(arangodbOperatorAgencyCacheServing)
|
||||
}
|
||||
|
||||
func ArangodbOperatorAgencyCacheServing() metrics.Description {
|
||||
return arangodbOperatorAgencyCacheServing
|
||||
}
|
Loading…
Reference in a new issue