1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Extended resilience failure detection to agents.

Added test for agents & coordinators.

[ci LONG=1]
[ci TESTOPTIONS="-test.run ^TestMemberResilience"]
This commit is contained in:
Ewout Prangsma 2018-03-29 16:08:16 +02:00
parent d5b749f48f
commit 70f0fa2459
No known key found for this signature in database
GPG key ID: 4DBAD380D93D0698
12 changed files with 299 additions and 39 deletions

View file

@ -35,6 +35,8 @@ type MemberStatus struct {
ID string `json:"id"`
// Phase holds the current lifetime phase of this member
Phase MemberPhase `json:"phase"`
// CreatedAt holds the creation timestamp of this member.
CreatedAt metav1.Time `json:"created-at"`
// PersistentVolumeClaimName holds the name of the persistent volume claim used for this member (if any).
PersistentVolumeClaimName string `json:"persistentVolumeClaimName,omitempty"`
// PodName holds the name of the Pod that currently runs this member

View file

@ -364,6 +364,7 @@ func (in *ImageInfo) DeepCopy() *ImageInfo {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *MemberStatus) DeepCopyInto(out *MemberStatus) {
*out = *in
in.CreatedAt.DeepCopyInto(&out.CreatedAt)
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make(ConditionList, len(*in))

View file

@ -119,9 +119,11 @@ func (d *Deployment) GetAgencyClients(ctx context.Context, predicate func(id str
}
// CreateMember adds a new member to the given group.
func (d *Deployment) CreateMember(group api.ServerGroup) error {
// If ID is non-empty, it will be used, otherwise a new ID is created.
func (d *Deployment) CreateMember(group api.ServerGroup, id string) error {
log := d.deps.Log
if err := d.createMember(group, d.apiObject); err != nil {
id, err := d.createMember(group, id, d.apiObject)
if err != nil {
log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member")
return maskAny(err)
}
@ -130,6 +132,9 @@ func (d *Deployment) CreateMember(group api.ServerGroup) error {
log.Debug().Err(err).Msg("Updating CR status failed")
return maskAny(err)
}
// Create event about it
d.CreateEvent(k8sutil.NewMemberAddEvent(id, group.AsRole(), d.apiObject))
return nil
}

View file

@ -26,9 +26,11 @@ import (
"fmt"
"strings"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/dchest/uniuri"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
@ -39,11 +41,14 @@ func (d *Deployment) createInitialMembers(apiObject *api.ArangoDeployment) error
log.Debug().Msg("creating initial members...")
// Go over all groups and create members
var events []*v1.Event
if err := apiObject.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error {
for len(*status) < spec.GetCount() {
if err := d.createMember(group, apiObject); err != nil {
id, err := d.createMember(group, "", apiObject)
if err != nil {
return maskAny(err)
}
events = append(events, k8sutil.NewMemberAddEvent(id, group.AsRole(), apiObject))
}
return nil
}, &d.status); err != nil {
@ -55,6 +60,10 @@ func (d *Deployment) createInitialMembers(apiObject *api.ArangoDeployment) error
if err := d.updateCRStatus(); err != nil {
return maskAny(err)
}
// Save events
for _, evt := range events {
d.CreateEvent(evt)
}
return nil
}
@ -62,16 +71,17 @@ func (d *Deployment) createInitialMembers(apiObject *api.ArangoDeployment) error
// createMember creates member and adds it to the applicable member list.
// Note: This does not create any pods of PVCs
// Note: The updated status is not yet written to the apiserver.
func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDeployment) error {
func (d *Deployment) createMember(group api.ServerGroup, id string, apiObject *api.ArangoDeployment) (string, error) {
log := d.deps.Log
var id string
idPrefix := getArangodIDPrefix(group)
for {
id = idPrefix + strings.ToLower(uniuri.NewLen(8)) // K8s accepts only lowercase, so we use it here as well
if !d.status.Members.ContainsID(id) {
break
if id == "" {
idPrefix := getArangodIDPrefix(group)
for {
id = idPrefix + strings.ToLower(uniuri.NewLen(8)) // K8s accepts only lowercase, so we use it here as well
if !d.status.Members.ContainsID(id) {
break
}
// Duplicate, try again
}
// Duplicate, try again
}
deploymentName := apiObject.GetName()
role := group.AsRole()
@ -80,68 +90,74 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe
case api.ServerGroupSingle:
log.Debug().Str("id", id).Msg("Adding single server")
if err := d.status.Members.Single.Add(api.MemberStatus{
ID: id,
Phase: api.MemberPhaseNone,
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
PodName: "",
}); err != nil {
return maskAny(err)
return "", maskAny(err)
}
case api.ServerGroupAgents:
log.Debug().Str("id", id).Msg("Adding agent")
if err := d.status.Members.Agents.Add(api.MemberStatus{
ID: id,
Phase: api.MemberPhaseNone,
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
PodName: "",
}); err != nil {
return maskAny(err)
return "", maskAny(err)
}
case api.ServerGroupDBServers:
log.Debug().Str("id", id).Msg("Adding dbserver")
if err := d.status.Members.DBServers.Add(api.MemberStatus{
ID: id,
Phase: api.MemberPhaseNone,
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
PodName: "",
}); err != nil {
return maskAny(err)
return "", maskAny(err)
}
case api.ServerGroupCoordinators:
log.Debug().Str("id", id).Msg("Adding coordinator")
if err := d.status.Members.Coordinators.Add(api.MemberStatus{
ID: id,
Phase: api.MemberPhaseNone,
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: "",
PodName: "",
}); err != nil {
return maskAny(err)
return "", maskAny(err)
}
case api.ServerGroupSyncMasters:
log.Debug().Str("id", id).Msg("Adding syncmaster")
if err := d.status.Members.SyncMasters.Add(api.MemberStatus{
ID: id,
Phase: api.MemberPhaseNone,
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: "",
PodName: "",
}); err != nil {
return maskAny(err)
return "", maskAny(err)
}
case api.ServerGroupSyncWorkers:
log.Debug().Str("id", id).Msg("Adding syncworker")
if err := d.status.Members.SyncWorkers.Add(api.MemberStatus{
ID: id,
Phase: api.MemberPhaseNone,
ID: id,
CreatedAt: metav1.Now(),
Phase: api.MemberPhaseNone,
PersistentVolumeClaimName: "",
PodName: "",
}); err != nil {
return maskAny(err)
return "", maskAny(err)
}
default:
return maskAny(fmt.Errorf("Unknown server group %d", group))
return "", maskAny(fmt.Errorf("Unknown server group %d", group))
}
return nil
return id, nil
}
// getArangodIDPrefix returns the prefix required ID's of arangod servers

View file

@ -51,7 +51,7 @@ type actionAddMember struct {
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionAddMember) Start(ctx context.Context) (bool, error) {
if err := a.actionCtx.CreateMember(a.action.Group); err != nil {
if err := a.actionCtx.CreateMember(a.action.Group, a.action.MemberID); err != nil {
log.Debug().Err(err).Msg("Failed to create member")
return false, maskAny(err)
}

View file

@ -52,7 +52,8 @@ type ActionContext interface {
// when no such member is found.
GetMemberStatusByID(id string) (api.MemberStatus, bool)
// CreateMember adds a new member to the given group.
CreateMember(group api.ServerGroup) error
// If ID is non-empty, it will be used, otherwise a new ID is created.
CreateMember(group api.ServerGroup, id string) error
// UpdateMember updates the deployment status wrt the given member.
UpdateMember(member api.MemberStatus) error
// RemoveMemberByID removes a member with given id.
@ -122,8 +123,9 @@ func (ac *actionContext) GetMemberStatusByID(id string) (api.MemberStatus, bool)
}
// CreateMember adds a new member to the given group.
func (ac *actionContext) CreateMember(group api.ServerGroup) error {
if err := ac.context.CreateMember(group); err != nil {
// If ID is non-empty, it will be used, otherwise a new ID is created.
func (ac *actionContext) CreateMember(group api.ServerGroup, id string) error {
if err := ac.context.CreateMember(group, id); err != nil {
return maskAny(err)
}
return nil

View file

@ -53,7 +53,8 @@ type Context interface {
// If the given predicate is not nil, only agents are included where the given predicate returns true.
GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]arangod.Agency, error)
// CreateMember adds a new member to the given group.
CreateMember(group api.ServerGroup) error
// If ID is non-empty, it will be used, otherwise a new ID is created.
CreateMember(group api.ServerGroup, id string) error
// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
DeletePod(podName string) error

View file

@ -89,9 +89,13 @@ func createPlan(log zerolog.Logger, apiObject metav1.Object,
status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error {
for _, m := range *members {
if m.Phase == api.MemberPhaseFailed && len(plan) == 0 {
newID := ""
if group == api.ServerGroupAgents {
newID = m.ID // Agents cannot (yet) be replaced with new IDs
}
plan = append(plan,
api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
api.NewAction(api.ActionTypeAddMember, group, ""),
api.NewAction(api.ActionTypeAddMember, group, newID),
)
}
}

View file

@ -405,7 +405,7 @@ func (r *Resources) createPodForMember(spec api.DeploymentSpec, group api.Server
return maskAny(err)
}
// Create event
r.context.CreateEvent(k8sutil.NewMemberAddEvent(m.PodName, role, apiObject))
r.context.CreateEvent(k8sutil.NewPodCreatedEvent(m.PodName, role, apiObject))
return nil
}

View file

@ -144,6 +144,10 @@ func (r *Resources) InspectPods() error {
// Shutdown was intended, so not need to do anything here.
// Just mark terminated
if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Terminated", "") {
// Record termination time
now := metav1.Now()
m.RecentTerminations = append(m.RecentTerminations, now)
// Save it
if err := status.Members.UpdateMemberStatus(m, group); err != nil {
return maskAny(err)
}
@ -154,6 +158,10 @@ func (r *Resources) InspectPods() error {
// Create event
events = append(events, k8sutil.NewPodGoneEvent(podName, group.AsRole(), apiObject))
if m.Conditions.Update(api.ConditionTypeReady, false, "Pod Does Not Exist", "") {
// Record termination time
now := metav1.Now()
m.RecentTerminations = append(m.RecentTerminations, now)
// Save it
if err := status.Members.UpdateMemberStatus(m, group); err != nil {
return maskAny(err)
}

View file

@ -59,6 +59,15 @@ func NewMemberRemoveEvent(memberName, role string, apiObject APIObject) *v1.Even
return event
}
// NewPodCreatedEvent creates an event indicating that a pod has been created
func NewPodCreatedEvent(podName, role string, apiObject APIObject) *v1.Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = fmt.Sprintf("Pod Of %s Created", strings.Title(role))
event.Message = fmt.Sprintf("Pod %s of member %s is created", podName, role)
return event
}
// NewPodGoneEvent creates an event indicating that a pod is missing
func NewPodGoneEvent(podName, role string, apiObject APIObject) *v1.Event {
event := newDeploymentEvent(apiObject)

View file

@ -0,0 +1,212 @@
package tests
import (
"context"
"fmt"
"testing"
"time"
"github.com/dchest/uniuri"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
driver "github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/client"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
)
// TestMemberResilienceAgents creates a cluster and removes a
// specific agent pod 5 times. Each time it waits for a new pod to arrive.
// After 5 times, the member should be replaced by another member with the same ID.
func TestMemberResilienceAgents(t *testing.T) {
longOrSkip(t)
c := client.MustNewInCluster()
kubecli := mustNewKubeClient(t)
ns := getNamespace(t)
// Prepare deployment config
depl := newDeployment("test-member-res-agnt-" + uniuri.NewLen(4))
depl.Spec.Mode = api.NewMode(api.DeploymentModeCluster)
depl.Spec.SetDefaults(depl.GetName()) // this must be last
// Create deployment
apiObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Create(depl)
if err != nil {
t.Fatalf("Create deployment failed: %v", err)
}
// Wait for deployment to be ready
if _, err = waitUntilDeployment(c, depl.GetName(), ns, deploymentIsReady()); err != nil {
t.Fatalf("Deployment not running in time: %v", err)
}
// Create a database client
ctx := context.Background()
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
// Wait for cluster to be completely ready
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
return clusterHealthEqualsSpec(h, apiObject.Spec)
}); err != nil {
t.Fatalf("Cluster not running in expected health in time: %v", err)
}
// Fetch latest status so we know all member details
apiObject, err = c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get deployment: %v", err)
}
// Pick an agent to be deleted 5 times
targetAgent := apiObject.Status.Members.Agents[0]
for i := 0; i < 5; i++ {
// Get current pod so we can compare UID later
originalPod, err := kubecli.CoreV1().Pods(ns).Get(targetAgent.PodName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get pod %s: %v", targetAgent.PodName, err)
}
if err := kubecli.CoreV1().Pods(ns).Delete(targetAgent.PodName, &metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed to delete pod %s: %v", targetAgent.PodName, err)
}
if i < 4 {
// Wait for pod to return with different UID
op := func() error {
pod, err := kubecli.CoreV1().Pods(ns).Get(targetAgent.PodName, metav1.GetOptions{})
if err != nil {
return maskAny(err)
}
if pod.GetUID() == originalPod.GetUID() {
return fmt.Errorf("Still original pod")
}
return nil
}
if err := retry.Retry(op, time.Minute); err != nil {
t.Fatalf("Pod did not restart: %v", err)
}
} else {
// Wait for member to be replaced
op := func() error {
updatedObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
if err != nil {
return maskAny(err)
}
m, _, found := updatedObject.Status.Members.ElementByID(targetAgent.ID)
if !found {
return maskAny(fmt.Errorf("Member %s not found", targetAgent.ID))
}
if m.CreatedAt.Equal(&targetAgent.CreatedAt) {
return maskAny(fmt.Errorf("Member %s still not replaced", targetAgent.ID))
}
return nil
}
if err := retry.Retry(op, time.Minute); err != nil {
t.Fatalf("Member failure did not succeed: %v", err)
}
}
// Wait for cluster to be completely ready
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
return clusterHealthEqualsSpec(h, apiObject.Spec)
}); err != nil {
t.Fatalf("Cluster not running in expected health in time: %v", err)
}
}
// Cleanup
removeDeployment(c, depl.GetName(), ns)
}
// TestMemberResilienceCoordinators creates a cluster and removes a
// specific coordinator pod 5 times. Each time it waits for a new pod to arrive.
// After 5 times, the member should be replaced by another member.
func TestMemberResilienceCoordinators(t *testing.T) {
longOrSkip(t)
c := client.MustNewInCluster()
kubecli := mustNewKubeClient(t)
ns := getNamespace(t)
// Prepare deployment config
depl := newDeployment("test-member-res-crdn-" + uniuri.NewLen(4))
depl.Spec.Mode = api.NewMode(api.DeploymentModeCluster)
depl.Spec.SetDefaults(depl.GetName()) // this must be last
// Create deployment
apiObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Create(depl)
if err != nil {
t.Fatalf("Create deployment failed: %v", err)
}
// Wait for deployment to be ready
if _, err = waitUntilDeployment(c, depl.GetName(), ns, deploymentIsReady()); err != nil {
t.Fatalf("Deployment not running in time: %v", err)
}
// Create a database client
ctx := context.Background()
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
// Wait for cluster to be completely ready
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
return clusterHealthEqualsSpec(h, apiObject.Spec)
}); err != nil {
t.Fatalf("Cluster not running in expected health in time: %v", err)
}
// Fetch latest status so we know all member details
apiObject, err = c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get deployment: %v", err)
}
// Pick a coordinator to be deleted 5 times
targetCoordinator := apiObject.Status.Members.Coordinators[0]
for i := 0; i < 5; i++ {
// Get current pod so we can compare UID later
originalPod, err := kubecli.CoreV1().Pods(ns).Get(targetCoordinator.PodName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get pod %s: %v", targetCoordinator.PodName, err)
}
if err := kubecli.CoreV1().Pods(ns).Delete(targetCoordinator.PodName, &metav1.DeleteOptions{}); err != nil {
t.Fatalf("Failed to delete pod %s: %v", targetCoordinator.PodName, err)
}
if i < 4 {
// Wait for pod to return with different UID
op := func() error {
pod, err := kubecli.CoreV1().Pods(ns).Get(targetCoordinator.PodName, metav1.GetOptions{})
if err != nil {
return maskAny(err)
}
if pod.GetUID() == originalPod.GetUID() {
return fmt.Errorf("Still original pod")
}
return nil
}
if err := retry.Retry(op, time.Minute); err != nil {
t.Fatalf("Pod did not restart: %v", err)
}
} else {
// Wait for member to be replaced
op := func() error {
updatedObject, err := c.DatabaseV1alpha().ArangoDeployments(ns).Get(depl.GetName(), metav1.GetOptions{})
if err != nil {
return maskAny(err)
}
if updatedObject.Status.Members.ContainsID(targetCoordinator.ID) {
return maskAny(fmt.Errorf("Member %s still not replaced", targetCoordinator.ID))
}
return nil
}
if err := retry.Retry(op, time.Minute); err != nil {
t.Fatalf("Member failure did not succeed: %v", err)
}
}
// Wait for cluster to be completely ready
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
return clusterHealthEqualsSpec(h, apiObject.Spec)
}); err != nil {
t.Fatalf("Cluster not running in expected health in time: %v", err)
}
}
// Cleanup
removeDeployment(c, depl.GetName(), ns)
}