From ab9873d173b06141d9d67ab9a8720e00b90d64b4 Mon Sep 17 00:00:00 2001 From: Adam Janikowski <12255597+ajanikow@users.noreply.github.com> Date: Thu, 19 Aug 2021 10:21:23 +0200 Subject: [PATCH] [Feature] High priority plan (#774) --- CHANGELOG.md | 1 + pkg/apis/deployment/v1/deployment_status.go | 3 + pkg/apis/deployment/v1/plan.go | 18 ++ .../deployment/v1/zz_generated.deepcopy.go | 7 + .../deployment/v2alpha1/deployment_status.go | 3 + pkg/apis/deployment/v2alpha1/plan.go | 18 ++ .../v2alpha1/zz_generated.deepcopy.go | 7 + pkg/deployment/reconcile/plan_executor.go | 259 +++++++++++------- 8 files changed, 219 insertions(+), 97 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 35666f9e9..5cf848357 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ ## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A) - Update 'github.com/arangodb/arangosync-client' dependency to v0.7.0 +- Add HighPriorityPlan to ArangoDeployment Status ## [1.2.1](https://github.com/arangodb/kube-arangodb/tree/1.2.1) (2021-07-28) - Fix ArangoMember race with multiple ArangoDeployments within single namespace diff --git a/pkg/apis/deployment/v1/deployment_status.go b/pkg/apis/deployment/v1/deployment_status.go index f734b6f70..8fe7f125f 100644 --- a/pkg/apis/deployment/v1/deployment_status.go +++ b/pkg/apis/deployment/v1/deployment_status.go @@ -63,6 +63,9 @@ type DeploymentStatus struct { // Plan to update this deployment Plan Plan `json:"plan,omitempty"` + // HighPriorityPlan to update this deployment. Executed before plan + HighPriorityPlan Plan `json:"highPriorityPlan,omitempty"` + // AcceptedSpec contains the last specification that was accepted by the operator. AcceptedSpec *DeploymentSpec `json:"accepted-spec,omitempty"` diff --git a/pkg/apis/deployment/v1/plan.go b/pkg/apis/deployment/v1/plan.go index 0f603554f..0419f67d8 100644 --- a/pkg/apis/deployment/v1/plan.go +++ b/pkg/apis/deployment/v1/plan.go @@ -29,6 +29,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// ActionPriority define action priority +type ActionPriority int + +const ( + // ActionPriorityNormal define normal priority plan + ActionPriorityNormal ActionPriority = iota + // ActionPriorityHigh define high priority plan + ActionPriorityHigh +) + // ActionType is a strongly typed name for a plan action item type ActionType string @@ -36,6 +46,14 @@ func (a ActionType) String() string { return string(a) } +// Priority returns plan priority +func (a ActionType) Priority() ActionPriority { + switch a { + default: + return ActionPriorityNormal + } +} + const ( // ActionTypeIdle causes a plan to be recalculated. ActionTypeIdle ActionType = "Idle" diff --git a/pkg/apis/deployment/v1/zz_generated.deepcopy.go b/pkg/apis/deployment/v1/zz_generated.deepcopy.go index 2cbce9277..9cc6905f4 100644 --- a/pkg/apis/deployment/v1/zz_generated.deepcopy.go +++ b/pkg/apis/deployment/v1/zz_generated.deepcopy.go @@ -618,6 +618,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.HighPriorityPlan != nil { + in, out := &in.HighPriorityPlan, &out.HighPriorityPlan + *out = make(Plan, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.AcceptedSpec != nil { in, out := &in.AcceptedSpec, &out.AcceptedSpec *out = new(DeploymentSpec) diff --git a/pkg/apis/deployment/v2alpha1/deployment_status.go b/pkg/apis/deployment/v2alpha1/deployment_status.go index 2c006ab47..66ad15a8a 100644 --- a/pkg/apis/deployment/v2alpha1/deployment_status.go +++ b/pkg/apis/deployment/v2alpha1/deployment_status.go @@ -63,6 +63,9 @@ type DeploymentStatus struct { // Plan to update this deployment Plan Plan `json:"plan,omitempty"` + // HighPriorityPlan to update this deployment. Executed before plan + HighPriorityPlan Plan `json:"highPriorityPlan,omitempty"` + // AcceptedSpec contains the last specification that was accepted by the operator. AcceptedSpec *DeploymentSpec `json:"accepted-spec,omitempty"` diff --git a/pkg/apis/deployment/v2alpha1/plan.go b/pkg/apis/deployment/v2alpha1/plan.go index a794d6612..ee3957541 100644 --- a/pkg/apis/deployment/v2alpha1/plan.go +++ b/pkg/apis/deployment/v2alpha1/plan.go @@ -29,6 +29,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// ActionPriority define action priority +type ActionPriority int + +const ( + // ActionPriorityNormal define normal priority plan + ActionPriorityNormal ActionPriority = iota + // ActionPriorityHigh define high priority plan + ActionPriorityHigh +) + // ActionType is a strongly typed name for a plan action item type ActionType string @@ -36,6 +46,14 @@ func (a ActionType) String() string { return string(a) } +// Priority returns plan priority +func (a ActionType) Priority() ActionPriority { + switch a { + default: + return ActionPriorityNormal + } +} + const ( // ActionTypeIdle causes a plan to be recalculated. ActionTypeIdle ActionType = "Idle" diff --git a/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go b/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go index c3db99ac9..7bbd33ec1 100644 --- a/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/deployment/v2alpha1/zz_generated.deepcopy.go @@ -618,6 +618,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.HighPriorityPlan != nil { + in, out := &in.HighPriorityPlan, &out.HighPriorityPlan + *out = make(Plan, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } if in.AcceptedSpec != nil { in, out := &in.AcceptedSpec, &out.AcceptedSpec *out = new(DeploymentSpec) diff --git a/pkg/deployment/reconcile/plan_executor.go b/pkg/deployment/reconcile/plan_executor.go index 2fb545746..481521c2b 100644 --- a/pkg/deployment/reconcile/plan_executor.go +++ b/pkg/deployment/reconcile/plan_executor.go @@ -38,28 +38,108 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" ) +type planner interface { + Get(deployment *api.DeploymentStatus) api.Plan + Set(deployment *api.DeploymentStatus, p api.Plan) bool +} + +var _ planner = plannerNormal{} +var _ planner = plannerHigh{} + +type plannerNormal struct { +} + +func (p plannerNormal) Get(deployment *api.DeploymentStatus) api.Plan { + return deployment.Plan +} + +func (p plannerNormal) Set(deployment *api.DeploymentStatus, plan api.Plan) bool { + if !deployment.Plan.Equal(plan) { + deployment.Plan = plan + return true + } + + return false +} + +type plannerHigh struct { +} + +func (p plannerHigh) Get(deployment *api.DeploymentStatus) api.Plan { + return deployment.HighPriorityPlan +} + +func (p plannerHigh) Set(deployment *api.DeploymentStatus, plan api.Plan) bool { + if !deployment.HighPriorityPlan.Equal(plan) { + deployment.HighPriorityPlan = plan + return true + } + + return false +} + // ExecutePlan tries to execute the plan as far as possible. // Returns true when it has to be called again soon. // False otherwise. func (d *Reconciler) ExecutePlan(ctx context.Context, cachedStatus inspectorInterface.Inspector) (bool, error) { - log := d.log - firstLoop := true + var callAgain bool + + if again, err := d.executePlanStatus(ctx, cachedStatus, d.log, plannerHigh{}); err != nil { + return false, errors.WithStack(err) + } else if again { + callAgain = true + } + + if again, err := d.executePlanStatus(ctx, cachedStatus, d.log, plannerNormal{}); err != nil { + return false, errors.WithStack(err) + } else if again { + callAgain = true + } + + return callAgain, nil +} + +func (d *Reconciler) executePlanStatus(ctx context.Context, cachedStatus inspectorInterface.Inspector, log zerolog.Logger, pg planner) (bool, error) { + loopStatus, _ := d.context.GetStatus() + + plan := pg.Get(&loopStatus) + + if len(plan) == 0 { + return false, nil + } + + newPlan, callAgain, err := d.executePlan(ctx, cachedStatus, log, plan) + + // Refresh current status + loopStatus, lastVersion := d.context.GetStatus() + + if pg.Set(&loopStatus, newPlan) { + log.Info().Msg("Updating plan") + if err := d.context.UpdateStatus(ctx, loopStatus, lastVersion, true); err != nil { + log.Debug().Err(err).Msg("Failed to update CR status") + return false, errors.WithStack(err) + } + } + + if err != nil { + return false, err + } + + return callAgain, nil +} + +func (d *Reconciler) executePlan(ctx context.Context, cachedStatus inspectorInterface.Inspector, log zerolog.Logger, statusPlan api.Plan) (newPlan api.Plan, callAgain bool, err error) { + plan := statusPlan.DeepCopy() for { - loopStatus, _ := d.context.GetStatus() - if len(loopStatus.Plan) == 0 { - // No plan exists or all action have finished, nothing to be done - if !firstLoop { - log.Debug().Msg("Reconciliation plan has finished") - } - return !firstLoop, nil + if len(plan) == 0 { + return nil, false, nil } - firstLoop = false // nolint:ineffassign // Take first action - planAction := loopStatus.Plan[0] + planAction := plan[0] logContext := log.With(). - Int("plan-len", len(loopStatus.Plan)). + Int("plan-len", len(plan)). Str("action-id", planAction.ID). Str("action-type", string(planAction.Type)). Str("group", planAction.Group.AsRole()). @@ -72,98 +152,83 @@ func (d *Reconciler) ExecutePlan(ctx context.Context, cachedStatus inspectorInte log := logContext.Logger() action := d.createAction(log, planAction, cachedStatus) - if planAction.StartTime.IsZero() { - // Not started yet - ready, err := action.Start(ctx) - if err != nil { - log.Debug().Err(err). - Msg("Failed to start action") - return false, errors.WithStack(err) - } - { // action.Start may have changed status, so reload it. - status, lastVersion := d.context.GetStatus() - // Update status according to result on action.Start. - if ready { - // Remove action from list - status.Plan = status.Plan[1:] - if len(status.Plan) > 0 && status.Plan[0].MemberID == api.MemberIDPreviousAction { - // Fill in MemberID from previous action - status.Plan[0].MemberID = action.MemberID() - } - } else { - // Mark start time - now := metav1.Now() - status.Plan[0].StartTime = &now - } - // Save plan update - if err := d.context.UpdateStatus(ctx, status, lastVersion, true); err != nil { - log.Debug().Err(err).Msg("Failed to update CR status") - return false, errors.WithStack(err) - } - } - log.Debug().Bool("ready", ready).Msg("Action Start completed") - return true, nil + done, abort, recall, err := d.executeAction(ctx, log, planAction, action) + if err != nil { + return plan, false, errors.WithStack(err) + } + + if abort { + return plan, true, nil + } + + if done { + if len(plan) > 1 { + plan = plan[1:] + if plan[0].MemberID == api.MemberIDPreviousAction { + plan[0].MemberID = action.MemberID() + } + } else { + plan = nil + } } else { - // First action of plan has been started, check its progress - ready, abort, err := action.CheckProgress(ctx) - if err != nil { - log.Debug().Err(err).Msg("Failed to check action progress") - return false, errors.WithStack(err) + if plan[0].StartTime.IsZero() { + now := metav1.Now() + plan[0].StartTime = &now } - if ready { - { // action.CheckProgress may have changed status, so reload it. - status, lastVersion := d.context.GetStatus() - // Remove action from list - status.Plan = status.Plan[1:] - if len(status.Plan) > 0 && status.Plan[0].MemberID == api.MemberIDPreviousAction { - // Fill in MemberID from previous action - status.Plan[0].MemberID = action.MemberID() - } - // Save plan update - if err := d.context.UpdateStatus(ctx, status, lastVersion); err != nil { - log.Debug().Err(err).Msg("Failed to update CR status") - return false, errors.WithStack(err) - } - } - } - log.Debug(). - Bool("abort", abort). - Bool("ready", ready). - Msg("Action CheckProgress completed") - if !ready { - deadlineExpired := false - if abort { - log.Warn().Msg("Action aborted. Removing the entire plan") - d.context.CreateEvent(k8sutil.NewPlanAbortedEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole())) - } else { - // Not ready yet & no abort, check timeout - deadline := planAction.CreationTime.Add(action.Timeout(d.context.GetSpec())) - if time.Now().After(deadline) { - // Timeout has expired - deadlineExpired = true - log.Warn().Msg("Action not finished in time. Removing the entire plan") - d.context.CreateEvent(k8sutil.NewPlanTimeoutEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole())) - } - } - if abort || deadlineExpired { - // Replace plan with empty one and save it. - status, lastVersion := d.context.GetStatus() - status.Plan = api.Plan{} - if err := d.context.UpdateStatus(ctx, status, lastVersion); err != nil { - log.Debug().Err(err).Msg("Failed to update CR status") - return false, errors.WithStack(err) - } - return true, nil - } - // Timeout not yet expired, come back soon - return true, nil - } - return true, nil + + return plan, recall, nil } } } +func (d *Reconciler) executeAction(ctx context.Context, log zerolog.Logger, planAction api.Action, action Action) (done, abort, callAgain bool, err error) { + if planAction.StartTime.IsZero() { + // Not started yet + ready, err := action.Start(ctx) + if err != nil { + log.Debug().Err(err). + Msg("Failed to start action") + return false, false, false, errors.WithStack(err) + } + + if ready { + log.Debug().Bool("ready", ready).Msg("Action Start completed") + return true, false, false, nil + } + + return false, false, true, nil + } + // First action of plan has been started, check its progress + ready, abort, err := action.CheckProgress(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to check action progress") + return false, false, false, errors.WithStack(err) + } + + log.Debug(). + Bool("abort", abort). + Bool("ready", ready). + Msg("Action CheckProgress completed") + + if ready { + return true, false, false, nil + } + + if abort { + log.Warn().Msg("Action aborted. Removing the entire plan") + d.context.CreateEvent(k8sutil.NewPlanAbortedEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole())) + return false, true, false, nil + } else if time.Now().After(planAction.CreationTime.Add(action.Timeout(d.context.GetSpec()))) { + log.Warn().Msg("Action not finished in time. Removing the entire plan") + d.context.CreateEvent(k8sutil.NewPlanTimeoutEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole())) + return false, true, false, nil + } + + // Timeout not yet expired, come back soon + return false, false, true, nil +} + // createAction create action object based on action type func (d *Reconciler) createAction(log zerolog.Logger, action api.Action, cachedStatus inspectorInterface.Inspector) Action { actionCtx := newActionContext(log.With().Str("id", action.ID).Str("type", action.Type.String()).Logger(), d.context, cachedStatus)