[Feature] Deprecate ForeachServerGroup, ForeachServerInGroups and ForServerGroup functions and refactor code accordingly (#1069)

Nikita Vanyasin authored 2022-07-24 21:26:26 +03:00, committed by GitHub
parent f8fb748464
commit 738823e0db
24 changed files with 631 additions and 801 deletions
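
For readers migrating their own call sites, the sketch below shows the pattern this commit moves to. It is a minimal, self-contained Go illustration: the element shape (a Group and a Member field per entry) is inferred from the refactored call sites in the diffs that follow, while the stand-in type definitions and group values are simplified assumptions rather than the repository's actual declarations.

package main

import "fmt"

// Stand-in types, greatly simplified from pkg/apis/deployment/v1; only the
// fields needed for this illustration are included.
type ServerGroup string

type MemberStatus struct {
	ID      string
	PodName string
}

// DeploymentStatusMemberElement mirrors how the refactored call sites below
// consume the new helpers: one (Group, Member) pair per entry.
type DeploymentStatusMemberElement struct {
	Group  ServerGroup
	Member MemberStatus
}

// DeploymentStatusMembers is reduced here to two groups for brevity.
type DeploymentStatusMembers struct {
	Agents    []MemberStatus
	DBServers []MemberStatus
}

// AsList flattens all groups into one ordered slice, replacing the
// deprecated callback-based ForeachServerGroup walk.
func (ds DeploymentStatusMembers) AsList() []DeploymentStatusMemberElement {
	var out []DeploymentStatusMemberElement
	for _, m := range ds.Agents {
		out = append(out, DeploymentStatusMemberElement{Group: "agent", Member: m})
	}
	for _, m := range ds.DBServers {
		out = append(out, DeploymentStatusMemberElement{Group: "dbserver", Member: m})
	}
	return out
}

func main() {
	ds := DeploymentStatusMembers{
		Agents:    []MemberStatus{{ID: "AGNT-1", PodName: "pod-agnt-1"}},
		DBServers: []MemberStatus{{ID: "PRMR-1", PodName: "pod-prmr-1"}},
	}

	// Old style (deprecated by this commit): a callback per group, e.g.
	//   ds.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error { ... })
	//
	// New style: a plain range over the flattened list, as in the refactored
	// PodNames() shown in the first file diff below.
	var podNames []string
	for _, e := range ds.AsList() {
		if e.Member.PodName != "" {
			podNames = append(podNames, e.Member.PodName)
		}
	}
	fmt.Println(podNames) // [pod-agnt-1 pod-prmr-1]
}

The real AsList, AsListInGroups and AsListInGroup helpers already exist in the repository (the deprecation comments point callers at them); this commit routes existing callers through them instead of the callback-based walkers.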


@ -3,6 +3,7 @@
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
- (Feature) Add ArangoDeployment ServerGroupStatus
- (Feature) (EE) Ordered Member IDs
- (Refactor) Deprecate ForeachServerGroup, ForeachServerInGroups and ForServerGroup functions and refactor code accordingly
## [1.2.15](https://github.com/arangodb/kube-arangodb/tree/1.2.15) (2022-07-20)
- (Bugfix) Ensure pod names not too long


@ -84,10 +84,13 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr
// ForeachServerGroup calls the given callback for all server groups.
// If the callback returns an error, this error is returned and the callback is
// not called for the remaining groups.
// Deprecated. Use AsList instead
func (ds DeploymentStatusMembers) ForeachServerGroup(cb MemberStatusFunc) error {
return ds.ForeachServerInGroups(cb, AllServerGroups...)
}
// ForeachServerInGroups calls the given callback for specified server groups.
// Deprecated. Use AsListInGroups instead
func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, groups ...ServerGroup) error {
for _, group := range groups {
if err := ds.ForServerGroup(cb, group); err != nil {
@ -98,6 +101,8 @@ func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, gro
return nil
}
// ForServerGroup calls the given callback for specified server group.
// Deprecated. Use AsListInGroup or MembersOfGroup
func (ds DeploymentStatusMembers) ForServerGroup(cb MemberStatusFunc, group ServerGroup) error {
switch group {
case ServerGroupSingle:
@ -292,14 +297,11 @@ func (ds DeploymentStatusMembers) MembersOfGroup(group ServerGroup) MemberStatus
func (ds DeploymentStatusMembers) PodNames() []string {
var n []string
ds.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error {
for _, m := range list {
if m.PodName != "" {
n = append(n, m.PodName)
}
for _, m := range ds.AsList() {
if m.Member.PodName != "" {
n = append(n, m.Member.PodName)
}
return nil
})
}
return n
}


@ -39,46 +39,30 @@ func newMemberList() DeploymentStatusMembers {
func Test_StatusMemberList_EnsureDefaultExecutionOrder(t *testing.T) {
statusMembers := newMemberList()
order := AllServerGroups
orderIndex := 0
statusMembers.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error {
for _, e := range statusMembers.AsList() {
require.True(t, orderIndex < len(order))
require.Equal(t, order[orderIndex], group)
require.Len(t, list, 1)
require.Equal(t, order[orderIndex].AsRole(), list[0].ID)
require.Equal(t, order[orderIndex], e.Group)
require.Equal(t, order[orderIndex].AsRole(), e.Member.ID)
orderIndex += 1
return nil
})
}
}
func Test_StatusMemberList_CustomExecutionOrder(t *testing.T) {
statusMembers := newMemberList()
order := []ServerGroup{
ServerGroupDBServers,
}
orderIndex := 0
statusMembers.ForeachServerInGroups(func(group ServerGroup, list MemberStatusList) error {
for _, e := range statusMembers.AsListInGroups(order...) {
require.True(t, orderIndex < len(order))
require.Equal(t, order[orderIndex], group)
require.Len(t, list, 1)
require.Equal(t, order[orderIndex].AsRole(), list[0].ID)
require.Equal(t, order[orderIndex], e.Group)
require.Equal(t, order[orderIndex].AsRole(), e.Member.ID)
orderIndex += 1
return nil
}, order...)
}
}


@ -84,10 +84,13 @@ func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGr
// ForeachServerGroup calls the given callback for all server groups.
// If the callback returns an error, this error is returned and the callback is
// not called for the remaining groups.
// Deprecated. Use AsList instead
func (ds DeploymentStatusMembers) ForeachServerGroup(cb MemberStatusFunc) error {
return ds.ForeachServerInGroups(cb, AllServerGroups...)
}
// ForeachServerInGroups calls the given callback for specified server groups.
// Deprecated. Use AsListInGroups instead
func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, groups ...ServerGroup) error {
for _, group := range groups {
if err := ds.ForServerGroup(cb, group); err != nil {
@ -98,6 +101,8 @@ func (ds DeploymentStatusMembers) ForeachServerInGroups(cb MemberStatusFunc, gro
return nil
}
// ForServerGroup calls the given callback for specified server group.
// Deprecated. Use AsListInGroup or MembersOfGroup
func (ds DeploymentStatusMembers) ForServerGroup(cb MemberStatusFunc, group ServerGroup) error {
switch group {
case ServerGroupSingle:
@ -292,14 +297,11 @@ func (ds DeploymentStatusMembers) MembersOfGroup(group ServerGroup) MemberStatus
func (ds DeploymentStatusMembers) PodNames() []string {
var n []string
ds.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error {
for _, m := range list {
if m.PodName != "" {
n = append(n, m.PodName)
}
for _, m := range ds.AsList() {
if m.Member.PodName != "" {
n = append(n, m.Member.PodName)
}
return nil
})
}
return n
}


@ -39,46 +39,30 @@ func newMemberList() DeploymentStatusMembers {
func Test_StatusMemberList_EnsureDefaultExecutionOrder(t *testing.T) {
statusMembers := newMemberList()
order := AllServerGroups
orderIndex := 0
statusMembers.ForeachServerGroup(func(group ServerGroup, list MemberStatusList) error {
for _, e := range statusMembers.AsList() {
require.True(t, orderIndex < len(order))
require.Equal(t, order[orderIndex], group)
require.Len(t, list, 1)
require.Equal(t, order[orderIndex].AsRole(), list[0].ID)
require.Equal(t, order[orderIndex], e.Group)
require.Equal(t, order[orderIndex].AsRole(), e.Member.ID)
orderIndex += 1
return nil
})
}
}
func Test_StatusMemberList_CustomExecutionOrder(t *testing.T) {
statusMembers := newMemberList()
order := []ServerGroup{
ServerGroupDBServers,
}
orderIndex := 0
statusMembers.ForeachServerInGroups(func(group ServerGroup, list MemberStatusList) error {
for _, e := range statusMembers.AsListInGroups(order...) {
require.True(t, orderIndex < len(order))
require.Equal(t, order[orderIndex], group)
require.Len(t, list, 1)
require.Equal(t, order[orderIndex].AsRole(), list[0].ID)
require.Equal(t, order[orderIndex], e.Group)
require.Equal(t, order[orderIndex].AsRole(), e.Member.ID)
orderIndex += 1
return nil
}, order...)
}
}


@ -359,26 +359,22 @@ func (d *Deployment) isUpToDateStatus(status api.DeploymentStatus) (upToDate boo
if !status.Conditions.Check(api.ConditionTypeReachable).Exists().IsTrue().Evaluate() {
upToDate = false
return
}
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
if !upToDate {
return nil
for _, m := range status.Members.AsList() {
member := m.Member
if member.Conditions.IsTrue(api.ConditionTypeRestart) || member.Conditions.IsTrue(api.ConditionTypePendingRestart) {
upToDate = false
reason = "Pending restarts on members"
return
}
for _, member := range list {
if member.Conditions.IsTrue(api.ConditionTypeRestart) || member.Conditions.IsTrue(api.ConditionTypePendingRestart) {
upToDate = false
reason = "Pending restarts on members"
return nil
}
if member.Conditions.IsTrue(api.ConditionTypePVCResizePending) {
upToDate = false
reason = "PVC is resizing"
return nil
}
if member.Conditions.IsTrue(api.ConditionTypePVCResizePending) {
upToDate = false
reason = "PVC is resizing"
return
}
return nil
})
}
return
}


@ -124,93 +124,87 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
}
// Set Pending phase
require.NoError(t, d.status.last.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
if m.Phase == api.MemberPhaseNone {
m.Phase = api.MemberPhasePending
if err := d.status.last.Members.Update(m, group); err != nil {
return err
}
}
for _, e := range d.status.last.Members.AsList() {
m := e.Member
if m.Phase == api.MemberPhaseNone {
m.Phase = api.MemberPhasePending
require.NoError(t, d.status.last.Members.Update(m, e.Group))
}
return nil
}))
}
// Set members
if err := d.status.last.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
member := api.ArangoMember{
ObjectMeta: meta.ObjectMeta{
Namespace: d.GetNamespace(),
Name: m.ArangoMemberName(d.GetName(), group),
},
Spec: api.ArangoMemberSpec{
Group: group,
ID: m.ID,
},
}
c := d.WithCurrentArangoMember(m.ArangoMemberName(d.GetName(), group))
if err := c.Create(context.Background(), &member); err != nil {
return err
}
s := core.Service{
ObjectMeta: meta.ObjectMeta{
Name: member.GetName(),
Namespace: member.GetNamespace(),
},
}
if _, err := d.ServicesModInterface().Create(context.Background(), &s, meta.CreateOptions{}); err != nil {
return err
}
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))
groupSpec := d.apiObject.Spec.GetServerGroupSpec(group)
image, ok := d.resources.SelectImage(d.apiObject.Spec, d.status.last)
require.True(t, ok)
template, err := d.resources.RenderPodTemplateForMember(context.Background(), d.ACS(), d.apiObject.Spec, d.status.last, m.ID, image)
if err != nil {
return err
}
checksum, err := resources.ChecksumArangoPod(groupSpec, resources.CreatePodFromTemplate(template))
require.NoError(t, err)
podTemplate, err := api.GetArangoMemberPodTemplate(template, checksum)
require.NoError(t, err)
member.Status.Template = podTemplate
member.Spec.Template = podTemplate
if err := c.Update(context.Background(), func(obj *api.ArangoMember) bool {
obj.Spec.Template = podTemplate
return true
}); err != nil {
return err
}
if err := c.UpdateStatus(context.Background(), func(obj *api.ArangoMember, s *api.ArangoMemberStatus) bool {
s.Template = podTemplate
return true
}); err != nil {
return err
}
var loopErr error
for _, e := range d.status.last.Members.AsList() {
m := e.Member
group := e.Group
member := api.ArangoMember{
ObjectMeta: meta.ObjectMeta{
Namespace: d.GetNamespace(),
Name: m.ArangoMemberName(d.GetName(), group),
},
Spec: api.ArangoMemberSpec{
Group: group,
ID: m.ID,
},
}
return nil
}); err != nil {
if testCase.ExpectedError != nil && assert.EqualError(t, err, testCase.ExpectedError.Error()) {
return
c := d.WithCurrentArangoMember(m.ArangoMemberName(d.GetName(), group))
if loopErr = c.Create(context.Background(), &member); loopErr != nil {
break
}
s := core.Service{
ObjectMeta: meta.ObjectMeta{
Name: member.GetName(),
Namespace: member.GetNamespace(),
},
}
if _, loopErr = d.ServicesModInterface().Create(context.Background(), &s, meta.CreateOptions{}); loopErr != nil {
break
}
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))
groupSpec := d.apiObject.Spec.GetServerGroupSpec(group)
image, ok := d.resources.SelectImage(d.apiObject.Spec, d.status.last)
require.True(t, ok)
var template *core.PodTemplateSpec
template, loopErr = d.resources.RenderPodTemplateForMember(context.Background(), d.ACS(), d.apiObject.Spec, d.status.last, m.ID, image)
if loopErr != nil {
break
}
checksum, err := resources.ChecksumArangoPod(groupSpec, resources.CreatePodFromTemplate(template))
require.NoError(t, err)
podTemplate, err := api.GetArangoMemberPodTemplate(template, checksum)
require.NoError(t, err)
member.Status.Template = podTemplate
member.Spec.Template = podTemplate
if loopErr = c.Update(context.Background(), func(obj *api.ArangoMember) bool {
obj.Spec.Template = podTemplate
return true
}); loopErr != nil {
break
}
if loopErr = c.UpdateStatus(context.Background(), func(obj *api.ArangoMember, s *api.ArangoMemberStatus) bool {
s.Template = podTemplate
return true
}); loopErr != nil {
break
}
}
if loopErr != nil && testCase.ExpectedError != nil && assert.EqualError(t, loopErr, testCase.ExpectedError.Error()) {
return
}
require.NoError(t, err)
// Act
require.NoError(t, d.acs.CurrentClusterCache().Refresh(context.Background()))


@ -69,44 +69,52 @@ func (a actionSetMemberConditionV2) Start(ctx context.Context) (bool, error) {
as := a.action.Params[setConditionActionV2KeyStatus] == string(core.ConditionTrue)
if err := a.actionCtx.WithStatusUpdateErr(ctx, func(s *api.DeploymentStatus) (bool, error) {
var changed bool
status, g, ok := s.Members.ElementByID(a.action.MemberID)
if !ok {
a.log.Info("can not set the condition because the member is gone already")
return false, nil
}
s.Members.ForServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for i := range members {
if members[i].ID == a.action.MemberID {
changed = members[i].Conditions.UpdateWithHash(api.ConditionType(aa), as, ar, am, ah)
return nil
}
if g != a.action.Group {
a.log.Info("can not set the condition because of invalid groups")
return false, nil
}
if status.Conditions.UpdateWithHash(api.ConditionType(aa), as, ar, am, ah) {
if err := s.Members.Update(status, g); err != nil {
return false, err
}
a.log.Info("can not set the condition because the member is gone already")
return nil
}, a.action.Group)
return true, nil
}
// If not found then false is returned.
return changed, nil
return false, nil
}); err != nil {
a.log.Err(err).Warn("unable to update status")
return true, nil
}
case setConditionActionV2KeyTypeRemove:
if err := a.actionCtx.WithStatusUpdateErr(ctx, func(s *api.DeploymentStatus) (bool, error) {
var changed bool
status, g, ok := s.Members.ElementByID(a.action.MemberID)
if !ok {
a.log.Info("can not set the condition because the member is gone already")
return false, nil
}
s.Members.ForServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for i := range members {
if members[i].ID == a.action.MemberID {
changed = members[i].Conditions.Remove(api.ConditionType(aa))
return nil
}
if g != a.action.Group {
a.log.Info("can not set the condition because of invalid groups")
return false, nil
}
if status.Conditions.Remove(api.ConditionType(aa)) {
if err := s.Members.Update(status, g); err != nil {
return false, err
}
a.log.Info("can not remove the condition because the member is gone already")
return nil
}, a.action.Group)
return true, nil
}
// If not found then false is returned.
return changed, nil
return false, nil
}); err != nil {
a.log.Err(err).Warn("unable to update status")
return true, nil


@ -67,13 +67,9 @@ func (r *Reconciler) createClusterOperationPlan(ctx context.Context, apiObject k
membersHealth := health.Health
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
delete(membersHealth, driver.ServerID(m.ID))
}
return nil
})
for _, e := range status.Members.AsList() {
delete(membersHealth, driver.ServerID(e.Member.ID))
}
if len(membersHealth) == 0 {
return nil


@ -226,12 +226,11 @@ func (r *Reconciler) createEncryptionKeyCleanPlan(ctx context.Context, apiObject
func (r *Reconciler) areEncryptionKeysUpToDate(ctx context.Context, spec api.DeploymentSpec,
status api.DeploymentStatus, context PlanBuilderContext, folder *core.Secret) (plan api.Plan, failed bool) {
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, group := range api.AllServerGroups {
if !pod.GroupEncryptionSupported(spec.Mode.Get(), group) {
return nil
continue
}
for _, m := range list {
for _, m := range status.Members.MembersOfGroup(group) {
if updateRequired, failedMember := r.isEncryptionKeyUpToDate(ctx, status, context, group, m, folder); failedMember {
failed = true
continue
@ -240,9 +239,7 @@ func (r *Reconciler) areEncryptionKeysUpToDate(ctx context.Context, spec api.Dep
continue
}
}
return nil
})
}
return
}


@ -73,17 +73,13 @@ func (r *Reconciler) updateMemberPodTemplateSpec(ctx context.Context, apiObject
var plan api.Plan
// Update member specs
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if m.Phase != api.MemberPhaseNone {
if reason, changed := r.arangoMemberPodTemplateNeedsUpdate(ctx, apiObject, spec, group, status, m, context); changed {
plan = append(plan, actions.NewAction(api.ActionTypeArangoMemberUpdatePodSpec, group, m, reason))
}
for _, e := range status.Members.AsList() {
if e.Member.Phase != api.MemberPhaseNone {
if reason, changed := r.arangoMemberPodTemplateNeedsUpdate(ctx, apiObject, spec, e.Group, status, e.Member, context); changed {
plan = append(plan, actions.NewAction(api.ActionTypeArangoMemberUpdatePodSpec, e.Group, e.Member, reason))
}
}
return nil
})
}
return plan
}
@ -94,24 +90,18 @@ func (r *Reconciler) updateMemberPhasePlan(ctx context.Context, apiObject k8suti
context PlanBuilderContext) api.Plan {
var plan api.Plan
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
if m.Phase == api.MemberPhaseNone {
var p api.Plan
p = append(p,
actions.NewAction(api.ActionTypeArangoMemberUpdatePodSpec, group, m, "Propagating spec of pod"),
actions.NewAction(api.ActionTypeArangoMemberUpdatePodStatus, group, m, "Propagating status of pod"))
p = append(p, actions.NewAction(api.ActionTypeMemberPhaseUpdate, group, m,
"Move to Pending phase").AddParam(actionTypeMemberPhaseUpdatePhaseKey, api.MemberPhasePending.String()))
plan = append(plan, p...)
}
for _, e := range status.Members.AsList() {
if e.Member.Phase == api.MemberPhaseNone {
var p api.Plan
p = append(p,
actions.NewAction(api.ActionTypeArangoMemberUpdatePodSpec, e.Group, e.Member, "Propagating spec of pod"),
actions.NewAction(api.ActionTypeArangoMemberUpdatePodStatus, e.Group, e.Member, "Propagating status of pod"),
actions.NewAction(api.ActionTypeMemberPhaseUpdate, e.Group, e.Member,
"Move to Pending phase").AddParam(actionTypeMemberPhaseUpdatePhaseKey, api.MemberPhasePending.String()),
)
plan = append(plan, p...)
}
return nil
})
}
return plan
}
@ -133,27 +123,20 @@ func (r *Reconciler) updateMemberUpdateConditionsPlan(ctx context.Context, apiOb
context PlanBuilderContext) api.Plan {
var plan api.Plan
if err := status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
if m.Conditions.IsTrue(api.ConditionTypeUpdating) {
// We are in updating phase
if status.Plan.IsEmpty() {
// If plan is empty then something went wrong
plan = append(plan,
actions.NewAction(api.ActionTypeSetMemberCondition, group, m, "Clean update actions after failure").
AddParam(api.ConditionTypePendingUpdate.String(), "").
AddParam(api.ConditionTypeUpdating.String(), "").
AddParam(api.ConditionTypeUpdateFailed.String(), "T").
AddParam(api.ConditionTypePendingRestart.String(), "T"),
)
}
for _, e := range status.Members.AsList() {
if e.Member.Conditions.IsTrue(api.ConditionTypeUpdating) {
// We are in updating phase
if status.Plan.IsEmpty() {
// If plan is empty then something went wrong
plan = append(plan,
actions.NewAction(api.ActionTypeSetMemberCondition, e.Group, e.Member, "Clean update actions after failure").
AddParam(api.ConditionTypePendingUpdate.String(), "").
AddParam(api.ConditionTypeUpdating.String(), "").
AddParam(api.ConditionTypeUpdateFailed.String(), "T").
AddParam(api.ConditionTypePendingRestart.String(), "T"),
)
}
}
return nil
}); err != nil {
r.log.Err(err).Error("Error while generating update plan")
return nil
}
return plan
@ -164,29 +147,23 @@ func (r *Reconciler) updateMemberRotationConditionsPlan(ctx context.Context, api
context PlanBuilderContext) api.Plan {
var plan api.Plan
if err := status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
cache, ok := context.ACS().ClusterCache(m.ClusterID)
if !ok {
continue
}
p, ok := cache.Pod().V1().GetSimple(m.PodName)
if !ok {
p = nil
}
if p, err := r.updateMemberRotationConditions(apiObject, spec, m, group, p, context); err != nil {
return err
} else if len(p) > 0 {
plan = append(plan, p...)
}
for _, e := range status.Members.AsList() {
cache, ok := context.ACS().ClusterCache(e.Member.ClusterID)
if !ok {
continue
}
return nil
}); err != nil {
r.log.Err(err).Error("Error while generating rotation plan")
return nil
p, ok := cache.Pod().V1().GetSimple(e.Member.PodName)
if !ok {
p = nil
}
if p, err := r.updateMemberRotationConditions(apiObject, spec, e.Member, e.Group, p, context); err != nil {
r.log.Err(err).Error("Error while generating rotation plan")
return nil
} else if len(p) > 0 {
plan = append(plan, p...)
}
}
return plan


@ -202,21 +202,17 @@ func (r *Reconciler) areJWTTokensUpToDate(ctx context.Context, status api.Deploy
gCtx, c := context.WithTimeout(ctx, 2*time.Second)
defer c()
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
nCtx, c := context.WithTimeout(gCtx, 500*time.Millisecond)
defer c()
if updateRequired, failedMember := r.isJWTTokenUpToDate(nCtx, status, planCtx, group, m, folder); failedMember {
failed = true
continue
} else if updateRequired {
plan = append(plan, actions.NewAction(api.ActionTypeJWTRefresh, group, m))
continue
}
for _, e := range status.Members.AsList() {
nCtx, c := context.WithTimeout(gCtx, 500*time.Millisecond)
defer c()
if updateRequired, failedMember := r.isJWTTokenUpToDate(nCtx, status, planCtx, e.Group, e.Member, folder); failedMember {
failed = true
continue
} else if updateRequired {
plan = append(plan, actions.NewAction(api.ActionTypeJWTRefresh, e.Group, e.Member))
continue
}
return nil
})
}
return
}


@ -57,13 +57,15 @@ func (r *Reconciler) createMemberFailedRestoreInternal(_ context.Context, _ k8su
agencyState, agencyOK := context.GetAgencyCache()
// Check for members in failed state.
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, group := range api.AllServerGroups {
members := status.Members.MembersOfGroup(group)
failed := 0
for _, m := range members {
if m.Phase == api.MemberPhaseFailed {
failed++
}
}
for _, m := range members {
if m.Phase != api.MemberPhaseFailed || len(plan) > 0 {
continue
@ -116,8 +118,7 @@ func (r *Reconciler) createMemberFailedRestoreInternal(_ context.Context, _ k8su
}
}
}
return nil
})
}
if len(plan) == 0 && !agencyOK {
r.log.Warn("unable to build further plan without access to agency")


@ -85,36 +85,33 @@ func (r *Reconciler) createMarkToRemovePlan(ctx context.Context, apiObject k8sut
context PlanBuilderContext) api.Plan {
var plan api.Plan
status.Members.ForeachServerInGroups(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if m.Phase != api.MemberPhaseCreated || m.PodName == "" {
// Only rotate when phase is created
continue
}
for _, e := range status.Members.AsListInGroups(rotationByAnnotationOrder...) {
m := e.Member
group := e.Group
if m.Phase != api.MemberPhaseCreated || m.PodName == "" {
// Only rotate when phase is created
continue
}
cache, ok := context.ACS().ClusterCache(m.ClusterID)
if !ok {
continue
}
cache, ok := context.ACS().ClusterCache(m.ClusterID)
if !ok {
continue
}
pod, found := cache.Pod().V1().GetSimple(m.PodName)
if !found {
continue
}
pod, found := cache.Pod().V1().GetSimple(m.PodName)
if !found {
continue
}
if pod.Annotations != nil {
if _, ok := pod.Annotations[deployment.ArangoDeploymentPodReplaceAnnotation]; ok && (group == api.ServerGroupDBServers || group == api.ServerGroupAgents || group == api.ServerGroupCoordinators) {
if !m.Conditions.IsTrue(api.ConditionTypeMarkedToRemove) {
plan = append(plan, actions.NewAction(api.ActionTypeMarkToRemoveMember, group, m, "Replace flag present"))
continue
}
if pod.Annotations != nil {
if _, ok := pod.Annotations[deployment.ArangoDeploymentPodReplaceAnnotation]; ok && (group == api.ServerGroupDBServers || group == api.ServerGroupAgents || group == api.ServerGroupCoordinators) {
if !m.Conditions.IsTrue(api.ConditionTypeMarkedToRemove) {
plan = append(plan, actions.NewAction(api.ActionTypeMarkToRemoveMember, group, m, "Replace flag present"))
continue
}
}
}
return nil
}, rotationByAnnotationOrder...)
}
return plan
}


@ -116,44 +116,41 @@ func (r *Reconciler) createReplaceMemberPlan(ctx context.Context, apiObject k8su
var plan api.Plan
// Replace is only allowed for Coordinators, DBServers & Agents
status.Members.ForeachServerInGroups(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, member := range list {
if !plan.IsEmpty() {
return nil
}
if member.Conditions.IsTrue(api.ConditionTypeMarkedToRemove) {
ready, message := groupReadyForRestart(context, status, member, group)
if !ready {
r.planLogger.Str("member", member.ID).Str("role", group.AsRole()).Str("message", message).Warn("Unable to recreate member")
continue
}
switch group {
case api.ServerGroupDBServers:
plan = append(plan, actions.NewAction(api.ActionTypeAddMember, group, withPredefinedMember("")))
r.planLogger.
Str("role", group.AsRole()).
Debug("Creating replacement plan")
return nil
case api.ServerGroupCoordinators:
plan = append(plan, actions.NewAction(api.ActionTypeRemoveMember, group, member))
r.planLogger.
Str("role", group.AsRole()).
Debug("Creating replacement plan")
return nil
case api.ServerGroupAgents:
plan = append(plan, actions.NewAction(api.ActionTypeRemoveMember, group, member),
actions.NewAction(api.ActionTypeAddMember, group, withPredefinedMember("")))
r.planLogger.
Str("role", group.AsRole()).
Debug("Creating replacement plan")
return nil
}
}
for _, e := range status.Members.AsListInGroups(api.ServerGroupAgents, api.ServerGroupDBServers, api.ServerGroupCoordinators) {
if !plan.IsEmpty() {
break
}
return nil
}, api.ServerGroupAgents, api.ServerGroupDBServers, api.ServerGroupCoordinators)
member := e.Member
group := e.Group
if member.Conditions.IsTrue(api.ConditionTypeMarkedToRemove) {
ready, message := groupReadyForRestart(context, status, member, group)
if !ready {
r.planLogger.Str("member", member.ID).Str("role", group.AsRole()).Str("message", message).Warn("Unable to recreate member")
continue
}
switch group {
case api.ServerGroupDBServers:
plan = append(plan, actions.NewAction(api.ActionTypeAddMember, group, withPredefinedMember("")))
r.planLogger.
Str("role", group.AsRole()).
Debug("Creating replacement plan")
case api.ServerGroupCoordinators:
plan = append(plan, actions.NewAction(api.ActionTypeRemoveMember, group, member))
r.planLogger.
Str("role", group.AsRole()).
Debug("Creating replacement plan")
case api.ServerGroupAgents:
plan = append(plan, actions.NewAction(api.ActionTypeRemoveMember, group, member),
actions.NewAction(api.ActionTypeAddMember, group, withPredefinedMember("")))
r.planLogger.
Str("role", group.AsRole()).
Debug("Creating replacement plan")
}
}
}
return plan
}


@ -977,14 +977,10 @@ func TestCreatePlan(t *testing.T) {
},
}
require.NoError(t, c.context.ArangoDeployment.Status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
m.Phase = api.MemberPhaseCreated
require.NoError(t, c.context.ArangoDeployment.Status.Members.Update(m, group))
}
return nil
}))
for _, e := range c.context.ArangoDeployment.Status.Members.AsList() {
e.Member.Phase = api.MemberPhaseCreated
require.NoError(t, c.context.ArangoDeployment.Status.Members.Update(e.Member, e.Group))
}
},
context: &testContext{
ArangoDeployment: deploymentTemplate.DeepCopy(),


@ -288,9 +288,7 @@ func (r *Reconciler) createKeyfileRenewalPlanSynced(ctx context.Context, apiObje
var plan api.Plan
group := api.ServerGroupSyncMasters
for _, statusMember := range status.Members.AsListInGroup(group) {
member := statusMember.Member
for _, member := range status.Members.MembersOfGroup(group) {
if !plan.IsEmpty() {
continue
}
@ -321,32 +319,21 @@ func (r *Reconciler) createKeyfileRenewalPlanDefault(ctx context.Context, apiObj
var plan api.Plan
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
if !group.IsArangod() {
return nil
for _, e := range status.Members.AsListInGroups(api.AllArangoDServerGroups...) {
cache, ok := planCtx.ACS().ClusterCache(e.Member.ClusterID)
if !ok {
continue
}
for _, member := range members {
if !plan.IsEmpty() {
return nil
}
lCtx, c := context.WithTimeout(ctx, 500*time.Millisecond)
defer c()
cache, ok := planCtx.ACS().ClusterCache(member.ClusterID)
if !ok {
continue
}
lCtx, c := context.WithTimeout(ctx, 500*time.Millisecond)
defer c()
if renew, _ := r.keyfileRenewalRequired(lCtx, apiObject, spec.TLS, spec, cache, planCtx, group, member, api.TLSRotateModeRecreate); renew {
r.planLogger.Info("Renewal of keyfile required - Recreate (server)")
plan = append(plan, tlsRotateConditionAction(group, member.ID, "Restart server after keyfile removal"))
}
if renew, _ := r.keyfileRenewalRequired(lCtx, apiObject, spec.TLS, spec, cache, planCtx, e.Group, e.Member, api.TLSRotateModeRecreate); renew {
r.planLogger.Info("Renewal of keyfile required - Recreate (server)")
plan = append(plan, tlsRotateConditionAction(e.Group, e.Member.ID, "Restart server after keyfile removal"))
break
}
return nil
})
}
return plan
}
@ -360,31 +347,23 @@ func (r *Reconciler) createKeyfileRenewalPlanInPlace(ctx context.Context, apiObj
var plan api.Plan
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
if !group.IsArangod() {
return nil
for _, e := range status.Members.AsListInGroups(api.AllArangoDServerGroups...) {
cache, ok := planCtx.ACS().ClusterCache(e.Member.ClusterID)
if !ok {
continue
}
for _, member := range members {
cache, ok := planCtx.ACS().ClusterCache(member.ClusterID)
if !ok {
continue
}
lCtx, c := context.WithTimeout(ctx, 500*time.Millisecond)
defer c()
lCtx, c := context.WithTimeout(ctx, 500*time.Millisecond)
defer c()
if renew, recreate := r.keyfileRenewalRequired(lCtx, apiObject, spec.TLS, spec, cache, planCtx, group, member, api.TLSRotateModeInPlace); renew {
r.planLogger.Info("Renewal of keyfile required - InPlace (server)")
if recreate {
plan = append(plan, actions.NewAction(api.ActionTypeCleanTLSKeyfileCertificate, group, member, "Remove server keyfile and enforce renewal"))
}
plan = append(plan, actions.NewAction(api.ActionTypeRefreshTLSKeyfileCertificate, group, member, "Renew Member Keyfile"))
if renew, recreate := r.keyfileRenewalRequired(lCtx, apiObject, spec.TLS, spec, cache, planCtx, e.Group, e.Member, api.TLSRotateModeInPlace); renew {
r.planLogger.Info("Renewal of keyfile required - InPlace (server)")
if recreate {
plan = append(plan, actions.NewAction(api.ActionTypeCleanTLSKeyfileCertificate, e.Group, e.Member, "Remove server keyfile and enforce renewal"))
}
plan = append(plan, actions.NewAction(api.ActionTypeRefreshTLSKeyfileCertificate, e.Group, e.Member, "Renew Member Keyfile"))
}
return nil
})
}
return plan
}
@ -419,27 +398,19 @@ func createKeyfileRenewalPlanMode(
mode := spec.TLS.Mode.Get()
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, e := range status.Members.AsList() {
if mode != api.TLSRotateModeInPlace {
return nil
break
}
for _, member := range list {
if mode != api.TLSRotateModeInPlace {
return nil
}
if i, ok := status.Images.GetByImageID(member.ImageID); !ok {
if i, ok := status.Images.GetByImageID(e.Member.ImageID); !ok {
mode = api.TLSRotateModeRecreate
} else {
if !features.TLSRotation().Supported(i.ArangoDBVersion, i.Enterprise) {
mode = api.TLSRotateModeRecreate
} else {
if !features.TLSRotation().Supported(i.ArangoDBVersion, i.Enterprise) {
mode = api.TLSRotateModeRecreate
}
}
}
return nil
})
}
return mode
}


@ -56,15 +56,14 @@ func (r *Reconciler) createRotateTLSServerSNIPlan(ctx context.Context, apiObject
}
var plan api.Plan
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, group := range api.AllServerGroups {
if !pod.GroupSNISupported(spec.Mode.Get(), group) {
return nil
continue
}
for _, m := range members {
for _, m := range status.Members.MembersOfGroup(group) {
if !plan.IsEmpty() {
// Only 1 member at a time
return nil
break
}
if m.Phase != api.MemberPhaseCreated {
@ -96,8 +95,7 @@ func (r *Reconciler) createRotateTLSServerSNIPlan(ctx context.Context, apiObject
})
if err != nil {
r.planLogger.Err(err).Info("SNI compare failed")
return nil
break
} else if !ok {
switch spec.TLS.Mode.Get() {
case api.TLSRotateModeRecreate:
@ -111,7 +109,6 @@ func (r *Reconciler) createRotateTLSServerSNIPlan(ctx context.Context, apiObject
}
}
}
return nil
})
}
return plan
}


@ -41,70 +41,68 @@ const (
func (r *Resilience) CheckMemberFailure(ctx context.Context) error {
status, lastVersion := r.context.GetStatus()
updateStatusNeeded := false
if err := status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
log := r.log("member-failure").
Str("id", m.ID).
Str("role", group.AsRole())
// Check if there are Members with Phase Upgrading or Rotation but no plan
switch m.Phase {
case api.MemberPhaseNone, api.MemberPhasePending:
continue
case api.MemberPhaseUpgrading, api.MemberPhaseRotating, api.MemberPhaseCleanOut, api.MemberPhaseRotateStart:
if len(status.Plan) == 0 {
log.Error("No plan but member is in phase %s - marking as failed", m.Phase)
for _, e := range status.Members.AsList() {
m := e.Member
group := e.Group
log := r.log("member-failure").
Str("id", m.ID).
Str("role", group.AsRole())
// Check if there are Members with Phase Upgrading or Rotation but no plan
switch m.Phase {
case api.MemberPhaseNone, api.MemberPhasePending:
continue
case api.MemberPhaseUpgrading, api.MemberPhaseRotating, api.MemberPhaseCleanOut, api.MemberPhaseRotateStart:
if len(status.Plan) == 0 {
log.Error("No plan but member is in phase %s - marking as failed", m.Phase)
m.Phase = api.MemberPhaseFailed
status.Members.Update(m, group)
updateStatusNeeded = true
}
}
// Check if pod is ready
if m.Conditions.IsTrue(api.ConditionTypeReady) {
// Pod is now ready, so we're not looking further
continue
}
// Check not ready for a long time
if !m.Phase.IsFailed() {
if m.IsNotReadySince(time.Now().Add(-notReadySinceGracePeriod)) {
// Member has terminated too often in recent history.
failureAcceptable, reason := r.isMemberFailureAcceptable(group, m)
if failureAcceptable {
log.Info("Member is not ready for long time, marking is failed")
m.Phase = api.MemberPhaseFailed
status.Members.Update(m, group)
updateStatusNeeded = true
}
}
// Check if pod is ready
if m.Conditions.IsTrue(api.ConditionTypeReady) {
// Pod is now ready, so we're not looking further
continue
}
// Check not ready for a long time
if !m.Phase.IsFailed() {
if m.IsNotReadySince(time.Now().Add(-notReadySinceGracePeriod)) {
// Member has terminated too often in recent history.
failureAcceptable, reason := r.isMemberFailureAcceptable(group, m)
if failureAcceptable {
log.Info("Member is not ready for long time, marking is failed")
m.Phase = api.MemberPhaseFailed
status.Members.Update(m, group)
updateStatusNeeded = true
} else {
log.Warn("Member is not ready for long time, but it is not safe to mark it a failed because: %s", reason)
}
}
}
// Check recent terminations
if !m.Phase.IsFailed() {
count := m.RecentTerminationsSince(time.Now().Add(-recentTerminationsSinceGracePeriod))
if count >= recentTerminationThreshold {
// Member has terminated too often in recent history.
failureAcceptable, reason := r.isMemberFailureAcceptable(group, m)
if failureAcceptable {
log.Info("Member has terminated too often in recent history, marking is failed")
m.Phase = api.MemberPhaseFailed
status.Members.Update(m, group)
updateStatusNeeded = true
} else {
log.Warn("Member has terminated too often in recent history, but it is not safe to mark it a failed because: %s", reason)
}
} else {
log.Warn("Member is not ready for long time, but it is not safe to mark it a failed because: %s", reason)
}
}
}
return nil
}); err != nil {
return errors.WithStack(err)
// Check recent terminations
if !m.Phase.IsFailed() {
count := m.RecentTerminationsSince(time.Now().Add(-recentTerminationsSinceGracePeriod))
if count >= recentTerminationThreshold {
// Member has terminated too often in recent history.
failureAcceptable, reason := r.isMemberFailureAcceptable(group, m)
if failureAcceptable {
log.Info("Member has terminated too often in recent history, marking is failed")
m.Phase = api.MemberPhaseFailed
status.Members.Update(m, group)
updateStatusNeeded = true
} else {
log.Warn("Member has terminated too often in recent history, but it is not safe to mark it a failed because: %s", reason)
}
}
}
}
if updateStatusNeeded {
if err := r.context.UpdateStatus(ctx, status, lastVersion); err != nil {
return errors.WithStack(err)


@ -82,31 +82,30 @@ func (r *Resources) syncMembersInCluster(ctx context.Context, health memberState
status, lastVersion := r.context.GetStatus()
updateStatusNeeded := false
status.Members.ForeachServerInGroups(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
log := log.Str("member", m.ID).Str("role", group.AsRole())
if serverFound(m.ID) {
// Member is (still) found, skip it
if m.Conditions.Update(api.ConditionTypeMemberOfCluster, true, "", "") {
if err := status.Members.Update(m, group); err != nil {
log.Err(err).Warn("Failed to update member")
}
updateStatusNeeded = true
log.Debug("Updating MemberOfCluster condition to true")
for _, e := range status.Members.AsListInGroups(api.ServerGroupCoordinators, api.ServerGroupDBServers) {
m := e.Member
group := e.Group
log := log.Str("member", m.ID).Str("role", group.AsRole())
if serverFound(m.ID) {
// Member is (still) found, skip it
if m.Conditions.Update(api.ConditionTypeMemberOfCluster, true, "", "") {
if err := status.Members.Update(m, group); err != nil {
log.Err(err).Warn("Failed to update member")
}
continue
} else if !m.Conditions.IsTrue(api.ConditionTypeMemberOfCluster) {
if m.Age() < minMemberAge {
log.Dur("age", m.Age()).Debug("Member is not yet recorded as member of cluster")
continue
}
log.Warn("Member can not be found in cluster")
} else {
log.Info("Member is no longer part of the ArangoDB cluster")
updateStatusNeeded = true
log.Debug("Updating MemberOfCluster condition to true")
}
continue
} else if !m.Conditions.IsTrue(api.ConditionTypeMemberOfCluster) {
if m.Age() < minMemberAge {
log.Dur("age", m.Age()).Debug("Member is not yet recorded as member of cluster")
continue
}
log.Warn("Member can not be found in cluster")
} else {
log.Info("Member is no longer part of the ArangoDB cluster")
}
return nil
}, api.ServerGroupCoordinators, api.ServerGroupDBServers)
}
if updateStatusNeeded {
log.Debug("UpdateStatus needed")
@ -125,58 +124,52 @@ func (r *Resources) EnsureArangoMembers(ctx context.Context, cachedStatus inspec
s, _ := r.context.GetStatus()
obj := r.context.GetAPIObject()
if err := s.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, member := range list {
name := member.ArangoMemberName(r.context.GetAPIObject().GetName(), group)
for _, e := range s.Members.AsList() {
name := e.Member.ArangoMemberName(r.context.GetAPIObject().GetName(), e.Group)
c := r.context.WithCurrentArangoMember(name)
c := r.context.WithCurrentArangoMember(name)
if !c.Exists(ctx) {
// Create ArangoMember
obj := &api.ArangoMember{
ObjectMeta: meta.ObjectMeta{
Name: name,
OwnerReferences: []meta.OwnerReference{
obj.AsOwner(),
},
if !c.Exists(ctx) {
// Create ArangoMember
obj := &api.ArangoMember{
ObjectMeta: meta.ObjectMeta{
Name: name,
OwnerReferences: []meta.OwnerReference{
obj.AsOwner(),
},
Spec: api.ArangoMemberSpec{
Group: group,
ID: member.ID,
DeploymentUID: obj.GetUID(),
},
}
},
Spec: api.ArangoMemberSpec{
Group: e.Group,
ID: e.Member.ID,
DeploymentUID: obj.GetUID(),
},
}
if err := r.context.WithCurrentArangoMember(name).Create(ctx, obj); err != nil {
return err
}
if err := r.context.WithCurrentArangoMember(name).Create(ctx, obj); err != nil {
return err
}
continue
} else {
if err := c.Update(ctx, func(m *api.ArangoMember) bool {
changed := false
if len(m.OwnerReferences) == 0 {
m.OwnerReferences = []meta.OwnerReference{
obj.AsOwner(),
}
changed = true
continue
} else {
if err := c.Update(ctx, func(m *api.ArangoMember) bool {
changed := false
if len(m.OwnerReferences) == 0 {
m.OwnerReferences = []meta.OwnerReference{
obj.AsOwner(),
}
if m.Spec.DeploymentUID == "" {
m.Spec.DeploymentUID = obj.GetUID()
changed = true
}
return changed
}); err != nil {
return err
changed = true
}
if m.Spec.DeploymentUID == "" {
m.Spec.DeploymentUID = obj.GetUID()
changed = true
}
return changed
}); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
if err := cachedStatus.ArangoMember().V1().Iterate(func(member *api.ArangoMember) error {


@ -351,61 +351,60 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
}
// Go over all members, check for missing pods
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if podName := m.PodName; podName != "" {
if _, exists := cachedStatus.Pod().V1().GetSimple(podName); !exists {
log.Str("pod-name", podName).Debug("Does not exist")
switch m.Phase {
case api.MemberPhaseNone, api.MemberPhasePending:
// Do nothing
log.Str("pod-name", podName).Debug("PodPhase is None, waiting for the pod to be recreated")
case api.MemberPhaseShuttingDown, api.MemberPhaseUpgrading, api.MemberPhaseFailed, api.MemberPhaseRotateStart, api.MemberPhaseRotating:
// Shutdown was intended, so not need to do anything here.
// Just mark terminated
wasTerminated := m.Conditions.IsTrue(api.ConditionTypeTerminated)
if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Terminated", "") {
if !wasTerminated {
// Record termination time
now := meta.Now()
m.RecentTerminations = append(m.RecentTerminations, now)
}
// Save it
if err := status.Members.Update(m, group); err != nil {
return errors.WithStack(err)
}
for _, e := range status.Members.AsList() {
m := e.Member
group := e.Group
if podName := m.PodName; podName != "" {
if _, exists := cachedStatus.Pod().V1().GetSimple(podName); !exists {
log.Str("pod-name", podName).Debug("Does not exist")
switch m.Phase {
case api.MemberPhaseNone, api.MemberPhasePending:
// Do nothing
log.Str("pod-name", podName).Debug("PodPhase is None, waiting for the pod to be recreated")
case api.MemberPhaseShuttingDown, api.MemberPhaseUpgrading, api.MemberPhaseFailed, api.MemberPhaseRotateStart, api.MemberPhaseRotating:
// Shutdown was intended, so not need to do anything here.
// Just mark terminated
wasTerminated := m.Conditions.IsTrue(api.ConditionTypeTerminated)
if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Terminated", "") {
if !wasTerminated {
// Record termination time
now := meta.Now()
m.RecentTerminations = append(m.RecentTerminations, now)
}
default:
log.Str("pod-name", podName).Debug("Pod is gone")
m.Phase = api.MemberPhaseNone // This is trigger a recreate of the pod.
// Create event
nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
events = append(events, k8sutil.NewPodGoneEvent(podName, group.AsRole(), apiObject))
updateMemberNeeded := false
if m.Conditions.Update(api.ConditionTypeReady, false, "Pod Does Not Exist", "") {
updateMemberNeeded = true
// Save it
if err := status.Members.Update(m, group); err != nil {
return 0, errors.WithStack(err)
}
wasTerminated := m.Conditions.IsTrue(api.ConditionTypeTerminated)
if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Does Not Exist", "") {
if !wasTerminated {
// Record termination time
now := meta.Now()
m.RecentTerminations = append(m.RecentTerminations, now)
}
updateMemberNeeded = true
}
default:
log.Str("pod-name", podName).Debug("Pod is gone")
m.Phase = api.MemberPhaseNone // This is trigger a recreate of the pod.
// Create event
nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
events = append(events, k8sutil.NewPodGoneEvent(podName, group.AsRole(), apiObject))
updateMemberNeeded := false
if m.Conditions.Update(api.ConditionTypeReady, false, "Pod Does Not Exist", "") {
updateMemberNeeded = true
}
wasTerminated := m.Conditions.IsTrue(api.ConditionTypeTerminated)
if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Does Not Exist", "") {
if !wasTerminated {
// Record termination time
now := meta.Now()
m.RecentTerminations = append(m.RecentTerminations, now)
}
if updateMemberNeeded {
// Save it
if err := status.Members.Update(m, group); err != nil {
return errors.WithStack(err)
}
updateMemberNeeded = true
}
if updateMemberNeeded {
// Save it
if err := status.Members.Update(m, group); err != nil {
return 0, errors.WithStack(err)
}
}
}
}
}
return nil
})
}
spec := r.context.GetSpec()
allMembersReady := status.Members.AllMembersReady(spec.GetMode(), spec.Sync.IsEnabled())


@ -61,65 +61,56 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
noLeader := len(leaderID) == 0
changed := false
group := api.ServerGroupAgents
agencyServers := func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
pod, exist := cachedStatus.Pod().V1().GetSimple(m.PodName)
if !exist {
continue
}
labels := pod.GetLabels()
if noLeader || m.ID != leaderID {
// Unset a leader when:
// - leader is unknown.
// - leader does not belong to the current pod.
if _, ok := labels[k8sutil.LabelKeyArangoLeader]; ok {
delete(labels, k8sutil.LabelKeyArangoLeader)
err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels))
if err != nil {
log.Err(err).Error("Unable to remove leader label")
return err
}
log.Warn("leader label is removed from \"%s\" member", m.ID)
changed = true
}
continue
}
// From here on it is known that there is a leader, and it should be attached to the current pod.
if value, ok := labels[k8sutil.LabelKeyArangoLeader]; !ok {
labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
} else if value != "true" {
labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
} else {
// A pod is already a leader, so nothing to change.
continue
}
err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels))
if err != nil {
log.Err(err).Error("Unable to update leader label")
return err
}
log.Warn("leader label is set on \"%s\" member", m.ID)
changed = true
for _, e := range status.Members.AsListInGroup(group) {
pod, exist := cachedStatus.Pod().V1().GetSimple(e.Member.PodName)
if !exist {
continue
}
return nil
}
labels := pod.GetLabels()
if noLeader || e.Member.ID != leaderID {
// Unset a leader when:
// - leader is unknown.
// - leader does not belong to the current pod.
if err := status.Members.ForeachServerInGroups(agencyServers, group); err != nil {
return err
if _, ok := labels[k8sutil.LabelKeyArangoLeader]; ok {
delete(labels, k8sutil.LabelKeyArangoLeader)
err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels))
if err != nil {
log.Err(err).Error("Unable to remove leader label")
return err
}
log.Warn("leader label is removed from \"%s\" member", e.Member.ID)
changed = true
}
continue
}
// From here on it is known that there is a leader, and it should be attached to the current pod.
if value, ok := labels[k8sutil.LabelKeyArangoLeader]; !ok {
labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
} else if value != "true" {
labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
} else {
// A pod is already a leader, so nothing to change.
continue
}
err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels))
if err != nil {
log.Err(err).Error("Unable to update leader label")
return err
}
log.Warn("leader label is set on \"%s\" member", e.Member.ID)
changed = true
}
if changed {
return errors.Reconcile()
}
changed = false
if noLeader {
// There is no leader agency so service may not exist, or it can exist with empty list of endpoints.
@ -165,56 +156,42 @@ func (r *Resources) getSingleServerLeaderID(ctx context.Context) (string, error)
var leaderID string
var anyError error
dbServers := func(group api.ServerGroup, list api.MemberStatusList) error {
if len(list) == 0 {
return nil
}
ctxCancel, cancel := context.WithCancel(ctx)
defer func() {
cancel()
}()
// Fetch availability of each single server.
var wg sync.WaitGroup
wg.Add(len(list))
for _, m := range list {
go func(id string) {
defer wg.Done()
err := globals.GetGlobalTimeouts().ArangoD().RunWithTimeout(ctxCancel, func(ctxChild context.Context) error {
c, err := r.context.GetMembersState().GetMemberClient(id)
if err != nil {
return err
}
if available, err := isServerAvailable(ctxChild, c); err != nil {
return err
} else if !available {
return errors.New("not available")
}
// Other requests can be interrupted, because a leader is known already.
cancel()
mutex.Lock()
leaderID = id
mutex.Unlock()
return nil
})
ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
for _, m := range status.Members.Single {
wg.Add(1)
go func(id string) {
defer wg.Done()
err := globals.GetGlobalTimeouts().ArangoD().RunWithTimeout(ctxCancel, func(ctxChild context.Context) error {
c, err := r.context.GetMembersState().GetMemberClient(id)
if err != nil {
mutex.Lock()
anyError = err
mutex.Unlock()
return err
}
}(m.ID)
}
wg.Wait()
return nil
}
if available, err := isServerAvailable(ctxChild, c); err != nil {
return err
} else if !available {
return errors.New("not available")
}
if err := status.Members.ForeachServerInGroups(dbServers, api.ServerGroupSingle); err != nil {
return "", err
// Other requests can be interrupted, because a leader is known already.
cancel()
mutex.Lock()
leaderID = id
mutex.Unlock()
return nil
})
if err != nil {
mutex.Lock()
anyError = err
mutex.Unlock()
}
}(m.ID)
}
wg.Wait()
if len(leaderID) > 0 {
return leaderID, nil
@ -240,43 +217,35 @@ func (r *Resources) ensureSingleServerLeader(ctx context.Context, cachedStatus i
}
}
singleServers := func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
pod, exist := cachedStatus.Pod().V1().GetSimple(m.PodName)
if !exist {
status, _ := r.context.GetStatus()
for _, m := range status.Members.Single {
pod, exist := cachedStatus.Pod().V1().GetSimple(m.PodName)
if !exist {
continue
}
labels := pod.GetLabels()
if enabled && m.ID == leaderID {
if value, ok := labels[k8sutil.LabelKeyArangoLeader]; ok && value == "true" {
// Single server is available, and it has a leader label.
continue
}
labels := pod.GetLabels()
if enabled && m.ID == leaderID {
if value, ok := labels[k8sutil.LabelKeyArangoLeader]; ok && value == "true" {
// Single server is available, and it has a leader label.
continue
}
labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
} else {
if _, ok := labels[k8sutil.LabelKeyArangoLeader]; !ok {
// Single server is not available, and it does not have a leader label.
continue
}
delete(labels, k8sutil.LabelKeyArangoLeader)
labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
} else {
if _, ok := labels[k8sutil.LabelKeyArangoLeader]; !ok {
// Single server is not available, and it does not have a leader label.
continue
}
err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels))
if err != nil {
return errors.WithMessagef(err, "unable to change leader label for pod %s", m.PodName)
}
changed = true
delete(labels, k8sutil.LabelKeyArangoLeader)
}
return nil
}
status, _ := r.context.GetStatus()
if err := status.Members.ForeachServerInGroups(singleServers, api.ServerGroupSingle); err != nil {
return err
err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels))
if err != nil {
return errors.WithMessagef(err, "unable to change leader label for pod %s", m.PodName)
}
changed = true
}
if changed {


@ -131,55 +131,49 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
reconcileRequired := k8sutil.NewReconcile(cachedStatus)
// Ensure member services
if err := status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, e := range status.Members.AsList() {
var targetPort int32 = shared.ArangoPort
switch group {
switch e.Group {
case api.ServerGroupSyncMasters:
targetPort = shared.ArangoSyncMasterPort
case api.ServerGroupSyncWorkers:
targetPort = shared.ArangoSyncWorkerPort
}
for _, m := range list {
memberName := m.ArangoMemberName(r.context.GetAPIObject().GetName(), group)
memberName := e.Member.ArangoMemberName(r.context.GetAPIObject().GetName(), e.Group)
member, ok := cachedStatus.ArangoMember().V1().GetSimple(memberName)
if !ok {
return errors.Newf("Member %s not found", memberName)
}
selector := k8sutil.LabelsForMember(deploymentName, group.AsRole(), m.ID)
if s, ok := cachedStatus.Service().V1().GetSimple(member.GetName()); !ok {
s := r.createService(member.GetName(), member.GetNamespace(), member.AsOwner(), targetPort, selector)
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := svcs.Create(ctxChild, s, meta.CreateOptions{})
return err
})
if err != nil {
if !k8sutil.IsConflict(err) {
return err
}
}
reconcileRequired.Required()
continue
} else {
if err, adjusted := r.adjustService(ctx, s, targetPort, selector); err == nil {
if adjusted {
reconcileRequired.Required()
}
// Continue the loop.
} else {
return err
}
}
member, ok := cachedStatus.ArangoMember().V1().GetSimple(memberName)
if !ok {
return errors.Newf("Member %s not found", memberName)
}
return nil
}); err != nil {
return err
selector := k8sutil.LabelsForMember(deploymentName, e.Group.AsRole(), e.Member.ID)
if s, ok := cachedStatus.Service().V1().GetSimple(member.GetName()); !ok {
s := r.createService(member.GetName(), member.GetNamespace(), member.AsOwner(), targetPort, selector)
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := svcs.Create(ctxChild, s, meta.CreateOptions{})
return err
})
if err != nil {
if !k8sutil.IsConflict(err) {
return err
}
}
reconcileRequired.Required()
continue
} else {
if err, adjusted := r.adjustService(ctx, s, targetPort, selector); err == nil {
if adjusted {
reconcileRequired.Required()
}
// Continue the loop.
} else {
return err
}
}
}
// Headless service


@ -66,20 +66,17 @@ func (d *Deployment) StateColor() server.StateColor {
allGood = false
}
status, _ := d.GetStatus()
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
switch m.Phase {
case api.MemberPhaseFailed:
failed = true
case api.MemberPhaseCreated:
// Should be ok now
default:
// Something is going on
allGood = true
}
for _, m := range status.Members.AsList() {
switch m.Member.Phase {
case api.MemberPhaseFailed:
failed = true
case api.MemberPhaseCreated:
// Should be ok now
default:
// Something is going on
allGood = true
}
return nil
})
}
if failed {
return server.StateRed
}
@ -94,34 +91,22 @@ func (d *Deployment) StateColor() server.StateColor {
// PodCount returns the number of pods for the deployment
func (d *Deployment) PodCount() int {
count := 0
status, _ := d.GetStatus()
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
if m.PodName != "" {
count++
}
}
return nil
})
return count
return len(status.Members.PodNames())
}
// ReadyPodCount returns the number of pods for the deployment that are in ready state
func (d *Deployment) ReadyPodCount() int {
count := 0
status, _ := d.GetStatus()
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
if m.PodName == "" {
continue
}
if m.Conditions.IsTrue(api.ConditionTypeReady) {
count++
}
for _, e := range status.Members.AsList() {
if e.Member.PodName == "" {
continue
}
return nil
})
if e.Member.Conditions.IsTrue(api.ConditionTypeReady) {
count++
}
}
return count
}
@ -129,14 +114,11 @@ func (d *Deployment) ReadyPodCount() int {
func (d *Deployment) VolumeCount() int {
count := 0
status, _ := d.GetStatus()
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
if m.PersistentVolumeClaimName != "" {
count++
}
for _, e := range status.Members.AsList() {
if e.Member.PersistentVolumeClaimName != "" {
count++
}
return nil
})
}
return count
}
@ -145,22 +127,19 @@ func (d *Deployment) ReadyVolumeCount() int {
count := 0
status, _ := d.GetStatus()
pvcs, _ := d.GetOwnedPVCs() // Ignore errors on purpose
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
if m.PersistentVolumeClaimName == "" {
continue
}
// Find status
for _, pvc := range pvcs {
if pvc.Name == m.PersistentVolumeClaimName {
if pvc.Status.Phase == core.ClaimBound {
count++
}
for _, e := range status.Members.AsList() {
if e.Member.PersistentVolumeClaimName == "" {
continue
}
// Find status
for _, pvc := range pvcs {
if pvc.Name == e.Member.PersistentVolumeClaimName {
if pvc.Status.Phase == core.ClaimBound {
count++
}
}
}
return nil
})
}
return count
}
@ -229,7 +208,9 @@ func (d *Deployment) DatabaseVersion() (string, string) {
func (d *Deployment) Members() map[api.ServerGroup][]server.Member {
result := make(map[api.ServerGroup][]server.Member)
status, _ := d.GetStatus()
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, group := range api.AllServerGroups {
list := status.Members.MembersOfGroup(group)
members := make([]server.Member, len(list))
for i, m := range list {
members[i] = member{
@ -241,7 +222,7 @@ func (d *Deployment) Members() map[api.ServerGroup][]server.Member {
if len(members) > 0 {
result[group] = members
}
return nil
})
}
return result
}