1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Add Pending Phase (#776)

This commit is contained in:
Adam Janikowski 2021-08-20 15:02:36 +02:00 committed by GitHub
parent ab9873d173
commit 2eb4ccc1b6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 762 additions and 356 deletions

View file

@ -3,6 +3,7 @@
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
- Update 'github.com/arangodb/arangosync-client' dependency to v0.7.0
- Add HighPriorityPlan to ArangoDeployment Status
- Add Pending Member phase
## [1.2.1](https://github.com/arangodb/kube-arangodb/tree/1.2.1) (2021-07-28)
- Fix ArangoMember race with multiple ArangoDeployments within single namespace

View file

@ -28,6 +28,8 @@ type MemberPhase string
const (
// MemberPhaseNone indicates that the state is not set yet
MemberPhaseNone MemberPhase = ""
// MemberPhasePending indicates that member propagation has been started
MemberPhasePending MemberPhase = "Pending"
// MemberPhaseCreated indicates that all resources needed for the member have been created
MemberPhaseCreated MemberPhase = "Created"
// MemberPhaseFailed indicates that the member is gone beyond hope of recovery. It must be replaced with a new member.
@ -48,6 +50,11 @@ const (
MemberPhaseUpgrading MemberPhase = "Upgrading"
)
// IsPending returns true when given phase == "" OR "Pending"
func (p MemberPhase) IsPending() bool {
return p == MemberPhaseNone || p == MemberPhasePending
}
// IsFailed returns true when given phase == "Failed"
func (p MemberPhase) IsFailed() bool {
return p == MemberPhaseFailed
@ -62,3 +69,13 @@ func (p MemberPhase) IsCreatedOrDrain() bool {
func (p MemberPhase) String() string {
return string(p)
}
// GetPhase parses string into phase
func GetPhase(phase string) (MemberPhase, bool) {
switch p := MemberPhase(phase); p {
case MemberPhaseNone, MemberPhasePending, MemberPhaseCreated, MemberPhaseFailed, MemberPhaseCleanOut, MemberPhaseDrain, MemberPhaseResign, MemberPhaseShuttingDown, MemberPhaseRotating, MemberPhaseRotateStart, MemberPhaseUpgrading:
return p, true
default:
return "", false
}
}

View file

@ -41,6 +41,9 @@ type MemberStatus struct {
// ID holds the unique ID of the member.
// This id is also used within the ArangoDB cluster to identify this server.
ID string `json:"id"`
// RID holds the ID of the member run.
// Value is updated in Pending Phase.
RID types.UID `json:"rid,omitempty"`
// Phase holds the current lifetime phase of this member
Phase MemberPhase `json:"phase"`
// CreatedAt holds the creation timestamp of this member.
@ -82,6 +85,7 @@ type MemberStatus struct {
// Equal checks for equality
func (s MemberStatus) Equal(other MemberStatus) bool {
return s.ID == other.ID &&
s.RID == other.RID &&
s.Phase == other.Phase &&
util.TimeCompareEqual(s.CreatedAt, other.CreatedAt) &&
s.PersistentVolumeClaimName == other.PersistentVolumeClaimName &&

View file

@ -151,7 +151,7 @@ func (l MemberStatusList) SelectMemberToRemove() (MemberStatus, error) {
}
// Try to find a not ready member
for _, m := range l {
if m.Phase == MemberPhaseNone {
if m.Phase.IsPending() {
return m, nil
}
}

View file

@ -49,6 +49,8 @@ func (a ActionType) String() string {
// Priority returns plan priority
func (a ActionType) Priority() ActionPriority {
switch a {
case ActionTypeMemberPhaseUpdate, ActionTypeMemberRIDUpdate:
return ActionPriorityHigh
default:
return ActionPriorityNormal
}
@ -153,6 +155,10 @@ const (
ActionTypeBootstrapUpdate ActionType = "BootstrapUpdate"
// ActionTypeBootstrapSetPassword set password to the bootstrapped user
ActionTypeBootstrapSetPassword ActionType = "BootstrapSetPassword"
// ActionTypeMemberPhaseUpdate updated member phase. High priority
ActionTypeMemberPhaseUpdate ActionType = "MemberPhaseUpdate"
// ActionTypeMemberRIDUpdate updated member Run ID (UID). High priority
ActionTypeMemberRIDUpdate ActionType = "MemberRIDUpdate"
)
const (

View file

@ -28,6 +28,8 @@ type MemberPhase string
const (
// MemberPhaseNone indicates that the state is not set yet
MemberPhaseNone MemberPhase = ""
// MemberPhasePending indicates that member propagation has been started
MemberPhasePending MemberPhase = "Pending"
// MemberPhaseCreated indicates that all resources needed for the member have been created
MemberPhaseCreated MemberPhase = "Created"
// MemberPhaseFailed indicates that the member is gone beyond hope of recovery. It must be replaced with a new member.
@ -48,6 +50,11 @@ const (
MemberPhaseUpgrading MemberPhase = "Upgrading"
)
// IsPending returns true when given phase == "" OR "Pending"
func (p MemberPhase) IsPending() bool {
return p == MemberPhaseNone || p == MemberPhasePending
}
// IsFailed returns true when given phase == "Failed"
func (p MemberPhase) IsFailed() bool {
return p == MemberPhaseFailed
@ -62,3 +69,13 @@ func (p MemberPhase) IsCreatedOrDrain() bool {
func (p MemberPhase) String() string {
return string(p)
}
// GetPhase parses string into phase
func GetPhase(phase string) (MemberPhase, bool) {
switch p := MemberPhase(phase); p {
case MemberPhaseNone, MemberPhasePending, MemberPhaseCreated, MemberPhaseFailed, MemberPhaseCleanOut, MemberPhaseDrain, MemberPhaseResign, MemberPhaseShuttingDown, MemberPhaseRotating, MemberPhaseRotateStart, MemberPhaseUpgrading:
return p, true
default:
return "", false
}
}

View file

@ -41,6 +41,9 @@ type MemberStatus struct {
// ID holds the unique ID of the member.
// This id is also used within the ArangoDB cluster to identify this server.
ID string `json:"id"`
// RID holds the ID of the member run.
// Value is updated in Pending Phase.
RID types.UID `json:"rid,omitempty"`
// Phase holds the current lifetime phase of this member
Phase MemberPhase `json:"phase"`
// CreatedAt holds the creation timestamp of this member.
@ -82,6 +85,7 @@ type MemberStatus struct {
// Equal checks for equality
func (s MemberStatus) Equal(other MemberStatus) bool {
return s.ID == other.ID &&
s.RID == other.RID &&
s.Phase == other.Phase &&
util.TimeCompareEqual(s.CreatedAt, other.CreatedAt) &&
s.PersistentVolumeClaimName == other.PersistentVolumeClaimName &&

View file

@ -151,7 +151,7 @@ func (l MemberStatusList) SelectMemberToRemove() (MemberStatus, error) {
}
// Try to find a not ready member
for _, m := range l {
if m.Phase == MemberPhaseNone {
if m.Phase.IsPending() {
return m, nil
}
}

View file

@ -49,6 +49,8 @@ func (a ActionType) String() string {
// Priority returns plan priority
func (a ActionType) Priority() ActionPriority {
switch a {
case ActionTypeMemberPhaseUpdate, ActionTypeMemberRIDUpdate:
return ActionPriorityHigh
default:
return ActionPriorityNormal
}
@ -153,6 +155,10 @@ const (
ActionTypeBootstrapUpdate ActionType = "BootstrapUpdate"
// ActionTypeBootstrapSetPassword set password to the bootstrapped user
ActionTypeBootstrapSetPassword ActionType = "BootstrapSetPassword"
// ActionTypeMemberPhaseUpdate updated member phase. High priority
ActionTypeMemberPhaseUpdate ActionType = "MemberPhaseUpdate"
// ActionTypeMemberRIDUpdate updated member Run ID (UID). High priority
ActionTypeMemberRIDUpdate ActionType = "MemberRIDUpdate"
)
const (

View file

@ -96,6 +96,18 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
if testCase.Resources != nil {
testCase.Resources(t, d)
}
// Set Pending phase
require.NoError(t, d.status.last.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
if m.Phase == api.MemberPhaseNone {
m.Phase = api.MemberPhasePending
if err := d.status.last.Members.Update(m, group); err != nil {
return err
}
}
}
return nil
}))
// Set members
require.NoError(t, d.status.last.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {

View file

@ -0,0 +1,85 @@
//
// DISCLAIMER
//
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/rs/zerolog"
)
func init() {
registerAction(api.ActionTypeMemberPhaseUpdate, newMemberPhaseUpdate)
}
const (
ActionTypeMemberPhaseUpdatePhaseKey string = "phase"
)
func newMemberPhaseUpdate(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
a := &memberPhaseUpdateAction{}
a.actionImpl = newActionImplDefRef(log, action, actionCtx, defaultTimeout)
return a
}
type memberPhaseUpdateAction struct {
actionImpl
actionEmptyCheckProgress
}
func (a *memberPhaseUpdateAction) Start(ctx context.Context) (bool, error) {
log := a.log
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
log.Error().Msg("No such member")
return true, nil
}
phaseString, ok := a.action.Params[ActionTypeMemberPhaseUpdatePhaseKey]
if !ok {
log.Error().Msg("Phase not defined")
return true, nil
}
phase, ok := api.GetPhase(phaseString)
if !ok {
log.Error().Msgf("Phase %s unknown", phase)
return true, nil
}
if m.Phase == phase {
return true, nil
}
m.Phase = phase
if err := a.actionCtx.UpdateMember(ctx, m); err != nil {
return false, errors.WithStack(err)
}
return true, nil
}

View file

@ -0,0 +1,68 @@
//
// DISCLAIMER
//
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
"k8s.io/apimachinery/pkg/util/uuid"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/rs/zerolog"
)
func init() {
registerAction(api.ActionTypeMemberRIDUpdate, newMemberRIDUpdate)
}
func newMemberRIDUpdate(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
a := &memberRIDUpdateAction{}
a.actionImpl = newActionImplDefRef(log, action, actionCtx, defaultTimeout)
return a
}
type memberRIDUpdateAction struct {
actionImpl
actionEmptyCheckProgress
}
func (a *memberRIDUpdateAction) Start(ctx context.Context) (bool, error) {
log := a.log
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
log.Error().Msg("No such member")
return true, nil
}
m.RID = uuid.NewUUID()
if err := a.actionCtx.UpdateMember(ctx, m); err != nil {
return false, errors.WithStack(err)
}
return true, nil
}

View file

@ -17,366 +17,34 @@
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
// Author Adam Janikowski
//
package reconcile
import (
goContext "context"
"time"
"context"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"golang.org/x/net/context"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
driver "github.com/arangodb/go-driver"
upgraderules "github.com/arangodb/go-upgrade-rules"
"github.com/rs/zerolog"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// upgradeDecision is the result of an upgrade check.
type upgradeDecision struct {
FromVersion driver.Version
FromLicense upgraderules.License
ToVersion driver.Version
ToLicense upgraderules.License
UpgradeNeeded bool // If set, the image version has changed
UpgradeAllowed bool // If set, it is an allowed version change
AutoUpgradeNeeded bool // If set, the database must be started with `--database.auto-upgrade` once
Hold bool
}
// CreatePlan considers the current specification & status of the deployment creates a plan to
// get the status in line with the specification.
// If a plan already exists, nothing is done.
func (d *Reconciler) CreatePlan(ctx context.Context, cachedStatus inspectorInterface.Inspector) (error, bool) {
// Create plan
apiObject := d.context.GetAPIObject()
spec := d.context.GetSpec()
status, lastVersion := d.context.GetStatus()
builderCtx := newPlanBuilderContext(d.context)
newPlan, changed := createPlan(ctx, d.log, apiObject, status.Plan, spec, status, cachedStatus, builderCtx)
var updated bool
// If not change, we're done
if !changed {
return nil, false
if err, u := d.CreateHighPlan(ctx, cachedStatus); err != nil {
return err, false
} else if u {
updated = true
}
// Save plan
if len(newPlan) == 0 {
// Nothing to do
return nil, false
if err, u := d.CreateNormalPlan(ctx, cachedStatus); err != nil {
return err, false
} else if u {
updated = true
}
// Send events
for id := len(status.Plan); id < len(newPlan); id++ {
action := newPlan[id]
d.context.CreateEvent(k8sutil.NewPlanAppendEvent(apiObject, action.Type.String(), action.Group.AsRole(), action.MemberID, action.Reason))
}
status.Plan = newPlan
if err := d.context.UpdateStatus(ctx, status, lastVersion); err != nil {
return errors.WithStack(err), false
}
return nil, true
}
func fetchAgency(ctx context.Context, spec api.DeploymentSpec, status api.DeploymentStatus,
context PlanBuilderContext) (*agency.ArangoPlanDatabases, error) {
if spec.GetMode() != api.DeploymentModeCluster && spec.GetMode() != api.DeploymentModeActiveFailover {
return nil, nil
} else if status.Members.Agents.MembersReady() > 0 {
agencyCtx, agencyCancel := goContext.WithTimeout(ctx, time.Minute)
defer agencyCancel()
return agency.GetAgencyCollections(agencyCtx, context.GetAgencyData)
} else {
return nil, errors.Newf("not able to read from agency when agency is down")
}
}
// createPlan considers the given specification & status and creates a plan to get the status in line with the specification.
// If a plan already exists, the given plan is returned with false.
// Otherwise the new plan is returned with a boolean true.
func createPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject,
currentPlan api.Plan, spec api.DeploymentSpec,
status api.DeploymentStatus, cachedStatus inspectorInterface.Inspector,
builderCtx PlanBuilderContext) (api.Plan, bool) {
if !currentPlan.IsEmpty() {
// Plan already exists, complete that first
return currentPlan, false
}
// Fetch agency plan
agencyPlan, agencyErr := fetchAgency(ctx, spec, status, builderCtx)
// Check for various scenario's
var plan api.Plan
pb := NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx)
// Check for members in failed state
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if m.Phase != api.MemberPhaseFailed || len(plan) > 0 {
continue
}
memberLog := log.Info().Str("id", m.ID).Str("role", group.AsRole())
if group == api.ServerGroupDBServers && spec.GetMode() == api.DeploymentModeCluster {
// Do pre check for DBServers. If agency is down DBServers should not be touch
if agencyErr != nil {
memberLog.Msg("Error in agency")
continue
}
if agencyPlan == nil {
memberLog.Msg("AgencyPlan is nil")
continue
}
if agencyPlan.IsDBServerInDatabases(m.ID) {
// DBServer still exists in agency plan! Will not be removed, but needs to be recreated
memberLog.Msg("Recreating DBServer - it cannot be removed gracefully")
plan = append(plan,
api.NewAction(api.ActionTypeRecreateMember, group, m.ID))
continue
}
// Everything is fine, proceed
}
switch group {
case api.ServerGroupAgents:
// For agents just recreate member do not rotate ID, do not remove PVC or service
memberLog.Msg("Restoring old member. For agency members recreation of PVC is not supported - to prevent DataLoss")
plan = append(plan,
api.NewAction(api.ActionTypeRecreateMember, group, m.ID))
case api.ServerGroupSingle:
// Do not remove data for singles
memberLog.Msg("Restoring old member. Rotation for single servers is not safe")
plan = append(plan,
api.NewAction(api.ActionTypeRecreateMember, group, m.ID))
default:
if spec.GetAllowMemberRecreation(group) {
memberLog.Msg("Creating member replacement plan because member has failed")
plan = append(plan,
api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
api.NewAction(api.ActionTypeAddMember, group, ""),
api.NewAction(api.ActionTypeWaitForMemberUp, group, api.MemberIDPreviousAction),
)
} else {
memberLog.Msg("Restoring old member. Recreation is disabled for group")
plan = append(plan,
api.NewAction(api.ActionTypeRecreateMember, group, m.ID))
}
}
}
return nil
})
// Ensure that we were able to get agency info
if len(plan) == 0 && agencyErr != nil {
log.Err(agencyErr).Msg("unable to build further plan without access to agency")
return append(plan,
api.NewAction(api.ActionTypeIdle, api.ServerGroupUnknown, "")), true
}
// Check for cleaned out dbserver in created state
for _, m := range status.Members.DBServers {
if plan.IsEmpty() && m.Phase.IsCreatedOrDrain() && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
log.Debug().
Str("id", m.ID).
Str("role", api.ServerGroupDBServers.AsRole()).
Msg("Creating dbserver replacement plan because server is cleanout in created phase")
plan = append(plan,
api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, m.ID),
api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""),
)
}
}
// Update status
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKeyStatusUpdate)
}
if plan.IsEmpty() {
plan = pb.Apply(createTLSStatusUpdate)
}
if plan.IsEmpty() {
plan = pb.Apply(createJWTStatusUpdate)
}
// Check for scale up/down
if plan.IsEmpty() {
plan = pb.Apply(createScaleMemberPlan)
}
// Check for members to be removed
if plan.IsEmpty() {
plan = pb.Apply(createReplaceMemberPlan)
}
// Check for the need to rotate one or more members
if plan.IsEmpty() {
plan = pb.Apply(createRotateOrUpgradePlan)
}
// Disable maintenance if upgrade process was done. Upgrade task throw IDLE Action if upgrade is pending
if plan.IsEmpty() {
plan = pb.Apply(createMaintenanceManagementPlan)
}
// Add keys
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKey)
}
if plan.IsEmpty() {
plan = pb.Apply(createJWTKeyUpdate)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createCARenewalPlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createCAAppendPlan)
}
if plan.IsEmpty() {
plan = pb.Apply(createKeyfileRenewalPlan)
}
// Check for changes storage classes or requirements
if plan.IsEmpty() {
plan = pb.Apply(createRotateServerStoragePlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createRotateTLSServerSNIPlan)
}
if plan.IsEmpty() {
plan = pb.Apply(createRestorePlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKeyCleanPlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createCACleanPlan)
}
if plan.IsEmpty() {
plan = pb.Apply(createClusterOperationPlan)
}
// Final
if plan.IsEmpty() {
plan = pb.Apply(createTLSStatusPropagated)
}
if plan.IsEmpty() {
plan = pb.Apply(createBootstrapPlan)
}
// Return plan
return plan, true
}
// createRotateMemberPlan creates a plan to rotate (stop-recreate-start) an existing
// member.
func createRotateMemberPlan(log zerolog.Logger, member api.MemberStatus,
group api.ServerGroup, reason string) api.Plan {
log.Debug().
Str("id", member.ID).
Str("role", group.AsRole()).
Str("reason", reason).
Msg("Creating rotation plan")
plan := api.Plan{
api.NewAction(api.ActionTypeCleanTLSKeyfileCertificate, group, member.ID, "Remove server keyfile and enforce renewal/recreation"),
api.NewAction(api.ActionTypeResignLeadership, group, member.ID, reason),
api.NewAction(api.ActionTypeRotateMember, group, member.ID, reason),
api.NewAction(api.ActionTypeWaitForMemberUp, group, member.ID),
api.NewAction(api.ActionTypeWaitForMemberInSync, group, member.ID),
}
return plan
}
type planBuilder func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan
type planBuilderCondition func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) bool
type planBuilderSubPlan func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext, w WithPlanBuilder, plans ...planBuilder) api.Plan
func NewWithPlanBuilder(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) WithPlanBuilder {
return &withPlanBuilder{
ctx: ctx,
log: log,
apiObject: apiObject,
spec: spec,
status: status,
cachedStatus: cachedStatus,
context: context,
}
}
type WithPlanBuilder interface {
Apply(p planBuilder) api.Plan
ApplyWithCondition(c planBuilderCondition, p planBuilder) api.Plan
ApplySubPlan(p planBuilderSubPlan, plans ...planBuilder) api.Plan
}
type withPlanBuilder struct {
ctx context.Context
log zerolog.Logger
apiObject k8sutil.APIObject
spec api.DeploymentSpec
status api.DeploymentStatus
cachedStatus inspectorInterface.Inspector
context PlanBuilderContext
}
func (w withPlanBuilder) ApplyWithCondition(c planBuilderCondition, p planBuilder) api.Plan {
if !c(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context) {
return api.Plan{}
}
return w.Apply(p)
}
func (w withPlanBuilder) ApplySubPlan(p planBuilderSubPlan, plans ...planBuilder) api.Plan {
return p(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context, w, plans...)
}
func (w withPlanBuilder) Apply(p planBuilder) api.Plan {
return p(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context)
return nil, updated
}

View file

@ -0,0 +1,114 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
)
func (d *Reconciler) CreateHighPlan(ctx context.Context, cachedStatus inspectorInterface.Inspector) (error, bool) {
// Create plan
apiObject := d.context.GetAPIObject()
spec := d.context.GetSpec()
status, lastVersion := d.context.GetStatus()
builderCtx := newPlanBuilderContext(d.context)
newPlan, changed := createHighPlan(ctx, d.log, apiObject, status.HighPriorityPlan, spec, status, cachedStatus, builderCtx)
// If not change, we're done
if !changed {
return nil, false
}
// Save plan
if len(newPlan) == 0 {
// Nothing to do
return nil, false
}
// Send events
for id := len(status.Plan); id < len(newPlan); id++ {
action := newPlan[id]
d.context.CreateEvent(k8sutil.NewPlanAppendEvent(apiObject, action.Type.String(), action.Group.AsRole(), action.MemberID, action.Reason))
}
status.HighPriorityPlan = newPlan
if err := d.context.UpdateStatus(ctx, status, lastVersion); err != nil {
return errors.WithStack(err), false
}
return nil, true
}
// createHighPlan considers the given specification & status and creates a plan to get the status in line with the specification.
// If a plan already exists, the given plan is returned with false.
// Otherwise the new plan is returned with a boolean true.
func createHighPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject,
currentPlan api.Plan, spec api.DeploymentSpec,
status api.DeploymentStatus, cachedStatus inspectorInterface.Inspector,
builderCtx PlanBuilderContext) (api.Plan, bool) {
if !currentPlan.IsEmpty() {
// Plan already exists, complete that first
return currentPlan, false
}
// Check for various scenario's
var plan api.Plan
pb := NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx)
if plan.IsEmpty() {
plan = pb.Apply(updateMemberPhasePlan)
}
// Return plan
return plan, true
}
// updateMemberPhasePlan creates plan to update member phase
func updateMemberPhasePlan(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan {
var plan api.Plan
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
if m.Phase == api.MemberPhaseNone {
plan = append(plan,
api.NewAction(api.ActionTypeMemberRIDUpdate, group, m.ID, "Regenerate member RID"),
api.NewAction(api.ActionTypeMemberPhaseUpdate, group, m.ID,
"Move to Pending phase").AddParam(ActionTypeMemberPhaseUpdatePhaseKey, api.MemberPhasePending.String()))
}
}
return nil
})
return plan
}

View file

@ -0,0 +1,261 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
)
func (d *Reconciler) CreateNormalPlan(ctx context.Context, cachedStatus inspectorInterface.Inspector) (error, bool) {
// Create plan
apiObject := d.context.GetAPIObject()
spec := d.context.GetSpec()
status, lastVersion := d.context.GetStatus()
builderCtx := newPlanBuilderContext(d.context)
newPlan, changed := createNormalPlan(ctx, d.log, apiObject, status.Plan, spec, status, cachedStatus, builderCtx)
// If not change, we're done
if !changed {
return nil, false
}
// Save plan
if len(newPlan) == 0 {
// Nothing to do
return nil, false
}
// Send events
for id := len(status.Plan); id < len(newPlan); id++ {
action := newPlan[id]
d.context.CreateEvent(k8sutil.NewPlanAppendEvent(apiObject, action.Type.String(), action.Group.AsRole(), action.MemberID, action.Reason))
}
status.Plan = newPlan
if err := d.context.UpdateStatus(ctx, status, lastVersion); err != nil {
return errors.WithStack(err), false
}
return nil, true
}
// createNormalPlan considers the given specification & status and creates a plan to get the status in line with the specification.
// If a plan already exists, the given plan is returned with false.
// Otherwise the new plan is returned with a boolean true.
func createNormalPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject,
currentPlan api.Plan, spec api.DeploymentSpec,
status api.DeploymentStatus, cachedStatus inspectorInterface.Inspector,
builderCtx PlanBuilderContext) (api.Plan, bool) {
if !currentPlan.IsEmpty() {
// Plan already exists, complete that first
return currentPlan, false
}
// Fetch agency plan
agencyPlan, agencyErr := fetchAgency(ctx, spec, status, builderCtx)
// Check for various scenario's
var plan api.Plan
pb := NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx)
// Check for members in failed state
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if m.Phase != api.MemberPhaseFailed || len(plan) > 0 {
continue
}
memberLog := log.Info().Str("id", m.ID).Str("role", group.AsRole())
if group == api.ServerGroupDBServers && spec.GetMode() == api.DeploymentModeCluster {
// Do pre check for DBServers. If agency is down DBServers should not be touch
if agencyErr != nil {
memberLog.Msg("Error in agency")
continue
}
if agencyPlan == nil {
memberLog.Msg("AgencyPlan is nil")
continue
}
if agencyPlan.IsDBServerInDatabases(m.ID) {
// DBServer still exists in agency plan! Will not be removed, but needs to be recreated
memberLog.Msg("Recreating DBServer - it cannot be removed gracefully")
plan = append(plan,
api.NewAction(api.ActionTypeRecreateMember, group, m.ID))
continue
}
// Everything is fine, proceed
}
switch group {
case api.ServerGroupAgents:
// For agents just recreate member do not rotate ID, do not remove PVC or service
memberLog.Msg("Restoring old member. For agency members recreation of PVC is not supported - to prevent DataLoss")
plan = append(plan,
api.NewAction(api.ActionTypeRecreateMember, group, m.ID))
case api.ServerGroupSingle:
// Do not remove data for singles
memberLog.Msg("Restoring old member. Rotation for single servers is not safe")
plan = append(plan,
api.NewAction(api.ActionTypeRecreateMember, group, m.ID))
default:
if spec.GetAllowMemberRecreation(group) {
memberLog.Msg("Creating member replacement plan because member has failed")
plan = append(plan,
api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
api.NewAction(api.ActionTypeAddMember, group, ""),
api.NewAction(api.ActionTypeWaitForMemberUp, group, api.MemberIDPreviousAction),
)
} else {
memberLog.Msg("Restoring old member. Recreation is disabled for group")
plan = append(plan,
api.NewAction(api.ActionTypeRecreateMember, group, m.ID))
}
}
}
return nil
})
// Ensure that we were able to get agency info
if len(plan) == 0 && agencyErr != nil {
log.Err(agencyErr).Msg("unable to build further plan without access to agency")
return append(plan,
api.NewAction(api.ActionTypeIdle, api.ServerGroupUnknown, "")), true
}
// Check for cleaned out dbserver in created state
for _, m := range status.Members.DBServers {
if plan.IsEmpty() && m.Phase.IsCreatedOrDrain() && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
log.Debug().
Str("id", m.ID).
Str("role", api.ServerGroupDBServers.AsRole()).
Msg("Creating dbserver replacement plan because server is cleanout in created phase")
plan = append(plan,
api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, m.ID),
api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""),
)
}
}
// Update status
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKeyStatusUpdate)
}
if plan.IsEmpty() {
plan = pb.Apply(createTLSStatusUpdate)
}
if plan.IsEmpty() {
plan = pb.Apply(createJWTStatusUpdate)
}
// Check for scale up/down
if plan.IsEmpty() {
plan = pb.Apply(createScaleMemberPlan)
}
// Check for members to be removed
if plan.IsEmpty() {
plan = pb.Apply(createReplaceMemberPlan)
}
// Check for the need to rotate one or more members
if plan.IsEmpty() {
plan = pb.Apply(createRotateOrUpgradePlan)
}
// Disable maintenance if upgrade process was done. Upgrade task throw IDLE Action if upgrade is pending
if plan.IsEmpty() {
plan = pb.Apply(createMaintenanceManagementPlan)
}
// Add keys
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKey)
}
if plan.IsEmpty() {
plan = pb.Apply(createJWTKeyUpdate)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createCARenewalPlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createCAAppendPlan)
}
if plan.IsEmpty() {
plan = pb.Apply(createKeyfileRenewalPlan)
}
// Check for changes storage classes or requirements
if plan.IsEmpty() {
plan = pb.Apply(createRotateServerStoragePlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createRotateTLSServerSNIPlan)
}
if plan.IsEmpty() {
plan = pb.Apply(createRestorePlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKeyCleanPlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createCACleanPlan)
}
if plan.IsEmpty() {
plan = pb.Apply(createClusterOperationPlan)
}
// Final
if plan.IsEmpty() {
plan = pb.Apply(createTLSStatusPropagated)
}
if plan.IsEmpty() {
plan = pb.Apply(createBootstrapPlan)
}
// Return plan
return plan, true
}

View file

@ -56,6 +56,19 @@ var (
}
)
// upgradeDecision is the result of an upgrade check.
type upgradeDecision struct {
FromVersion driver.Version
FromLicense upgraderules.License
ToVersion driver.Version
ToLicense upgraderules.License
UpgradeNeeded bool // If set, the image version has changed
UpgradeAllowed bool // If set, it is an allowed version change
AutoUpgradeNeeded bool // If set, the database must be started with `--database.auto-upgrade` once
Hold bool
}
// createRotateOrUpgradePlan goes over all pods to check if an upgrade or rotate is needed.
func createRotateOrUpgradePlan(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,

View file

@ -310,7 +310,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
status.Hashes.TLS.Propagated = true
status.Hashes.Encryption.Propagated = true
newPlan, changed := createPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, changed := createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
@ -321,7 +321,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
PodName: "something",
},
}
newPlan, changed = createPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
@ -336,7 +336,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
PodName: "something1",
},
}
newPlan, changed = createPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
}
@ -365,7 +365,7 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
var status api.DeploymentStatus
addAgentsToStatus(t, &status, 3)
newPlan, changed := createPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, changed := createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 2)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -378,7 +378,7 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
PodName: "something",
},
}
newPlan, changed = createPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 1)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -403,7 +403,7 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
PodName: "something4",
},
}
newPlan, changed = createPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 2) // Note: Downscaling is only down 1 at a time
assert.Equal(t, api.ActionTypeShutdownMember, newPlan[0].Type)
@ -435,7 +435,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
var status api.DeploymentStatus
addAgentsToStatus(t, &status, 3)
newPlan, changed := createPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, changed := createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 6) // Adding 3 dbservers & 3 coordinators (note: agents do not scale now)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -468,7 +468,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
PodName: "coordinator1",
},
}
newPlan, changed = createPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 3)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -505,7 +505,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
}
spec.DBServers.Count = util.NewInt(1)
spec.Coordinators.Count = util.NewInt(1)
newPlan, changed = createPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 5) // Note: Downscaling is done 1 at a time
assert.Equal(t, api.ActionTypeCleanOutMember, newPlan[0].Type)

View file

@ -0,0 +1,130 @@
//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
"time"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
)
// createRotateMemberPlan creates a plan to rotate (stop-recreate-start) an existing
// member.
func createRotateMemberPlan(log zerolog.Logger, member api.MemberStatus,
group api.ServerGroup, reason string) api.Plan {
log.Debug().
Str("id", member.ID).
Str("role", group.AsRole()).
Str("reason", reason).
Msg("Creating rotation plan")
plan := api.Plan{
api.NewAction(api.ActionTypeCleanTLSKeyfileCertificate, group, member.ID, "Remove server keyfile and enforce renewal/recreation"),
api.NewAction(api.ActionTypeResignLeadership, group, member.ID, reason),
api.NewAction(api.ActionTypeRotateMember, group, member.ID, reason),
api.NewAction(api.ActionTypeWaitForMemberUp, group, member.ID),
api.NewAction(api.ActionTypeWaitForMemberInSync, group, member.ID),
}
return plan
}
func fetchAgency(ctx context.Context, spec api.DeploymentSpec, status api.DeploymentStatus,
pctx PlanBuilderContext) (*agency.ArangoPlanDatabases, error) {
if spec.GetMode() != api.DeploymentModeCluster && spec.GetMode() != api.DeploymentModeActiveFailover {
return nil, nil
} else if status.Members.Agents.MembersReady() > 0 {
agencyCtx, agencyCancel := context.WithTimeout(ctx, time.Minute)
defer agencyCancel()
return agency.GetAgencyCollections(agencyCtx, pctx.GetAgencyData)
} else {
return nil, errors.Newf("not able to read from agency when agency is down")
}
}
type planBuilder func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan
type planBuilderCondition func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) bool
type planBuilderSubPlan func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext, w WithPlanBuilder, plans ...planBuilder) api.Plan
func NewWithPlanBuilder(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) WithPlanBuilder {
return &withPlanBuilder{
ctx: ctx,
log: log,
apiObject: apiObject,
spec: spec,
status: status,
cachedStatus: cachedStatus,
context: context,
}
}
type WithPlanBuilder interface {
Apply(p planBuilder) api.Plan
ApplyWithCondition(c planBuilderCondition, p planBuilder) api.Plan
ApplySubPlan(p planBuilderSubPlan, plans ...planBuilder) api.Plan
}
type withPlanBuilder struct {
ctx context.Context
log zerolog.Logger
apiObject k8sutil.APIObject
spec api.DeploymentSpec
status api.DeploymentStatus
cachedStatus inspectorInterface.Inspector
context PlanBuilderContext
}
func (w withPlanBuilder) ApplyWithCondition(c planBuilderCondition, p planBuilder) api.Plan {
if !c(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context) {
return api.Plan{}
}
return w.Apply(p)
}
func (w withPlanBuilder) ApplySubPlan(p planBuilderSubPlan, plans ...planBuilder) api.Plan {
return p(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context, w, plans...)
}
func (w withPlanBuilder) Apply(p planBuilder) api.Plan {
return p(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context)
}

View file

@ -707,7 +707,7 @@ func (r *Resources) EnsurePods(ctx context.Context, cachedStatus inspectorInterf
createPodMember := func(group api.ServerGroup, groupSpec api.ServerGroupSpec, status *api.MemberStatusList) error {
for _, m := range *status {
if m.Phase != api.MemberPhaseNone {
if m.Phase != api.MemberPhasePending {
continue
}
if m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {