1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] High priority plan (#774)

This commit is contained in:
Adam Janikowski 2021-08-19 10:21:23 +02:00 committed by GitHub
parent 142299071e
commit ab9873d173
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 219 additions and 97 deletions

View file

@ -2,6 +2,7 @@
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
- Update 'github.com/arangodb/arangosync-client' dependency to v0.7.0
- Add HighPriorityPlan to ArangoDeployment Status
## [1.2.1](https://github.com/arangodb/kube-arangodb/tree/1.2.1) (2021-07-28)
- Fix ArangoMember race with multiple ArangoDeployments within single namespace

View file

@ -63,6 +63,9 @@ type DeploymentStatus struct {
// Plan to update this deployment
Plan Plan `json:"plan,omitempty"`
// HighPriorityPlan holds the high priority plan to update this deployment. It is executed before Plan
HighPriorityPlan Plan `json:"highPriorityPlan,omitempty"`
// AcceptedSpec contains the last specification that was accepted by the operator.
AcceptedSpec *DeploymentSpec `json:"accepted-spec,omitempty"`

View file

@ -29,6 +29,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// ActionPriority defines the priority class of a plan action.
type ActionPriority int

// Supported action priorities, listed from lowest to highest.
const (
	// ActionPriorityNormal marks an action of the normal priority plan.
	// It is the zero value of ActionPriority.
	ActionPriorityNormal ActionPriority = iota

	// ActionPriorityHigh marks an action of the high priority plan.
	ActionPriorityHigh
)
// ActionType is a strongly typed name for a plan action item
type ActionType string
@ -36,6 +46,14 @@ func (a ActionType) String() string {
return string(a)
}
// Priority returns the plan priority assigned to this action type.
//
// No action type is mapped to a dedicated priority yet, so every type
// resolves to ActionPriorityNormal.
func (a ActionType) Priority() ActionPriority {
	return ActionPriorityNormal
}
const (
// ActionTypeIdle causes a plan to be recalculated.
ActionTypeIdle ActionType = "Idle"

View file

@ -618,6 +618,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.HighPriorityPlan != nil {
in, out := &in.HighPriorityPlan, &out.HighPriorityPlan
*out = make(Plan, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.AcceptedSpec != nil {
in, out := &in.AcceptedSpec, &out.AcceptedSpec
*out = new(DeploymentSpec)

View file

@ -63,6 +63,9 @@ type DeploymentStatus struct {
// Plan to update this deployment
Plan Plan `json:"plan,omitempty"`
// HighPriorityPlan holds the high priority plan to update this deployment. It is executed before Plan
HighPriorityPlan Plan `json:"highPriorityPlan,omitempty"`
// AcceptedSpec contains the last specification that was accepted by the operator.
AcceptedSpec *DeploymentSpec `json:"accepted-spec,omitempty"`

View file

@ -29,6 +29,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// ActionPriority defines the priority class of a plan action.
type ActionPriority int

// Supported action priorities, listed from lowest to highest.
const (
	// ActionPriorityNormal marks an action of the normal priority plan.
	// It is the zero value of ActionPriority.
	ActionPriorityNormal ActionPriority = iota

	// ActionPriorityHigh marks an action of the high priority plan.
	ActionPriorityHigh
)
// ActionType is a strongly typed name for a plan action item
type ActionType string
@ -36,6 +46,14 @@ func (a ActionType) String() string {
return string(a)
}
// Priority returns the plan priority assigned to this action type.
//
// No action type is mapped to a dedicated priority yet, so every type
// resolves to ActionPriorityNormal.
func (a ActionType) Priority() ActionPriority {
	return ActionPriorityNormal
}
const (
// ActionTypeIdle causes a plan to be recalculated.
ActionTypeIdle ActionType = "Idle"

View file

@ -618,6 +618,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.HighPriorityPlan != nil {
in, out := &in.HighPriorityPlan, &out.HighPriorityPlan
*out = make(Plan, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.AcceptedSpec != nil {
in, out := &in.AcceptedSpec, &out.AcceptedSpec
*out = new(DeploymentSpec)

View file

@ -38,28 +38,108 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// planner abstracts access to one of the plans stored in a DeploymentStatus,
// so the same execution code can work on any of them.
type planner interface {
	// Get returns the plan this planner is responsible for.
	Get(deployment *api.DeploymentStatus) api.Plan
	// Set stores p as the plan this planner is responsible for and reports
	// whether the stored plan was changed.
	Set(deployment *api.DeploymentStatus, p api.Plan) bool
}

// Compile-time checks that both planners satisfy the planner interface.
var (
	_ planner = plannerNormal{}
	_ planner = plannerHigh{}
)
// plannerNormal is the planner working on DeploymentStatus.Plan.
type plannerNormal struct{}

// Get returns the normal priority plan of the given status.
func (plannerNormal) Get(deployment *api.DeploymentStatus) api.Plan {
	return deployment.Plan
}

// Set replaces the normal priority plan of the given status with plan.
// It returns true when the plan differed from the stored one and was
// replaced, false when both were equal and nothing was changed.
func (plannerNormal) Set(deployment *api.DeploymentStatus, plan api.Plan) bool {
	if deployment.Plan.Equal(plan) {
		return false
	}

	deployment.Plan = plan
	return true
}
// plannerHigh is the planner working on DeploymentStatus.HighPriorityPlan.
type plannerHigh struct{}

// Get returns the high priority plan of the given status.
func (plannerHigh) Get(deployment *api.DeploymentStatus) api.Plan {
	return deployment.HighPriorityPlan
}

// Set replaces the high priority plan of the given status with plan.
// It returns true when the plan differed from the stored one and was
// replaced, false when both were equal and nothing was changed.
func (plannerHigh) Set(deployment *api.DeploymentStatus, plan api.Plan) bool {
	if deployment.HighPriorityPlan.Equal(plan) {
		return false
	}

	deployment.HighPriorityPlan = plan
	return true
}
// ExecutePlan tries to execute the stored plans as far as possible.
// The high priority plan is executed first, followed by the normal plan.
// If executing a plan fails, the error is returned immediately and the
// remaining plan is not executed.
// Returns true when it has to be called again soon.
// False otherwise.
func (d *Reconciler) ExecutePlan(ctx context.Context, cachedStatus inspectorInterface.Inspector) (bool, error) {
	highAgain, err := d.executePlanStatus(ctx, cachedStatus, d.log, plannerHigh{})
	if err != nil {
		return false, errors.WithStack(err)
	}

	normalAgain, err := d.executePlanStatus(ctx, cachedStatus, d.log, plannerNormal{})
	if err != nil {
		return false, errors.WithStack(err)
	}

	// A follow-up call is needed as soon as one of the plans asks for it.
	return highAgain || normalAgain, nil
}
// executePlanStatus executes the plan selected by pg and stores the resulting
// plan back into the deployment status when it differs from the stored one.
// It returns true when the plan execution asks to be called again.
// A failure to update the status takes precedence over an execution error.
func (d *Reconciler) executePlanStatus(ctx context.Context, cachedStatus inspectorInterface.Inspector, log zerolog.Logger, pg planner) (bool, error) {
	current, _ := d.context.GetStatus()

	pending := pg.Get(&current)
	if len(pending) == 0 {
		return false, nil
	}

	remaining, again, execErr := d.executePlan(ctx, cachedStatus, log, pending)

	// Refresh current status instead of reusing the copy taken before execution
	latest, version := d.context.GetStatus()

	if changed := pg.Set(&latest, remaining); changed {
		log.Info().Msg("Updating plan")

		updateErr := d.context.UpdateStatus(ctx, latest, version, true)
		if updateErr != nil {
			log.Debug().Err(updateErr).Msg("Failed to update CR status")
			return false, errors.WithStack(updateErr)
		}
	}

	// The execution error is reported only after the plan has been persisted.
	if execErr != nil {
		return false, execErr
	}

	return again, nil
}
func (d *Reconciler) executePlan(ctx context.Context, cachedStatus inspectorInterface.Inspector, log zerolog.Logger, statusPlan api.Plan) (newPlan api.Plan, callAgain bool, err error) {
plan := statusPlan.DeepCopy()
for {
loopStatus, _ := d.context.GetStatus()
if len(loopStatus.Plan) == 0 {
// No plan exists or all action have finished, nothing to be done
if !firstLoop {
log.Debug().Msg("Reconciliation plan has finished")
}
return !firstLoop, nil
if len(plan) == 0 {
return nil, false, nil
}
firstLoop = false // nolint:ineffassign
// Take first action
planAction := loopStatus.Plan[0]
planAction := plan[0]
logContext := log.With().
Int("plan-len", len(loopStatus.Plan)).
Int("plan-len", len(plan)).
Str("action-id", planAction.ID).
Str("action-type", string(planAction.Type)).
Str("group", planAction.Group.AsRole()).
@ -72,98 +152,83 @@ func (d *Reconciler) ExecutePlan(ctx context.Context, cachedStatus inspectorInte
log := logContext.Logger()
action := d.createAction(log, planAction, cachedStatus)
if planAction.StartTime.IsZero() {
// Not started yet
ready, err := action.Start(ctx)
if err != nil {
log.Debug().Err(err).
Msg("Failed to start action")
return false, errors.WithStack(err)
}
{ // action.Start may have changed status, so reload it.
status, lastVersion := d.context.GetStatus()
// Update status according to result on action.Start.
if ready {
// Remove action from list
status.Plan = status.Plan[1:]
if len(status.Plan) > 0 && status.Plan[0].MemberID == api.MemberIDPreviousAction {
// Fill in MemberID from previous action
status.Plan[0].MemberID = action.MemberID()
}
} else {
// Mark start time
now := metav1.Now()
status.Plan[0].StartTime = &now
}
// Save plan update
if err := d.context.UpdateStatus(ctx, status, lastVersion, true); err != nil {
log.Debug().Err(err).Msg("Failed to update CR status")
return false, errors.WithStack(err)
}
}
log.Debug().Bool("ready", ready).Msg("Action Start completed")
return true, nil
done, abort, recall, err := d.executeAction(ctx, log, planAction, action)
if err != nil {
return plan, false, errors.WithStack(err)
}
if abort {
return plan, true, nil
}
if done {
if len(plan) > 1 {
plan = plan[1:]
if plan[0].MemberID == api.MemberIDPreviousAction {
plan[0].MemberID = action.MemberID()
}
} else {
plan = nil
}
} else {
// First action of plan has been started, check its progress
ready, abort, err := action.CheckProgress(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to check action progress")
return false, errors.WithStack(err)
if plan[0].StartTime.IsZero() {
now := metav1.Now()
plan[0].StartTime = &now
}
if ready {
{ // action.CheckProgress may have changed status, so reload it.
status, lastVersion := d.context.GetStatus()
// Remove action from list
status.Plan = status.Plan[1:]
if len(status.Plan) > 0 && status.Plan[0].MemberID == api.MemberIDPreviousAction {
// Fill in MemberID from previous action
status.Plan[0].MemberID = action.MemberID()
}
// Save plan update
if err := d.context.UpdateStatus(ctx, status, lastVersion); err != nil {
log.Debug().Err(err).Msg("Failed to update CR status")
return false, errors.WithStack(err)
}
}
}
log.Debug().
Bool("abort", abort).
Bool("ready", ready).
Msg("Action CheckProgress completed")
if !ready {
deadlineExpired := false
if abort {
log.Warn().Msg("Action aborted. Removing the entire plan")
d.context.CreateEvent(k8sutil.NewPlanAbortedEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole()))
} else {
// Not ready yet & no abort, check timeout
deadline := planAction.CreationTime.Add(action.Timeout(d.context.GetSpec()))
if time.Now().After(deadline) {
// Timeout has expired
deadlineExpired = true
log.Warn().Msg("Action not finished in time. Removing the entire plan")
d.context.CreateEvent(k8sutil.NewPlanTimeoutEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole()))
}
}
if abort || deadlineExpired {
// Replace plan with empty one and save it.
status, lastVersion := d.context.GetStatus()
status.Plan = api.Plan{}
if err := d.context.UpdateStatus(ctx, status, lastVersion); err != nil {
log.Debug().Err(err).Msg("Failed to update CR status")
return false, errors.WithStack(err)
}
return true, nil
}
// Timeout not yet expired, come back soon
return true, nil
}
return true, nil
return plan, recall, nil
}
}
}
// executeAction performs a single step of the given action.
//
// An action without a start time is started, an already started action has
// its progress checked. The results are:
//   - done: the action has finished
//   - abort: the action aborted or did not finish within its timeout
//   - callAgain: the action is still in progress and has to be revisited
//   - err: starting the action or checking its progress failed
func (d *Reconciler) executeAction(ctx context.Context, log zerolog.Logger, planAction api.Action, action Action) (done, abort, callAgain bool, err error) {
	if planAction.StartTime.IsZero() {
		// Not started yet
		started, startErr := action.Start(ctx)
		switch {
		case startErr != nil:
			log.Debug().Err(startErr).Msg("Failed to start action")
			return false, false, false, errors.WithStack(startErr)
		case started:
			log.Debug().Bool("ready", started).Msg("Action Start completed")
			return true, false, false, nil
		default:
			return false, false, true, nil
		}
	}

	// Action has been started before, check its progress
	finished, aborted, progressErr := action.CheckProgress(ctx)
	if progressErr != nil {
		log.Debug().Err(progressErr).Msg("Failed to check action progress")
		return false, false, false, errors.WithStack(progressErr)
	}

	log.Debug().Bool("abort", aborted).Bool("ready", finished).Msg("Action CheckProgress completed")

	// Cases are evaluated in order, so the timeout is only consulted for an
	// action which is neither finished nor aborted.
	switch {
	case finished:
		return true, false, false, nil
	case aborted:
		log.Warn().Msg("Action aborted. Removing the entire plan")
		d.context.CreateEvent(k8sutil.NewPlanAbortedEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole()))
		return false, true, false, nil
	case time.Now().After(planAction.CreationTime.Add(action.Timeout(d.context.GetSpec()))):
		log.Warn().Msg("Action not finished in time. Removing the entire plan")
		d.context.CreateEvent(k8sutil.NewPlanTimeoutEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole()))
		return false, true, false, nil
	}

	// Timeout not yet expired, come back soon
	return false, false, true, nil
}
// createAction create action object based on action type
func (d *Reconciler) createAction(log zerolog.Logger, action api.Action, cachedStatus inspectorInterface.Inspector) Action {
actionCtx := newActionContext(log.With().Str("id", action.ID).Str("type", action.Type.String()).Logger(), d.context, cachedStatus)