1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Allowing agents to reach failed phase

This commit is contained in:
Ewout Prangsma 2018-03-29 13:35:42 +02:00
parent b9ed689260
commit d5b749f48f
No known key found for this signature in database
GPG key ID: 4DBAD380D93D0698
8 changed files with 136 additions and 72 deletions

View file

@ -97,10 +97,14 @@ func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup,
}
// GetAgencyClients returns a client connection for every agency member.
func (d *Deployment) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) {
// If the given predicate is not nil, only agents are included where the given predicate returns true.
func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]arangod.Agency, error) {
agencyMembers := d.status.Members.Agents
result := make([]arangod.Agency, 0, len(agencyMembers))
for _, m := range agencyMembers {
if predicate != nil && !predicate(m.ID) {
continue
}
client, err := d.GetServerClient(ctx, api.ServerGroupAgents, m.ID)
if err != nil {
return nil, maskAny(err)

View file

@ -105,7 +105,7 @@ func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGr
// GetAgencyClients returns a client connection for every agency member.
func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) {
c, err := ac.context.GetAgencyClients(ctx)
c, err := ac.context.GetAgencyClients(ctx, nil)
if err != nil {
return nil, maskAny(err)
}

View file

@ -24,8 +24,6 @@ package reconcile
import (
"context"
"sync"
"time"
driver "github.com/arangodb/go-driver"
"github.com/rs/zerolog"
@ -44,10 +42,6 @@ func NewWaitForMemberUpAction(log zerolog.Logger, action api.Action, actionCtx A
}
}
const (
maxAgentResponseTime = time.Second * 10
)
// actionWaitForMemberUp implements an WaitForMemberUp.
type actionWaitForMemberUp struct {
log zerolog.Logger
@ -99,12 +93,6 @@ func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool,
return true, nil
}
type agentStatus struct {
IsLeader bool
LeaderEndpoint string
IsResponding bool
}
// checkProgressAgent checks the progress of the action in the case
// of an agent.
func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, error) {
@ -115,65 +103,12 @@ func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, e
return false, maskAny(err)
}
wg := sync.WaitGroup{}
invalidKey := []string{"does-not-exists-149e97e8-4b81-5664-a8a8-9ba93881d64c"}
statuses := make([]agentStatus, len(clients))
for i, c := range clients {
wg.Add(1)
go func(i int, c arangod.Agency) {
defer wg.Done()
var trash interface{}
lctx, cancel := context.WithTimeout(ctx, maxAgentResponseTime)
defer cancel()
if err := c.ReadKey(lctx, invalidKey, &trash); err == nil || arangod.IsKeyNotFound(err) {
// We got a valid read from the leader
statuses[i].IsLeader = true
statuses[i].LeaderEndpoint = c.Endpoint()
statuses[i].IsResponding = true
} else {
if location, ok := arangod.IsNotLeader(err); ok {
// Valid response from a follower
statuses[i].IsLeader = false
statuses[i].LeaderEndpoint = location
statuses[i].IsResponding = true
} else {
// Unexpected / invalid response
log.Debug().Err(err).Str("endpoint", c.Endpoint()).Msg("Agent is not responding")
statuses[i].IsResponding = false
}
}
}(i, c)
}
wg.Wait()
// Check the results
noLeaders := 0
for i, status := range statuses {
if !status.IsResponding {
log.Debug().Msg("Not all agents are responding")
return false, nil
}
if status.IsLeader {
noLeaders++
}
if i > 0 {
// Compare leader endpoint with previous
prev := statuses[i-1].LeaderEndpoint
if !arangod.IsSameEndpoint(prev, status.LeaderEndpoint) {
log.Debug().Msg("Not all agents report the same leader endpoint")
return false, nil
}
}
}
if noLeaders != 1 {
log.Debug().Int("leaders", noLeaders).Msg("Unexpected number of agency leaders")
if err := arangod.AreAgentsHealthy(ctx, clients); err != nil {
log.Debug().Err(err).Msg("Not all agents are ready")
return false, nil
}
log.Debug().
Int("leaders", noLeaders).
Int("followers", len(statuses)-noLeaders).
Msg("Agency is happy")
log.Debug().Msg("Agency is happy")
return true, nil
}

View file

@ -50,7 +50,8 @@ type Context interface {
// GetServerClient returns a cached client for a specific server.
GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
// GetAgencyClients returns a client connection for every agency member.
GetAgencyClients(ctx context.Context) ([]arangod.Agency, error)
// If the given predicate is not nil, only agents are included where the given predicate returns true.
GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]arangod.Agency, error)
// CreateMember adds a new member to the given group.
CreateMember(group api.ServerGroup) error
// DeletePod deletes a pod with given name in the namespace

View file

@ -89,7 +89,10 @@ func createPlan(log zerolog.Logger, apiObject metav1.Object,
status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error {
for _, m := range *members {
if m.Phase == api.MemberPhaseFailed && len(plan) == 0 {
plan = append(plan, api.NewAction(api.ActionTypeRemoveMember, group, m.ID))
plan = append(plan,
api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
api.NewAction(api.ActionTypeAddMember, group, ""),
)
}
}
return nil

View file

@ -23,7 +23,10 @@
package resilience
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
)
// Context provides methods to the resilience package.
@ -35,4 +38,7 @@ type Context interface {
// UpdateStatus replaces the status of the deployment with the given status and
// updates the resources in k8s.
UpdateStatus(status api.DeploymentStatus, force ...bool) error
// GetAgencyClients returns a client connection for every agency member.
// If the given predicate is not nil, only agents are included where the given predicate returns true.
GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]arangod.Agency, error)
}

View file

@ -23,9 +23,11 @@
package resilience
import (
"context"
"time"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
)
const (
@ -88,8 +90,23 @@ func (r *Resilience) CheckMemberFailure() error {
// to failed, which means that it will be replaced.
// Return: failureAcceptable, notAcceptableReason, error
func (r *Resilience) isMemberFailureAcceptable(status api.DeploymentStatus, group api.ServerGroup, m api.MemberStatus) (bool, string, error) {
ctx := context.Background()
switch group {
case api.ServerGroupAgents:
// All good when remaining agents are health
clients, err := r.context.GetAgencyClients(ctx, func(id string) bool { return id != m.ID })
if err != nil {
return false, "", maskAny(err)
}
if err := arangod.AreAgentsHealthy(ctx, clients); err != nil {
return false, err.Error(), nil
}
return true, "", nil
case api.ServerGroupCoordinators:
// Coordinators can be replaced at will
return true, "", nil
case api.ServerGroupSyncMasters, api.ServerGroupSyncWorkers:
// Sync masters & workers can be replaced at will
return true, "", nil
default:
// TODO

View file

@ -0,0 +1,98 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package arangod
import (
"context"
"fmt"
"sync"
"time"
)
const (
maxAgentResponseTime = time.Second * 10
)
// agentStatus is a helper structure used in AreAgentsHealthy.
type agentStatus struct {
IsLeader bool
LeaderEndpoint string
IsResponding bool
}
// AreAgentsHealthy performs a health check on all given agents.
// Of the given agents, 1 must respond as leader and all others must redirect to the leader.
// The function returns nil when all agents are healthy or an error when something is wrong.
func AreAgentsHealthy(ctx context.Context, clients []Agency) error {
wg := sync.WaitGroup{}
invalidKey := []string{"does-not-exists-149e97e8-4b81-5664-a8a8-9ba93881d64c"}
statuses := make([]agentStatus, len(clients))
for i, c := range clients {
wg.Add(1)
go func(i int, c Agency) {
defer wg.Done()
var trash interface{}
lctx, cancel := context.WithTimeout(ctx, maxAgentResponseTime)
defer cancel()
if err := c.ReadKey(lctx, invalidKey, &trash); err == nil || IsKeyNotFound(err) {
// We got a valid read from the leader
statuses[i].IsLeader = true
statuses[i].LeaderEndpoint = c.Endpoint()
statuses[i].IsResponding = true
} else {
if location, ok := IsNotLeader(err); ok {
// Valid response from a follower
statuses[i].IsLeader = false
statuses[i].LeaderEndpoint = location
statuses[i].IsResponding = true
} else {
// Unexpected / invalid response
statuses[i].IsResponding = false
}
}
}(i, c)
}
wg.Wait()
// Check the results
noLeaders := 0
for i, status := range statuses {
if !status.IsResponding {
return maskAny(fmt.Errorf("Agent %s is not responding", clients[i].Endpoint()))
}
if status.IsLeader {
noLeaders++
}
if i > 0 {
// Compare leader endpoint with previous
prev := statuses[i-1].LeaderEndpoint
if !IsSameEndpoint(prev, status.LeaderEndpoint) {
return maskAny(fmt.Errorf("Not all agents report the same leader endpoint"))
}
}
}
if noLeaders != 1 {
return maskAny(fmt.Errorf("Unexpected number of agency leaders: %d", noLeaders))
}
return nil
}