1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Add agency leader discovery (#984)

This commit is contained in:
Tomasz Mielech 2022-05-26 13:43:31 +02:00 committed by GitHub
parent 635ed17f7f
commit b4d44a9f47
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 230 additions and 56 deletions

View file

@ -8,6 +8,7 @@
- (Feature) Allow raw json value for license token-v2
- (Update) Replace `beta.kubernetes.io/arch` to `kubernetes.io/arch` in Operator Chart
- (Feature) Add operator shutdown handler for graceful termination
- (Feature) Add agency leader discovery
- (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method
## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10)

View file

@ -22,16 +22,52 @@ package agency
import (
"context"
"fmt"
"sync"
"time"
"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)
// health keeps a commit index for every agency, keyed by a string identifier.
type health map[string]uint64

// IsHealthy reports whether every agency is on the same, non-zero commit index.
// It returns false when:
// - there are no agencies at all,
// - at least two agencies report different commit indices,
// - the shared commit index equals 0.
func (h health) IsHealthy() bool {
	var (
		reference uint64
		seen      bool
	)

	for _, current := range h {
		switch {
		case !seen:
			// The first entry becomes the reference for all the others.
			reference, seen = current, true
		case current != reference:
			return false
		}
	}

	// An empty map leaves the reference at 0, which is reported as unhealthy too.
	return reference != 0
}
// Health describes an interface to check the health of the environment.
type Health interface {
// IsHealthy returns true when the environment is considered healthy.
IsHealthy() bool
}
// Cache describes a cache of the agency state.
type Cache interface {
// Reload refreshes the cached agency state and returns a commit index.
Reload(ctx context.Context, client agency.Agency) (uint64, error)
Reload(ctx context.Context, clients []agency.Agency) (uint64, error)
// Data returns the cached agency state and a flag telling whether it is valid.
Data() (State, bool)
// CommitIndex returns the commit index stored in the cache.
CommitIndex() uint64
// GetLeaderID returns a leader ID.
GetLeaderID() string
// Health returns a health object and true when the health object is available.
Health() (Health, bool)
}
func NewCache(mode *api.DeploymentMode) Cache {
@ -57,7 +93,17 @@ func (c cacheSingle) CommitIndex() uint64 {
return 0
}
func (c cacheSingle) Reload(ctx context.Context, client agency.Agency) (uint64, error) {
// GetLeaderID always returns an empty string for a single cache.
func (c cacheSingle) GetLeaderID() string {
return ""
}
// Health always returns no health object and false for a single cache.
func (c cacheSingle) Health() (Health, bool) {
return nil, false
}
// Reload does nothing for a single cache; it ignores its arguments and
// always returns commit index 0 and no error.
func (c cacheSingle) Reload(_ context.Context, _ []agency.Agency) (uint64, error) {
return 0, nil
}
@ -66,48 +112,169 @@ func (c cacheSingle) Data() (State, bool) {
}
// cache keeps the agency state together with the ID of the agency leader
// and the health of the agencies.
type cache struct {
// lock is taken by the methods of the cache before they read or write the fields below.
lock sync.Mutex
lock sync.RWMutex
// valid is set to true once data was loaded successfully and reset to false on a reload failure.
valid bool
// commitIndex is the leader's commit index at which data was loaded.
commitIndex uint64
data State
// health is the health object returned by the last successful leader discovery.
health Health
// leaderID is the ID of the agency leader, or an empty string when the leader is not known.
leaderID string
}
// CommitIndex returns the commit index stored in the cache.
func (c *cache) CommitIndex() uint64 {
c.lock.RLock()
defer c.lock.RUnlock()
return c.commitIndex
}
// Data returns the cached agency state and a flag telling whether that state is valid.
func (c *cache) Data() (State, bool) {
c.lock.Lock()
defer c.lock.Unlock()
c.lock.RLock()
defer c.lock.RUnlock()
return c.data, c.valid
}
func (c *cache) Reload(ctx context.Context, client agency.Agency) (uint64, error) {
// GetLeaderID returns the leader ID stored in the cache,
// or an empty string if the leader is not known.
func (c *cache) GetLeaderID() string {
c.lock.RLock()
defer c.lock.RUnlock()
return c.leaderID
}
// Health returns the health object stored in the cache and true.
// When no health object has been stored yet, it returns nil and false.
func (c *cache) Health() (Health, bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()

	if c.health == nil {
		return nil, false
	}

	return c.health, true
}
// Reload discovers the agency leader among the given clients and refreshes the cached state from it.
// The state is loaded again only when the leader's commit index differs from the cached one
// or when the cached state is not valid.
// It returns the leader's commit index, or 0 and an error when the leader can not be discovered.
func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, error) {
c.lock.Lock()
defer c.lock.Unlock()
cfg, err := getAgencyConfig(ctx, client)
leaderCli, leaderConfig, health, err := getLeader(ctx, clients)
if err != nil {
// Invalidate the leader ID and the agency state.
// In the next iteration leaderID will be set again because `valid` will be false.
c.leaderID = ""
c.valid = false
return 0, err
}
if cfg.CommitIndex == c.commitIndex && c.valid {
c.health = health
if leaderConfig.CommitIndex == c.commitIndex && c.valid {
// We are on the same index, nothing to do.
return cfg.CommitIndex, err
return leaderConfig.CommitIndex, nil
}
if data, err := loadState(ctx, client); err != nil {
// A leader should be known even if an agency state is invalid.
c.leaderID = leaderConfig.LeaderId
if data, err := loadState(ctx, leaderCli); err != nil {
c.valid = false
return cfg.CommitIndex, err
return leaderConfig.CommitIndex, err
} else {
c.data = data
c.valid = true
c.commitIndex = cfg.CommitIndex
return cfg.CommitIndex, nil
c.commitIndex = leaderConfig.CommitIndex
return leaderConfig.CommitIndex, nil
}
}
// getLeader asks every agency for its config concurrently and picks the leader ID
// which is reported by more than half of the agencies.
// It returns the client and the config of that leader, together with a health object
// built from the commit indices of all agencies.
// An error is returned when:
// - the list of clients is empty,
// - any agency fails to return its config or does not know a leader,
// - no leader ID is reported by more than half of the agencies,
// - none of the queried agencies has the elected leader ID as its own ID.
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, Health, error) {
	total := len(clients)
	if total == 0 {
		return nil, nil, nil, errors.New("empty list of agencies' clients")
	}

	var (
		guard    sync.Mutex
		group    sync.WaitGroup
		fetchErr error
	)
	responses := make([]*agencyConfig, total)
	votesByLeader := make(map[string]int)
	commitIndices := make(health)

	// Query all agencies in parallel; every request gets its own one second timeout.
	group.Add(total)
	for idx, client := range clients {
		go func(pos int, agent agency.Agency) {
			defer group.Done()

			reqCtx, cancel := context.WithTimeout(ctx, time.Second)
			defer cancel()
			cfg, err := getAgencyConfig(reqCtx, agent)

			// Everything below touches state shared between the goroutines.
			guard.Lock()
			defer guard.Unlock()

			switch {
			case err != nil:
				fetchErr = err
			case cfg == nil || cfg.LeaderId == "":
				fetchErr = fmt.Errorf("leader unknown for the agent %v", agent.Connection().Endpoints())
			default:
				// Keep the config on the index of its client, so the leader's client can be found later.
				responses[pos] = cfg
				votesByLeader[cfg.LeaderId]++
				commitIndices[cfg.Configuration.ID] = cfg.CommitIndex
			}
		}(idx, client)
	}
	group.Wait()

	if fetchErr != nil {
		return nil, nil, nil, wrapError(fetchErr, "not all agencies are responsive")
	}

	if len(votesByLeader) == 0 {
		return nil, nil, nil, wrapError(fetchErr, "failed to get config from agencies")
	}

	// Pick the leader ID with the highest number of votes.
	var leaderID string
	topVotes := 0
	for candidate, votes := range votesByLeader {
		if votes <= topVotes {
			continue
		}
		topVotes = votes
		leaderID = candidate
	}

	// The winner needs more than half of the votes of all possible agencies.
	if topVotes <= total/2 {
		return nil, nil, nil, wrapError(fetchErr,
			fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, topVotes, total))
	}

	// A leader with quorum is known; look for the client which talks to it.
	for pos := range responses {
		cfg := responses[pos]
		if cfg == nil || cfg.Configuration.ID != leaderID {
			continue
		}
		return clients[pos], cfg, commitIndices, nil
	}

	return nil, nil, nil, wrapError(fetchErr, "the leader is not responsive")
}
// wrapError annotates err with the message.
// When err is nil, a new error which consists of the message only is created.
func wrapError(err error, message string) error {
	if err == nil {
		return errors.New(message)
	}

	return errors.WithMessage(err, message)
}

View file

@ -31,6 +31,7 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
"github.com/arangodb/kube-arangodb/pkg/handlers/utils"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
@ -46,7 +47,7 @@ type Cache interface {
Get(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
GetDatabase(ctx context.Context) (driver.Client, error)
GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWrap) (driver.Client, error)
GetAgency(ctx context.Context) (agency.Agency, error)
GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error)
}
type CacheGen interface {
@ -167,13 +168,19 @@ func (cc *cache) GetDatabaseWithWrap(ctx context.Context, wraps ...ConnectionWra
}
// GetAgency returns a cached client for the agency
func (cc *cache) GetAgency(ctx context.Context) (agency.Agency, error) {
func (cc *cache) GetAgency(_ context.Context, agencyIDs ...string) (agency.Agency, error) {
cc.mutex.Lock()
defer cc.mutex.Unlock()
// Not found, create a new client
var dnsNames []string
for _, m := range cc.in.GetStatusSnapshot().Members.Agents {
if len(agencyIDs) > 0 {
if !utils.StringList(agencyIDs).Has(m.ID) {
continue
}
}
endpoint, err := cc.in.GenerateMemberEndpoint(api.ServerGroupAgents, m)
if err != nil {
return nil, err

View file

@ -30,9 +30,10 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"k8s.io/apimachinery/pkg/types"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconcile"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
@ -51,6 +52,10 @@ import (
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/arangosync-client/client"
"github.com/arangodb/arangosync-client/tasks"
driver "github.com/arangodb/go-driver"
@ -70,9 +75,6 @@ import (
serviceaccountv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/serviceaccount/v1"
servicemonitorv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/servicemonitor/v1"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var _ resources.Context = &Deployment{}
@ -234,9 +236,9 @@ func (d *Deployment) GetAgencyClientsWithPredicate(ctx context.Context, predicat
return result, nil
}
// GetAgency returns a connection to the entire agency.
func (d *Deployment) GetAgency(ctx context.Context) (agency.Agency, error) {
return d.clientCache.GetAgency(ctx)
// GetAgency returns a connection to the agency.
func (d *Deployment) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) {
return d.clientCache.GetAgency(ctx, agencyIDs...)
}
func (d *Deployment) getConnConfig() (http.ConnectionConfig, error) {

View file

@ -27,42 +27,35 @@ import (
"sync/atomic"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
deploymentClient "github.com/arangodb/kube-arangodb/pkg/deployment/client"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"k8s.io/apimachinery/pkg/types"
"github.com/arangodb/kube-arangodb/pkg/operator/scope"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/arangosync-client/client"
"github.com/rs/zerolog"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"github.com/arangodb/arangosync-client/client"
agencydriver "github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/acs"
"github.com/arangodb/kube-arangodb/pkg/deployment/acs/sutil"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
"github.com/arangodb/kube-arangodb/pkg/deployment/chaos"
deploymentClient "github.com/arangodb/kube-arangodb/pkg/deployment/client"
memberState "github.com/arangodb/kube-arangodb/pkg/deployment/member"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconcile"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
"github.com/arangodb/kube-arangodb/pkg/deployment/resilience"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"
"github.com/arangodb/kube-arangodb/pkg/operator/scope"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
)
@ -169,11 +162,17 @@ func (d *Deployment) RefreshAgencyCache(ctx context.Context) (uint64, error) {
lCtx, c := globals.GetGlobalTimeouts().Agency().WithTimeout(ctx)
defer c()
a, err := d.GetAgency(lCtx)
if err != nil {
return 0, err
var clients []agencydriver.Agency
for _, m := range d.GetStatusSnapshot().Members.Agents {
a, err := d.GetAgency(lCtx, m.ID)
if err != nil {
return 0, err
}
clients = append(clients, a)
}
return d.agencyCache.Reload(lCtx, a)
return d.agencyCache.Reload(lCtx, clients)
}
func (d *Deployment) SetAgencyMaintenanceMode(ctx context.Context, enabled bool) error {

View file

@ -22,6 +22,7 @@ package reconcile
import (
"context"
"time"
"github.com/arangodb/arangosync-client/client"
"github.com/arangodb/go-driver/agency"
@ -29,8 +30,6 @@ import (
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
"time"
"github.com/arangodb/go-driver"
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
@ -315,9 +314,9 @@ func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]driver.Connect
return c, nil
}
// GetAgency returns a connection to the entire agency.
func (ac *actionContext) GetAgency(ctx context.Context) (agency.Agency, error) {
a, err := ac.context.GetAgency(ctx)
// GetAgency returns a connection to the agency.
func (ac *actionContext) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) {
a, err := ac.context.GetAgency(ctx, agencyIDs...)
if err != nil {
return nil, errors.WithStack(err)
}

View file

@ -25,6 +25,7 @@ import (
"fmt"
"io/ioutil"
"testing"
"time"
monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1"
"github.com/rs/zerolog"
@ -37,8 +38,6 @@ import (
"github.com/arangodb/arangosync-client/client"
"github.com/arangodb/go-driver/agency"
"time"
"github.com/arangodb/go-driver"
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
@ -308,7 +307,7 @@ func (c *testContext) GetServerClient(ctx context.Context, group api.ServerGroup
panic("implement me")
}
func (c *testContext) GetAgency(ctx context.Context) (agency.Agency, error) {
func (c *testContext) GetAgency(_ context.Context, _ ...string) (agency.Agency, error) {
panic("implement me")
}

View file

@ -160,7 +160,7 @@ type DeploymentAgencyClient interface {
// GetAgencyClientsWithPredicate returns a client connection for every agency member which match condition.
GetAgencyClientsWithPredicate(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error)
// GetAgency returns a connection to the agency, optionally limited to the agency members with the given IDs.
GetAgency(ctx context.Context) (agency.Agency, error)
GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error)
}
type DeploymentDatabaseClient interface {