1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Disaster recovery (#590)

This commit is contained in:
Adam Janikowski 2020-07-01 14:54:22 +02:00 committed by GitHub
parent f74977e279
commit 9159f70c43
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 309 additions and 8 deletions

View file

@ -10,6 +10,8 @@
- Add Label and Envs Pod customization - Add Label and Envs Pod customization
- Improved JWT Rotation - Improved JWT Rotation
- Allow to customize Security Context in pods - Allow to customize Security Context in pods
- Remove dead Coordinators in Cluster mode
- Add AutoRecovery flag to recover cluster in case of deadlock
## [1.0.3](https://github.com/arangodb/kube-arangodb/tree/1.0.3) (2020-05-25) ## [1.0.3](https://github.com/arangodb/kube-arangodb/tree/1.0.3) (2020-05-25)
- Prevent deletion of not known PVC's - Prevent deletion of not known PVC's

View file

@ -94,6 +94,8 @@ type DeploymentSpec struct {
Chaos ChaosSpec `json:"chaos"` Chaos ChaosSpec `json:"chaos"`
Recovery *ArangoDeploymentRecoverySpec `json:"recovery,omitempty"`
Bootstrap BootstrapSpec `json:"bootstrap,omitempty"` Bootstrap BootstrapSpec `json:"bootstrap,omitempty"`
} }

View file

@ -113,6 +113,8 @@ const (
ActionTypeJWTRefresh ActionType = "JWTRefresh" ActionTypeJWTRefresh ActionType = "JWTRefresh"
// ActionTypeJWTPropagated change propagated flag // ActionTypeJWTPropagated change propagated flag
ActionTypeJWTPropagated ActionType = "JWTPropagated" ActionTypeJWTPropagated ActionType = "JWTPropagated"
// ActionTypeClusterMemberCleanup removes member from cluster
ActionTypeClusterMemberCleanup ActionType = "ClusterMemberCleanup"
) )
const ( const (

View file

@ -0,0 +1,41 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package v1
import "github.com/arangodb/kube-arangodb/pkg/util"
type ArangoDeploymentRecoverySpec struct {
AutoRecover *bool `json:"autoRecover"`
}
func (a *ArangoDeploymentRecoverySpec) Get() ArangoDeploymentRecoverySpec {
if a != nil {
return *a
}
return ArangoDeploymentRecoverySpec{}
}
func (a ArangoDeploymentRecoverySpec) GetAutoRecover() bool {
return util.BoolOrDefault(a.AutoRecover, false)
}

View file

@ -103,7 +103,7 @@ type ServerGroupSpecSecurityContext struct {
AllowPrivilegeEscalation *bool `json:"allowPrivilegeEscalation,omitempty"` AllowPrivilegeEscalation *bool `json:"allowPrivilegeEscalation,omitempty"`
Privileged *bool `json:"privileged,omitempty"` Privileged *bool `json:"privileged,omitempty"`
ReadOnlyRootFilesystem *bool `json:"readOnlyFileSystem,omitempty"` ReadOnlyRootFilesystem *bool `json:"readOnlyRootFilesystem,omitempty"`
RunAsNonRoot *bool `json:"runAsNonRoot,omitempty"` RunAsNonRoot *bool `json:"runAsNonRoot,omitempty"`
RunAsUser *int64 `json:"runAsUser,omitempty"` RunAsUser *int64 `json:"runAsUser,omitempty"`
RunAsGroup *int64 `json:"runAsGroup,omitempty"` RunAsGroup *int64 `json:"runAsGroup,omitempty"`

View file

@ -122,6 +122,27 @@ func (in *ArangoDeploymentList) DeepCopyObject() runtime.Object {
return nil return nil
} }
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ArangoDeploymentRecoverySpec) DeepCopyInto(out *ArangoDeploymentRecoverySpec) {
*out = *in
if in.AutoRecover != nil {
in, out := &in.AutoRecover, &out.AutoRecover
*out = new(bool)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArangoDeploymentRecoverySpec.
func (in *ArangoDeploymentRecoverySpec) DeepCopy() *ArangoDeploymentRecoverySpec {
if in == nil {
return nil
}
out := new(ArangoDeploymentRecoverySpec)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *AuthenticationSpec) DeepCopyInto(out *AuthenticationSpec) { func (in *AuthenticationSpec) DeepCopyInto(out *AuthenticationSpec) {
*out = *in *out = *in
@ -355,6 +376,11 @@ func (in *DeploymentSpec) DeepCopyInto(out *DeploymentSpec) {
in.SyncMasters.DeepCopyInto(&out.SyncMasters) in.SyncMasters.DeepCopyInto(&out.SyncMasters)
in.SyncWorkers.DeepCopyInto(&out.SyncWorkers) in.SyncWorkers.DeepCopyInto(&out.SyncWorkers)
in.Chaos.DeepCopyInto(&out.Chaos) in.Chaos.DeepCopyInto(&out.Chaos)
if in.Recovery != nil {
in, out := &in.Recovery, &out.Recovery
*out = new(ArangoDeploymentRecoverySpec)
(*in).DeepCopyInto(*out)
}
in.Bootstrap.DeepCopyInto(&out.Bootstrap) in.Bootstrap.DeepCopyInto(&out.Bootstrap)
return return
} }

View file

@ -0,0 +1,95 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
"github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/rs/zerolog"
)
func init() {
registerAction(api.ActionTypeClusterMemberCleanup, newClusterMemberCleanupAction)
}
// newClusterMemberCleanupAction creates a new Action that implements the given
// planned ClusterMemberCleanup action.
func newClusterMemberCleanupAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
a := &actionClusterMemberCleanup{}
a.actionImpl = newActionImplDefRef(log, action, actionCtx, addMemberTimeout)
return a
}
// actionClusterMemberCleanup implements an ClusterMemberCleanup.
type actionClusterMemberCleanup struct {
// actionImpl implement timeout and member id functions
actionImpl
// actionEmptyCheckProgress implement check progress with empty implementation
actionEmptyCheckProgress
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionClusterMemberCleanup) Start(ctx context.Context) (bool, error) {
if err := a.start(ctx); err != nil {
a.log.Warn().Err(err).Msgf("Unable to clean cluster member")
}
return true, nil
}
func (a *actionClusterMemberCleanup) start(ctx context.Context) error {
id := driver.ServerID(a.MemberID())
c, err := a.actionCtx.GetDatabaseClient(ctx)
if err != nil {
return err
}
cluster, err := c.Cluster(ctx)
if err != nil {
return err
}
health, err := cluster.Health(ctx)
if err != nil {
return err
}
if _, ok := health.Health[id]; !ok {
return nil
}
if err := cluster.RemoveServer(ctx, id); err != nil {
return err
}
return nil
}

View file

@ -245,11 +245,6 @@ func createPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIOb
plan = pb.Apply(createKeyfileRenewalPlan) plan = pb.Apply(createKeyfileRenewalPlan)
} }
// Check for the need to rotate TLS certificate of a members
//if plan.IsEmpty() {
// plan = pb.Apply(createRotateTLSServerCertificatePlan)
//}
// Check for changes storage classes or requirements // Check for changes storage classes or requirements
if plan.IsEmpty() { if plan.IsEmpty() {
plan = pb.Apply(createRotateServerStoragePlan) plan = pb.Apply(createRotateServerStoragePlan)
@ -271,6 +266,10 @@ func createPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIOb
plan = pb.Apply(createCACleanPlan) plan = pb.Apply(createCACleanPlan)
} }
if plan.IsEmpty() {
plan = pb.Apply(createClusterOperationPlan)
}
// Return plan // Return plan
return plan, true return plan, true
} }

View file

@ -0,0 +1,95 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
"time"
"github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/rs/zerolog"
)
const coordinatorHealthFailedTimeout time.Duration = time.Minute
func createClusterOperationPlan(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspector.Inspector, context PlanBuilderContext) api.Plan {
if spec.GetMode() != api.DeploymentModeCluster {
return nil
}
c, err := context.GetDatabaseClient(ctx)
if err != nil {
return nil
}
cluster, err := c.Cluster(ctx)
if err != nil {
log.Warn().Err(err).Msgf("Unable to get Cluster client")
return nil
}
health, err := cluster.Health(ctx)
if err != nil {
log.Warn().Err(err).Msgf("Unable to get Cluster health")
return nil
}
membersHealth := health.Health
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
delete(membersHealth, driver.ServerID(m.ID))
}
return nil
})
if len(membersHealth) == 0 {
return nil
}
for id, member := range membersHealth {
switch member.Role {
case driver.ServerRoleCoordinator:
if member.Status != driver.ServerStatusFailed {
continue
}
if member.LastHeartbeatAcked.Add(coordinatorHealthFailedTimeout).Before(time.Now()) {
return api.Plan{
api.NewAction(api.ActionTypeClusterMemberCleanup, api.ServerGroupCoordinators, string(id)),
}
}
}
}
return nil
}

View file

@ -58,6 +58,9 @@ type PlanBuilderContext interface {
RenderPodForMember(cachedStatus inspector.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) RenderPodForMember(cachedStatus inspector.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error)
// SelectImage select currently used image by pod // SelectImage select currently used image by pod
SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool) SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool)
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
GetDatabaseClient(ctx context.Context) (driver.Client, error)
// GetServerClient returns a cached client for a specific server. // GetServerClient returns a cached client for a specific server.
GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
// SecretsInterface return secret interface // SecretsInterface return secret interface

View file

@ -28,6 +28,8 @@ import (
"io/ioutil" "io/ioutil"
"testing" "testing"
"github.com/pkg/errors"
policy "k8s.io/api/policy/v1beta1" policy "k8s.io/api/policy/v1beta1"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector" "github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"
@ -117,7 +119,7 @@ func (c *testContext) UpdateMember(member api.MemberStatus) error {
} }
func (c *testContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) { func (c *testContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) {
panic("implement me") return nil, errors.Errorf("Client Not Found")
} }
func (c *testContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) { func (c *testContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) {

View file

@ -124,12 +124,28 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, log zerol
log.Debug().Msg("Pod is already failed, safe to remove dbserver pod") log.Debug().Msg("Pod is already failed, safe to remove dbserver pod")
return nil return nil
} }
// If pod is not member of cluster, do nothing // If pod is not member of cluster, do nothing
if !memberStatus.Conditions.IsTrue(api.ConditionTypeMemberOfCluster) { if !memberStatus.Conditions.IsTrue(api.ConditionTypeMemberOfCluster) {
log.Debug().Msg("Pod is not member of cluster") log.Debug().Msg("Pod is not member of cluster")
return nil return nil
} }
if c, ok := k8sutil.GetContainerStatusByName(p, k8sutil.ServerContainerName); ok {
if t := c.State.Terminated; t != nil {
log.Warn().Str("member", memberStatus.ID).
Str("pod", p.GetName()).
Str("uid", string(p.GetUID())).
Int32("exit-code", t.ExitCode).
Str("reason", t.Reason).
Str("message", t.Message).
Int32("signal", t.Signal).
Time("started", t.StartedAt.Time).
Time("finished", t.FinishedAt.Time).
Msgf("Pod failed in unexpected way")
}
}
// Inspect deployment deletion state // Inspect deployment deletion state
apiObject := r.context.GetAPIObject() apiObject := r.context.GetAPIObject()
if apiObject.GetDeletionTimestamp() != nil { if apiObject.GetDeletionTimestamp() != nil {
@ -194,7 +210,14 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, log zerol
cluster, err := c.Cluster(ctx) cluster, err := c.Cluster(ctx)
if err != nil { if err != nil {
log.Debug().Err(err).Msg("Failed to access cluster") log.Debug().Err(err).Msg("Failed to access cluster")
return maskAny(err)
if r.context.GetSpec().Recovery.Get().GetAutoRecover() {
if c, ok := k8sutil.GetContainerStatusByName(p, k8sutil.ServerContainerName); ok {
if t := c.State.Terminated; t != nil {
return nil
}
}
}
} }
cleanedOut, err := cluster.IsCleanedOut(ctx, memberStatus.ID) cleanedOut, err := cluster.IsCleanedOut(ctx, memberStatus.ID)
if err != nil { if err != nil {

View file

@ -35,6 +35,17 @@ func GetContainerByName(p *v1.Pod, name string) (v1.Container, bool) {
return v1.Container{}, false return v1.Container{}, false
} }
// GetContainerStatusByName returns the container status in the given pod with the given name.
// Returns false if not found.
func GetContainerStatusByName(p *v1.Pod, name string) (v1.ContainerStatus, bool) {
for _, c := range p.Status.ContainerStatuses {
if c.Name == name {
return c, true
}
}
return v1.ContainerStatus{}, false
}
// IsResourceRequirementsChanged returns true if the resource requirements have changed. // IsResourceRequirementsChanged returns true if the resource requirements have changed.
func IsResourceRequirementsChanged(wanted, given v1.ResourceRequirements) bool { func IsResourceRequirementsChanged(wanted, given v1.ResourceRequirements) bool {
checkList := func(wanted, given v1.ResourceList) bool { checkList := func(wanted, given v1.ResourceList) bool {