Mirror of https://github.com/arangodb/kube-arangodb.git (synced 2024-12-14 11:57:37 +00:00)

[Bugfix] Propagate spec before pod restart (#782)

Parent: f77a5d6425
Commit: ff2eb2716f
11 changed files with 108 additions and 83 deletions
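In short, this commit: introduces two optional action capabilities (ActionReloadCachedStatus and ActionPlanAppender) and wires them into the reconciler's executePlan loop; lets the AddMember action contribute its wait steps through ActionPlanAppender instead of mutating the deployment status from inside Start; schedules ArangoMemberUpdatePodSpec ahead of ArangoMemberUpdatePodStatus and of the move to the Pending phase, so the pod spec is propagated before the pod restarts; and simplifies server storage rotation, marking an affected DBServer for removal (ActionTypeMarkToRemoveMember) instead of driving an explicit scale-up/clean-out/shutdown/remove sequence.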
```diff
@@ -53,6 +53,38 @@ type Action interface {
 	MemberID() string
 }
 
+// ActionReloadCachedStatus keeps information about CachedStatus reloading (executed after action has been executed)
+type ActionReloadCachedStatus interface {
+	Action
+
+	// ReloadCachedStatus keeps information about CachedStatus reloading (executed after action has been executed)
+	ReloadCachedStatus() bool
+}
+
+func getActionReloadCachedStatus(a Action) bool {
+	if c, ok := a.(ActionReloadCachedStatus); !ok {
+		return false
+	} else {
+		return c.ReloadCachedStatus()
+	}
+}
+
+// ActionPlanAppender modify plan after action execution
+type ActionPlanAppender interface {
+	Action
+
+	// ActionPlanAppender modify plan after action execution
+	ActionPlanAppender(current api.Plan) api.Plan
+}
+
+func getActionPlanAppender(a Action, plan api.Plan) api.Plan {
+	if c, ok := a.(ActionPlanAppender); !ok {
+		return plan
+	} else {
+		return c.ActionPlanAppender(plan)
+	}
+}
+
 type actionFactory func(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action
 
 var (
```
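The two getAction* helpers probe an Action for an optional capability with Go's comma-ok type assertion and fall back to a safe default when the interface is missing. A minimal, runnable sketch of the same pattern (the types below are illustrative stand-ins, not the operator's real ones):

```go
package main

import "fmt"

// Action is the base interface every plan step implements.
type Action interface {
	Name() string
}

// ActionReloadCachedStatus is an optional capability: only some actions
// request a cache refresh after they finish.
type ActionReloadCachedStatus interface {
	Action
	ReloadCachedStatus() bool
}

type plainAction struct{}

func (plainAction) Name() string { return "plain" }

type reloadingAction struct{ plainAction }

func (reloadingAction) ReloadCachedStatus() bool { return true }

// getActionReloadCachedStatus mirrors the helper above: actions that do not
// implement the optional interface default to "no reload".
func getActionReloadCachedStatus(a Action) bool {
	if c, ok := a.(ActionReloadCachedStatus); ok {
		return c.ReloadCachedStatus()
	}
	return false
}

func main() {
	fmt.Println(getActionReloadCachedStatus(plainAction{}))     // false
	fmt.Println(getActionReloadCachedStatus(reloadingAction{})) // true
}
```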
```diff
@@ -50,6 +50,8 @@ func newAddMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
 	return a
 }
 
+var _ ActionPlanAppender = &actionAddMember{}
+
 // actionAddMember implements an AddMemberAction.
 type actionAddMember struct {
 	// actionImpl implement timeout and member id functions
```
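The `var _ ActionPlanAppender = &actionAddMember{}` line added above is Go's usual compile-time interface check: assigning a throwaway value of the concrete type to a variable of the interface type breaks the build if the method set ever drifts. A generic, self-contained illustration:

```go
package main

import "fmt"

type label struct{ text string }

func (l *label) String() string { return l.text }

// Compile-time guard: the build fails if *label ever stops satisfying
// fmt.Stringer, instead of the behavior silently changing at runtime.
var _ fmt.Stringer = (*label)(nil)

func main() {
	fmt.Println(&label{text: "ok"}) // prints "ok" via String()
}
```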
```diff
@@ -72,19 +74,19 @@ func (a *actionAddMember) Start(ctx context.Context) (bool, error) {
 	}
 	a.newMemberID = newID
 
-	if _, ok := a.action.Params[api.ActionTypeWaitForMemberUp.String()]; ok {
-		a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
-			s.Plan = append(s.Plan, api.NewAction(api.ActionTypeWaitForMemberInSync, a.action.Group, newID, "Wait for member in sync after creation"))
-			return true
-		})
-	}
-
-	if _, ok := a.action.Params[api.ActionTypeWaitForMemberInSync.String()]; ok {
-		a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
-			s.Plan = append(s.Plan, api.NewAction(api.ActionTypeWaitForMemberInSync, a.action.Group, newID, "Wait for member in sync after creation"))
-			return true
-		})
-	}
-
 	return true, nil
 }
+
+// ActionPlanAppender appends wait methods to the plan
+func (a *actionAddMember) ActionPlanAppender(current api.Plan) api.Plan {
+	var app api.Plan
+
+	if _, ok := a.action.Params[api.ActionTypeWaitForMemberUp.String()]; ok {
+		app = append(app, api.NewAction(api.ActionTypeWaitForMemberUp, a.action.Group, a.newMemberID, "Wait for member in sync after creation"))
+	}
+
+	if _, ok := a.action.Params[api.ActionTypeWaitForMemberInSync.String()]; ok {
+		app = append(app, api.NewAction(api.ActionTypeWaitForMemberInSync, a.action.Group, a.newMemberID, "Wait for member in sync after creation"))
+	}
+
+	return append(app, current...)
+}
```
(The extraction showed the second added condition keyed on ActionTypeWaitForMemberUp, duplicating the first; it is rendered here with the evidently intended ActionTypeWaitForMemberInSync key.)
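Note the return statement: `append(app, current...)` places the freshly created wait actions in front of the remaining plan, so they run immediately after AddMember completes. A quick illustration of that slice idiom (hypothetical action names):

```go
package main

import "fmt"

func main() {
	current := []string{"shutdown-old", "remove-old"} // rest of the plan
	app := []string{"wait-up", "wait-in-sync"}        // actions added by the appender

	// append(app, current...) prepends: the new actions execute first.
	plan := append(app, current...)
	fmt.Println(plan) // [wait-up wait-in-sync shutdown-old remove-old]
}
```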
```diff
@@ -49,6 +49,8 @@ func newArangoMemberUpdatePodSpecAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
 	return a
 }
 
+var _ ActionReloadCachedStatus = &actionArangoMemberUpdatePodSpec{}
+
 // actionArangoMemberUpdatePodSpec implements an ArangoMemberUpdatePodSpec.
 type actionArangoMemberUpdatePodSpec struct {
 	// actionImpl implement timeout and member id functions
```
```diff
@@ -150,3 +152,7 @@ func (a *actionArangoMemberUpdatePodSpec) Start(ctx context.Context) (bool, error) {
 
 	return true, nil
 }
+
+func (a *actionArangoMemberUpdatePodSpec) ReloadCachedStatus() bool {
+	return true
+}
```
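Returning true here means that, once ArangoMemberUpdatePodSpec has written the new pod template, the reconciler refreshes its cached cluster state before executing the next action, so follow-up steps (such as ArangoMemberUpdatePodStatus or the actual pod restart) observe the propagated spec rather than a stale cache.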
```diff
@@ -96,20 +96,6 @@ func (a *actionPVCResize) Start(ctx context.Context) (bool, error) {
 				return false, err
 			}
 
 			return false, nil
-		} else if cmp > 0 {
-			if groupSpec.GetVolumeAllowShrink() && group == api.ServerGroupDBServers {
-				if err := a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
-					s.Plan = append(s.Plan, api.NewAction(api.ActionTypeMarkToRemoveMember, group, m.ID))
-					return true
-				}); err != nil {
-					log.Error().Err(err).Msg("Unable to mark instance to be replaced")
-				}
-			} else {
-				log.Error().Str("server-group", group.AsRole()).Str("pvc-storage-size", volumeSize.String()).Str("requested-size", requestedSize.String()).
-					Msg("Volume size should not shrink")
-				a.actionCtx.CreateEvent(k8sutil.NewCannotShrinkVolumeEvent(a.actionCtx.GetAPIObject(), pvc.Name))
-			}
-			return false, nil
 		}
 	}
```
```diff
@@ -62,6 +62,7 @@ func (a *actionUpgradeMember) Start(ctx context.Context) (bool, error) {
 	}
 	// Set AutoUpgrade condition
+	m.Conditions.Update(api.ConditionTypeAutoUpgrade, true, "Upgrading", "AutoUpgrade on first restart")
 
 	if err := a.actionCtx.UpdateMember(ctx, m); err != nil {
 		return false, errors.WithStack(err)
 	}
```
```diff
@@ -134,6 +135,7 @@ func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, bool, error) {
 	} else if !ok {
 		return false, false, nil
 	}
 
 	// Pod is now upgraded, update the member status
+	m.Phase = api.MemberPhaseCreated
 	m.RecentTerminations = nil // Since we're upgrading, we do not care about old terminations.
```
```diff
@@ -126,7 +126,9 @@ func updateMemberPhasePlan(ctx context.Context,
 			api.NewAction(api.ActionTypeMemberRIDUpdate, group, m.ID, "Regenerate member RID"),
 		}
 
-		p = append(p, api.NewAction(api.ActionTypeArangoMemberUpdatePodStatus, group, m.ID, "Propagating status of pod"))
+		p = append(p,
+			api.NewAction(api.ActionTypeArangoMemberUpdatePodSpec, group, m.ID, "Propagating spec of pod"),
+			api.NewAction(api.ActionTypeArangoMemberUpdatePodStatus, group, m.ID, "Propagating status of pod"))
 
 		p = append(p, api.NewAction(api.ActionTypeMemberPhaseUpdate, group, m.ID,
 			"Move to Pending phase").AddParam(ActionTypeMemberPhaseUpdatePhaseKey, api.MemberPhasePending.String()))
```
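This hunk is the heart of the fix named in the commit title: ArangoMemberUpdatePodSpec is now queued ahead of ArangoMemberUpdatePodStatus and of the MemberPhaseUpdate that moves the member to the Pending phase, so the new pod spec is propagated before the pod is restarted.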
```diff
@@ -200,6 +200,11 @@ func createRemoveCleanedDBServersPlan(ctx context.Context,
 	spec api.DeploymentSpec, status api.DeploymentStatus,
 	cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan {
 	for _, m := range status.Members.DBServers {
+		if !m.Phase.IsReady() {
+			// Ensure that we CleanOut members which are Ready only to ensure data will be moved
+			continue
+		}
+
 		if m.Phase.IsCreatedOrDrain() && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
 			log.Debug().
 				Str("id", m.ID).
```
```diff
@@ -45,9 +45,10 @@ func createRotateServerStoragePlan(ctx context.Context,
 		return nil
 	}
 	var plan api.Plan
+	var canContinue = true
 	status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
 		for _, m := range members {
-			if !plan.IsEmpty() {
+			if !plan.IsEmpty() && !canContinue {
 				// Only 1 change at a time
 				continue
 			}
```
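The new canContinue flag relaxes the old "only 1 change at a time" rule: non-disruptive changes (such as in-place PVC resizes) may now be batched into a single plan, while disruptive ones (storage-class replacement, volume shrink, resize by rotation) set canContinue to false in the hunks below and stop further members from being added to the same plan.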
```diff
@@ -73,6 +74,9 @@ func createRotateServerStoragePlan(ctx context.Context,
 			}
 
 			if util.StringOrDefault(pvc.Spec.StorageClassName) != storageClassName && storageClassName != "" {
+				// Do not append more than 1 operation if we replace storageClass
+				canContinue = false
+
 				// Storageclass has changed
 				log.Info().Str("pod-name", m.PodName).
 					Str("pvc-storage-class", util.StringOrDefault(pvc.Spec.StorageClassName)).
```
```diff
@@ -80,14 +84,7 @@ func createRotateServerStoragePlan(ctx context.Context,
 
 				if group == api.ServerGroupDBServers {
 					plan = append(plan,
-						api.NewAction(api.ActionTypeDisableClusterScaling, group, ""),
-						api.NewAction(api.ActionTypeAddMember, group, ""),
-						api.NewAction(api.ActionTypeWaitForMemberUp, group, api.MemberIDPreviousAction),
-						api.NewAction(api.ActionTypeCleanOutMember, group, m.ID),
-						api.NewAction(api.ActionTypeShutdownMember, group, m.ID),
-						api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
-						api.NewAction(api.ActionTypeEnableClusterScaling, group, ""),
-					)
+						api.NewAction(api.ActionTypeMarkToRemoveMember, group, m.ID))
 				} else if group == api.ServerGroupAgents {
 					plan = append(plan,
 						api.NewAction(api.ActionTypeShutdownMember, group, m.ID),
```
```diff
@@ -100,25 +97,30 @@ func createRotateServerStoragePlan(ctx context.Context,
 					context.CreateEvent(k8sutil.NewCannotChangeStorageClassEvent(apiObject, m.ID, group.AsRole(), "Not supported"))
 				}
 			} else {
+				var res core.ResourceList
 				if groupSpec.HasVolumeClaimTemplate() {
-					res := groupSpec.GetVolumeClaimTemplate().Spec.Resources.Requests
-					// For pvc only resources.requests is mutable
-					if comparePVCResourceList(pvc.Spec.Resources.Requests, res) {
-						plan = append(plan, pvcResizePlan(log, group, groupSpec, m.ID)...)
-					}
+					res = groupSpec.GetVolumeClaimTemplate().Spec.Resources.Requests
 				} else {
-					if requestedSize, ok := groupSpec.Resources.Requests[core.ResourceStorage]; ok {
-						if volumeSize, ok := pvc.Spec.Resources.Requests[core.ResourceStorage]; ok {
-							cmp := volumeSize.Cmp(requestedSize)
-							if cmp < 0 {
-								plan = append(plan, pvcResizePlan(log, group, groupSpec, m.ID)...)
-							} else if cmp > 0 {
-								if groupSpec.GetVolumeAllowShrink() && group == api.ServerGroupDBServers {
-									plan = append(plan, api.NewAction(api.ActionTypeMarkToRemoveMember, group, m.ID))
-								} else {
-									log.Error().Str("server-group", group.AsRole()).Str("pvc-storage-size", volumeSize.String()).Str("requested-size", requestedSize.String()).
-										Msg("Volume size should not shrink")
-								}
-							}
-						}
-					}
+					res = groupSpec.Resources.Requests
+				}
+
+				if requestedSize, ok := res[core.ResourceStorage]; ok {
+					if volumeSize, ok := pvc.Spec.Resources.Requests[core.ResourceStorage]; ok {
+						cmp := volumeSize.Cmp(requestedSize)
+						if cmp < 0 {
+							if groupSpec.VolumeResizeMode.Get() == api.PVCResizeModeRotate {
+								// Do not append more than 1 operation if we hard restart member
+								canContinue = false
+							}
+							plan = append(plan, pvcResizePlan(log, group, groupSpec, m.ID)...)
+						} else if cmp > 0 {
+							// Do not append more than 1 operation if we shrink volume
+							canContinue = false
+
+							if groupSpec.GetVolumeAllowShrink() && group == api.ServerGroupDBServers && !m.Conditions.IsTrue(api.ConditionTypeMarkedToRemove) {
+								plan = append(plan, api.NewAction(api.ActionTypeMarkToRemoveMember, group, m.ID))
+							} else {
+								log.Error().Str("server-group", group.AsRole()).Str("pvc-storage-size", volumeSize.String()).Str("requested-size", requestedSize.String()).
+									Msg("Volume size should not shrink")
+							}
+						}
 					}
 				}
```
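The rewritten block first resolves the requested storage (res) from either the volume claim template or the plain group resources, then compares it with the PVC's recorded request via resource.Quantity.Cmp. A standalone sketch of that comparison (the sizes are made up for illustration):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	volumeSize := resource.MustParse("10Gi")    // what the PVC currently requests
	requestedSize := resource.MustParse("20Gi") // what the group spec asks for

	// Cmp returns -1, 0 or 1, exactly like the cmp variable above.
	switch volumeSize.Cmp(requestedSize) {
	case -1:
		fmt.Println("volume smaller than requested: plan a PVC resize")
	case 1:
		fmt.Println("volume larger than requested: refuse to shrink or mark the member for removal")
	default:
		fmt.Println("sizes match: nothing to do")
	}
}
```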
```diff
@@ -153,21 +155,3 @@ func pvcResizePlan(log zerolog.Logger, group api.ServerGroup, groupSpec api.ServerGroupSpec,
 		return nil
 	}
 }
-
-func comparePVCResourceList(wanted, given core.ResourceList) bool {
-	for k, v := range wanted {
-		if gv, ok := given[k]; !ok {
-			return true
-		} else if v.Cmp(gv) != 0 {
-			return true
-		}
-	}
-
-	for k := range given {
-		if _, ok := wanted[k]; !ok {
-			return true
-		}
-	}
-
-	return false
-}
```
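With volume-claim-template handling folded into the single storage-request comparison above, comparePVCResourceList loses its last caller and is deleted outright; the old helper flagged any difference between the two resource lists, whereas the new code reacts only to core.ResourceStorage.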
```diff
@@ -693,13 +693,7 @@ func TestCreatePlan(t *testing.T) {
 				ad.Status.Members.DBServers[0].PersistentVolumeClaimName = pvcName
 			},
 			ExpectedPlan: []api.Action{
-				api.NewAction(api.ActionTypeDisableClusterScaling, api.ServerGroupDBServers, ""),
-				api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""),
-				api.NewAction(api.ActionTypeWaitForMemberUp, api.ServerGroupDBServers, ""),
-				api.NewAction(api.ActionTypeCleanOutMember, api.ServerGroupDBServers, ""),
-				api.NewAction(api.ActionTypeShutdownMember, api.ServerGroupDBServers, ""),
-				api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, ""),
-				api.NewAction(api.ActionTypeEnableClusterScaling, api.ServerGroupDBServers, ""),
+				api.NewAction(api.ActionTypeMarkToRemoveMember, api.ServerGroupDBServers, ""),
 			},
 			ExpectedLog: "Storage class has changed - pod needs replacement",
 		},
```
```diff
@@ -171,6 +171,20 @@ func (d *Reconciler) executePlan(ctx context.Context, cachedStatus inspectorInterface.Inspector,
 		} else {
 			plan = nil
 		}
+
+		if getActionReloadCachedStatus(action) {
+			log.Info().Msgf("Reloading cached status")
+			if err := cachedStatus.Refresh(ctx); err != nil {
+				log.Warn().Err(err).Msgf("Unable to reload cached status")
+				return plan, recall, nil
+			}
+		}
+
+		if newPlan := getActionPlanAppender(action, plan); !newPlan.Equal(plan) {
+			// Our actions have been added to the end of plan
+			log.Info().Msgf("Appending new plan items")
+			return newPlan, true, nil
+		}
 	} else {
 		if plan[0].StartTime.IsZero() {
 			now := metav1.Now()
```
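After a finished action, the loop now runs the two optional hooks in a fixed order: refresh the cache first, then let the action rewrite the remaining plan. A compact, self-contained sketch of that control flow (simplified types, not the operator's real signatures):

```go
package main

import (
	"errors"
	"fmt"
)

type Action interface{ Name() string }

type Reloader interface {
	Action
	ReloadCachedStatus() bool
}

type PlanAppender interface {
	Action
	AppendPlan(current []string) []string
}

// afterAction mirrors the order above: reload the cache, then append plan
// items; either hook may cut the iteration short.
func afterAction(a Action, plan []string, refresh func() error) ([]string, bool) {
	if r, ok := a.(Reloader); ok && r.ReloadCachedStatus() {
		if err := refresh(); err != nil {
			return plan, false // keep the current plan, retry next reconciliation
		}
	}
	if p, ok := a.(PlanAppender); ok {
		// The real code compares plans with Equal; length is enough here.
		if newPlan := p.AppendPlan(plan); len(newPlan) != len(plan) {
			return newPlan, true // plan changed, caller should re-enter the loop
		}
	}
	return plan, false
}

type addMember struct{}

func (addMember) Name() string { return "AddMember" }
func (addMember) AppendPlan(current []string) []string {
	return append([]string{"WaitForMemberUp"}, current...)
}

func main() {
	plan, changed := afterAction(addMember{}, []string{"Shutdown"}, func() error { return errors.New("unused") })
	fmt.Println(plan, changed) // [WaitForMemberUp Shutdown] true
}
```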
```diff
@@ -632,6 +632,7 @@ func (r *Resources) createPodForMember(ctx context.Context, spec api.DeploymentSpec,
 	m.Conditions.Remove(api.ConditionTypePendingTLSRotation)
 	m.Conditions.Remove(api.ConditionTypePendingRestart)
 	m.Conditions.Remove(api.ConditionTypeRestart)
+	m.Conditions.Remove(api.ConditionTypeCleanedOut)
 
 	m.Upgrade = false
 	r.log.Info().Str("pod", m.PodName).Msgf("Updating member")
```
```diff
@@ -750,9 +751,6 @@ func (r *Resources) EnsurePods(ctx context.Context, cachedStatus inspectorInterface.Inspector,
 		if m.Phase != api.MemberPhasePending {
 			continue
 		}
-		if m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
-			continue
-		}
 
 		member, ok := cachedStatus.ArangoMember(m.ArangoMemberName(r.context.GetName(), group))
 		if !ok {
```