Mirror of https://github.com/arangodb/kube-arangodb.git
[Bugfix] Fix status propagation (#889)
Parent: 6d3050cdef
Commit: 124d95d88c

12 changed files with 138 additions and 45 deletions

@@ -16,6 +16,7 @@
 - Add ArangoClusterSynchronization Operator
 - Update licenses
 - Fix restart procedure in case of failing members
+- Fix status propagation race condition
 
 ## [1.2.6](https://github.com/arangodb/kube-arangodb/tree/1.2.6) (2021-12-15)
 - Add ArangoBackup backoff functionality

@@ -36,6 +36,10 @@ func (c ConditionType) String() string {
 const (
     // ConditionTypeReady indicates that the member or entire deployment is ready and running normally.
     ConditionTypeReady ConditionType = "Ready"
+    // ConditionTypeStarted indicates that the member was ready at least once.
+    ConditionTypeStarted ConditionType = "Started"
+    // ConditionTypeServing indicates that the member core services are running.
+    ConditionTypeServing ConditionType = "Serving"
     // ConditionTypeTerminated indicates that the member has terminated and will not restart.
     ConditionTypeTerminated ConditionType = "Terminated"
     // ConditionTypeAutoUpgrade indicates that the member has to be started with `--database.auto-upgrade` once.

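Note: this hunk introduces two member conditions next to Ready. Started latches once a member has been ready at least once, and Serving tracks only the core database containers rather than the whole pod. A minimal standalone sketch of the resulting three-condition model, using simplified stand-in types rather than the operator's actual Conditions implementation:

package main

import "fmt"

type ConditionType string

const (
    ConditionTypeReady   ConditionType = "Ready"
    ConditionTypeStarted ConditionType = "Started"
    ConditionTypeServing ConditionType = "Serving"
)

// Conditions is a simplified stand-in for the operator's condition list.
type Conditions map[ConditionType]bool

func (c Conditions) IsTrue(t ConditionType) bool { return c[t] }

func main() {
    // A member whose sidecar is unready: the pod is not Ready,
    // but the core containers still serve traffic.
    m := Conditions{ConditionTypeStarted: true, ConditionTypeServing: true}
    fmt.Println(m.IsTrue(ConditionTypeReady))   // false
    fmt.Println(m.IsTrue(ConditionTypeServing)) // true
}

With this split, a member whose sidecar crashes stops being Ready but remains Serving, which the later hunks use to keep such members out of unnecessary restarts.
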
@@ -205,6 +205,22 @@ func (l MemberStatusList) MembersReady() int {
     return readyCount
 }
 
+// MembersServing returns the number of members that are in the Serving state.
+func (l MemberStatusList) MembersServing() int {
+    servingCount := 0
+    for _, x := range l {
+        if x.Conditions.IsTrue(ConditionTypeServing) {
+            servingCount++
+        }
+    }
+    return servingCount
+}
+
+// AllMembersServing returns the true if all members are in the Serving state.
+func (l MemberStatusList) AllMembersServing() bool {
+    return len(l) == l.MembersServing()
+}
+
 // AllMembersReady returns the true if all members are in the Ready state.
 func (l MemberStatusList) AllMembersReady() bool {
     return len(l) == l.MembersReady()

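Note: MembersServing and AllMembersServing mirror the existing MembersReady and AllMembersReady counters, counting the Serving condition instead of Ready. A hedged usage sketch with simplified stand-in types, showing only the counting pattern:

package main

import "fmt"

type MemberStatus struct {
    ID      string
    Serving bool // stand-in for Conditions.IsTrue(ConditionTypeServing)
}

type MemberStatusList []MemberStatus

// MembersServing counts members whose Serving condition is true,
// mirroring the method added in the hunk above.
func (l MemberStatusList) MembersServing() int {
    n := 0
    for _, m := range l {
        if m.Serving {
            n++
        }
    }
    return n
}

func (l MemberStatusList) AllMembersServing() bool { return len(l) == l.MembersServing() }

func main() {
    agents := MemberStatusList{{"AGNT-1", true}, {"AGNT-2", true}, {"AGNT-3", false}}
    fmt.Println(agents.MembersServing(), agents.AllMembersServing()) // 2 false
}

The next two hunks repeat the two changes above verbatim; the operator keeps parallel copies of these API types (presumably the v1 and v2alpha1 versions), so each copy receives the same additions.
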
@@ -36,6 +36,10 @@ func (c ConditionType) String() string {
 const (
     // ConditionTypeReady indicates that the member or entire deployment is ready and running normally.
     ConditionTypeReady ConditionType = "Ready"
+    // ConditionTypeStarted indicates that the member was ready at least once.
+    ConditionTypeStarted ConditionType = "Started"
+    // ConditionTypeServing indicates that the member core services are running.
+    ConditionTypeServing ConditionType = "Serving"
     // ConditionTypeTerminated indicates that the member has terminated and will not restart.
     ConditionTypeTerminated ConditionType = "Terminated"
     // ConditionTypeAutoUpgrade indicates that the member has to be started with `--database.auto-upgrade` once.

@@ -205,6 +205,22 @@ func (l MemberStatusList) MembersReady() int {
     return readyCount
 }
 
+// MembersServing returns the number of members that are in the Serving state.
+func (l MemberStatusList) MembersServing() int {
+    servingCount := 0
+    for _, x := range l {
+        if x.Conditions.IsTrue(ConditionTypeServing) {
+            servingCount++
+        }
+    }
+    return servingCount
+}
+
+// AllMembersServing returns the true if all members are in the Serving state.
+func (l MemberStatusList) AllMembersServing() bool {
+    return len(l) == l.MembersServing()
+}
+
 // AllMembersReady returns the true if all members are in the Ready state.
 func (l MemberStatusList) AllMembersReady() bool {
     return len(l) == l.MembersReady()

@@ -52,7 +52,7 @@ import (
     "github.com/arangodb/arangosync-client/client"
     "github.com/rs/zerolog"
     apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+    meta "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/tools/record"
 

@@ -381,7 +381,7 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(ctx context.Context) err
     // Get the most recent version of the deployment from the API server
     ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
     defer cancel()
-    current, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()).Get(ctxChild, d.apiObject.GetName(), metav1.GetOptions{})
+    current, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()).Get(ctxChild, d.apiObject.GetName(), meta.GetOptions{})
     if err != nil {
         log.Debug().Err(err).Msg("Failed to get current version of deployment from API server")
         if k8sutil.IsNotFound(err) {

@@ -465,21 +465,22 @@ func (d *Deployment) updateCRStatus(ctx context.Context, force ...bool) error {
     }
 
     // Send update to API server
-    ns := d.apiObject.GetNamespace()
-    depls := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns)
-    update := d.apiObject.DeepCopy()
-
+    depls := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.GetNamespace())
     attempt := 0
     for {
         attempt++
-        update.Status = d.status.last
-        if update.GetDeletionTimestamp() == nil {
-            ensureFinalizers(update)
+        if d.apiObject.GetDeletionTimestamp() == nil {
+            ensureFinalizers(d.apiObject)
         }
 
         var newAPIObject *api.ArangoDeployment
         err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
-            var err error
-            newAPIObject, err = depls.Update(ctxChild, update, metav1.UpdateOptions{})
+            p, err := patch.NewPatch(patch.ItemReplace(patch.NewPath("status"), d.status.last)).Marshal()
+            if err != nil {
+                return err
+            }
+
+            newAPIObject, err = depls.Patch(ctxChild, d.GetName(), types.JSONPatchType, p, meta.PatchOptions{})
+
             return err
         })

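Note: this is the core of the race-condition fix. The old code sent a full Update of the deployment object, which carries a resourceVersion and therefore either conflicts with, or stomps on, concurrent writers; the new code sends a JSON Patch that replaces only the /status subtree and leaves the rest of the object alone. A minimal sketch of what such a patch document looks like on the wire, built with plain encoding/json rather than the operator's own patch package:

package main

import (
    "encoding/json"
    "fmt"
)

// patchItem models one RFC 6902 JSON Patch operation.
type patchItem struct {
    Op    string      `json:"op"`
    Path  string      `json:"path"`
    Value interface{} `json:"value"`
}

func main() {
    status := map[string]interface{}{"phase": "Running"} // stand-in for d.status.last
    p, err := json.Marshal([]patchItem{{Op: "replace", Path: "/status", Value: status}})
    if err != nil {
        panic(err)
    }
    // Sent with types.JSONPatchType; only /status is replaced server-side.
    fmt.Println(string(p)) // [{"op":"replace","path":"/status","value":{"phase":"Running"}}]
}
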
@@ -488,21 +489,8 @@ func (d *Deployment) updateCRStatus(ctx context.Context, force ...bool) error {
             d.apiObject = newAPIObject
             return nil
         }
-        if attempt < 10 && k8sutil.IsConflict(err) {
-            // API object may have been changed already,
-            // Reload api object and try again
-            var current *api.ArangoDeployment
-
-            err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
-                var err error
-                current, err = depls.Get(ctxChild, update.GetName(), metav1.GetOptions{})
-
-                return err
-            })
-            if err == nil {
-                update = current.DeepCopy()
-                continue
-            }
-        }
+        if attempt < 10 {
+            continue
+        }
         if err != nil {
             d.deps.Log.Debug().Err(err).Msg("failed to patch ArangoDeployment status")

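Note: with a JSON Patch there is no resourceVersion precondition, so the old conflict branch that re-fetched the object before retrying has nothing left to do; any error short of the tenth attempt simply retries. A simplified sketch of the resulting bounded-retry shape (the real loop lives inside updateCRStatus and rebuilds the patch from d.status.last on each attempt):

package main

import (
    "errors"
    "fmt"
)

const maxAttempts = 10

// patchWithRetry retries patchOnce up to maxAttempts times,
// mirroring the simplified loop in the hunk above.
func patchWithRetry(patchOnce func() error) error {
    var err error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = patchOnce(); err == nil {
            return nil
        }
    }
    return err
}

func main() {
    calls := 0
    err := patchWithRetry(func() error {
        calls++
        if calls < 3 {
            return errors.New("transient API error")
        }
        return nil
    })
    fmt.Println(calls, err) // 3 <nil>
}
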
@@ -535,7 +523,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe
     var newAPIObject *api.ArangoDeployment
     err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
         var err error
-        newAPIObject, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Update(ctxChild, update, metav1.UpdateOptions{})
+        newAPIObject, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Update(ctxChild, update, meta.UpdateOptions{})
 
         return err
     })

@@ -551,7 +539,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe
 
         err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
             var err error
-            current, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Get(ctxChild, update.GetName(), metav1.GetOptions{})
+            current, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Get(ctxChild, update.GetName(), meta.GetOptions{})
 
             return err
         })

@@ -568,7 +556,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe
 }
 
 // isOwnerOf returns true if the given object belong to this deployment.
-func (d *Deployment) isOwnerOf(obj metav1.Object) bool {
+func (d *Deployment) isOwnerOf(obj meta.Object) bool {
     ownerRefs := obj.GetOwnerReferences()
     if len(ownerRefs) < 1 {
         return false

@@ -583,9 +571,9 @@ func (d *Deployment) isOwnerOf(obj metav1.Object) bool {
 func (d *Deployment) lookForServiceMonitorCRD() {
     var err error
     if d.GetScope().IsNamespaced() {
-        _, err = d.deps.KubeMonitoringCli.ServiceMonitors(d.GetNamespace()).List(context.Background(), metav1.ListOptions{})
+        _, err = d.deps.KubeMonitoringCli.ServiceMonitors(d.GetNamespace()).List(context.Background(), meta.ListOptions{})
     } else {
-        _, err = d.deps.KubeExtCli.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "servicemonitors.monitoring.coreos.com", metav1.GetOptions{})
+        _, err = d.deps.KubeExtCli.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "servicemonitors.monitoring.coreos.com", meta.GetOptions{})
     }
     log := d.deps.Log
     log.Debug().Msgf("Looking for ServiceMonitor CRD...")

@@ -637,7 +625,7 @@ func (d *Deployment) ApplyPatch(ctx context.Context, p ...patch.Item) error {
 
     ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
     defer cancel()
-    depl, err := c.Patch(ctxChild, d.apiObject.GetName(), types.JSONPatchType, data, metav1.PatchOptions{})
+    depl, err := c.Patch(ctxChild, d.apiObject.GetName(), types.JSONPatchType, data, meta.PatchOptions{})
     if err != nil {
         return err
     }

@@ -67,6 +67,7 @@ var phase = phaseMap{
 func removeMemberConditionsMapFunc(m *api.MemberStatus) {
     // Clean conditions
     m.Conditions.Remove(api.ConditionTypeReady)
+    m.Conditions.Remove(api.ConditionTypeStarted)
     m.Conditions.Remove(api.ConditionTypeTerminated)
     m.Conditions.Remove(api.ConditionTypeTerminating)
     m.Conditions.Remove(api.ConditionTypeAgentRecoveryNeeded)

@@ -384,18 +384,23 @@ func groupReadyForRestart(context PlanBuilderContext, spec api.DeploymentSpec, s
         return true
     }
 
-    // If current member is not ready, kill anyway
-    if !member.Conditions.IsTrue(api.ConditionTypeReady) {
+    // If current member did not become ready even once. Kill it
+    if !member.Conditions.IsTrue(api.ConditionTypeStarted) {
+        return true
+    }
+
+    // If current core containers are dead kill it.
+    if !member.Conditions.IsTrue(api.ConditionTypeServing) {
         return true
     }
 
     switch group {
     case api.ServerGroupDBServers:
         // TODO: Improve shard placement discovery and keep WriteConcern
-        return context.GetShardSyncStatus() && status.Members.MembersOfGroup(group).AllMembersReady()
+        return context.GetShardSyncStatus() && status.Members.MembersOfGroup(group).AllMembersServing()
     default:
         // In case of agents we can kill only one agent at same time
-        return status.Members.MembersOfGroup(group).AllMembersReady()
+        return status.Members.MembersOfGroup(group).AllMembersServing()
     }
 }
 

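Note: the restart gate now distinguishes three situations instead of one. A member that never became ready (Started false) may be killed outright, as may a member whose core containers are down (Serving false); otherwise a restart is allowed only while the remaining members of the group are still Serving. A peer that is unready merely because of a sidecar therefore no longer blocks a rolling restart. A condensed sketch of that decision order with stand-in types (the real function additionally checks shard sync status for DB servers):

package main

import "fmt"

type member struct{ started, serving bool }

// canRestart condenses the decision order from the hunk above:
// never-started or non-serving members may be killed immediately;
// otherwise the rest of the group must still be serving.
func canRestart(m member, group []member) bool {
    if !m.started || !m.serving {
        return true
    }
    for _, other := range group {
        if !other.serving {
            return false
        }
    }
    return true
}

func main() {
    group := []member{{true, true}, {true, true}, {true, false}}
    fmt.Println(canRestart(group[0], group)) // false: a peer is not serving
    fmt.Println(canRestart(group[2], group)) // true: this member itself is not serving
}
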
@@ -60,8 +60,8 @@ func emptyPlanBuilder(ctx context.Context,
 
 func removeConditionActionV2(actionReason string, conditionType api.ConditionType) api.Action {
     return api.NewAction(api.ActionTypeSetConditionV2, api.ServerGroupUnknown, "", actionReason).
-        AddParam(setConditionActionV2KeyAction, setConditionActionV2KeyTypeRemove).
-        AddParam(setConditionActionV2KeyType, string(conditionType))
+        AddParam(setConditionActionV2KeyAction, string(conditionType)).
+        AddParam(setConditionActionV2KeyType, setConditionActionV2KeyTypeRemove)
 }
 
 func updateConditionActionV2(actionReason string, conditionType api.ConditionType, status bool, reason, message, hash string) api.Action {

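Note: the two swapped lines are the bug itself. The action key had apparently been given the remove marker and the type key the condition name, i.e. each value sat under the other's key, so whatever reads these parameters back would look up the wrong value. A tiny illustration of that failure mode with plain maps; the key names are illustrative stand-ins, not the operator's constants, since the diff does not show how the executor consumes them:

package main

import "fmt"

func main() {
    const keyAction, keyType, remove = "condition", "operation", "remove"

    // Before the fix: values stored under swapped keys.
    buggy := map[string]string{keyAction: remove, keyType: "Ready"}
    // After the fix: each value under the key the executor reads.
    fixed := map[string]string{keyAction: "Ready", keyType: remove}

    // An executor looking up which condition to remove:
    fmt.Println(buggy[keyAction]) // "remove" - not a condition name
    fmt.Println(fixed[keyAction]) // "Ready"
}
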
@@ -35,6 +35,8 @@ import (
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 
+    "strings"
+
     api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
     "github.com/arangodb/kube-arangodb/pkg/metrics"
     "github.com/arangodb/kube-arangodb/pkg/util"

@@ -216,10 +218,12 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
         }
         // End of Topology labels
 
-        if k8sutil.AreContainersReady(pod, coreContainers) {
+        if k8sutil.IsPodReady(pod) && k8sutil.AreContainersReady(pod, coreContainers) {
             // Pod is now ready
-            if memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", "") {
-                log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready & Initialised to true")
+            if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", ""),
+                memberStatus.Conditions.Update(api.ConditionTypeStarted, true, "Pod Started", ""),
+                memberStatus.Conditions.Update(api.ConditionTypeServing, true, "Pod Serving", "")) {
+                log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready, Started & Serving to true")
 
                 if status.Topology.IsTopologyOwned(memberStatus.Topology) {
                     nodes, ok := cachedStatus.GetNodes()

@@ -238,10 +242,19 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
                 updateMemberStatusNeeded = true
                 nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
             }
+        } else if k8sutil.AreContainersReady(pod, coreContainers) {
+            // Pod is not ready, but core containers are fine
+            if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, false, "Pod Not Ready", ""),
+                memberStatus.Conditions.Update(api.ConditionTypeServing, true, "Pod is still serving", "")) {
+                log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready to false, while all core containers are ready")
+                updateMemberStatusNeeded = true
+                nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
+            }
         } else {
             // Pod is not ready
-            if memberStatus.Conditions.Update(api.ConditionTypeReady, false, "Pod Not Ready", "") {
-                log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready to false")
+            if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, false, "Pod Not Ready", ""),
+                memberStatus.Conditions.Update(api.ConditionTypeServing, false, "Pod Core containers are not ready", strings.Join(coreContainers, ", "))) {
+                log.Debug().Str("pod-name", pod.GetName()).Msg("Updating member condition Ready & Serving to false")
                 updateMemberStatusNeeded = true
                 nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
             }

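Note: taken together, the branches above classify each pod three ways: fully ready (Ready, Started and Serving all set true), degraded but serving (core containers ready while the pod as a whole is not, so Ready goes false and Serving stays true), and down (core containers unready, so Ready and Serving both go false). A pure-function sketch of that ladder with stand-in types:

package main

import "fmt"

type podState int

const (
    podFullyReady podState = iota // Ready, Started, Serving -> true
    podServing                    // core containers fine, pod unready: Ready -> false, Serving -> true
    podDown                       // core containers unready: Ready and Serving -> false
)

// classify mirrors the if / else-if / else ladder in the hunks above.
func classify(podIsReady, coreContainersReady bool) podState {
    switch {
    case podIsReady && coreContainersReady:
        return podFullyReady
    case coreContainersReady:
        return podServing
    default:
        return podDown
    }
}

func main() {
    fmt.Println(classify(false, true) == podServing) // true: sidecar unready, core still serving
}
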
@@ -394,3 +407,13 @@ func removeLabel(labels map[string]string, key string) map[string]string {
 
     return labels
 }
+
+func anyOf(bools ...bool) bool {
+    for _, b := range bools {
+        if b {
+            return true
+        }
+    }
+
+    return false
+}

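Note: anyOf looks redundant next to Go's built-in ||, but the difference is load-bearing. An expression like a || b || c short-circuits, so once one Conditions.Update reported a change, the remaining Update calls, which are needed for their side effects on member status, would be skipped; passing the results as arguments forces every call to run before anyOf inspects them. A small demonstration:

package main

import "fmt"

var calls int

// update simulates Conditions.Update: a side-effecting call
// that reports whether anything changed.
func update(changed bool) bool {
    calls++
    return changed
}

func anyOf(bools ...bool) bool {
    for _, b := range bools {
        if b {
            return true
        }
    }
    return false
}

func main() {
    calls = 0
    _ = update(true) || update(true) // short-circuits: second call never runs
    fmt.Println(calls)               // 1

    calls = 0
    _ = anyOf(update(true), update(true)) // both arguments evaluated first
    fmt.Println(calls)                    // 2
}
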
@@ -120,13 +120,14 @@ func AreContainersReady(pod *core.Pod, coreContainers utils.StringList) bool {
     // From here on all required containers are running, but unready condition must be checked additionally.
     switch condition.Reason {
     case ServerContainerConditionContainersNotReady:
-        if !strings.HasPrefix(condition.Message, ServerContainerConditionPrefix) {
+        unreadyContainers, ok := extractContainerNamesFromConditionMessage(condition.Message)
+
+        if !ok {
             return false
         }
 
-        unreadyContainers := strings.TrimPrefix(condition.Message, ServerContainerConditionPrefix)
         for _, c := range coreContainers {
-            if strings.Contains(unreadyContainers, c) {
+            if unreadyContainers.Has(c) {
                 // The container is on the list with unready containers.
                 return false
             }

@@ -138,6 +139,28 @@ func AreContainersReady(pod *core.Pod, coreContainers utils.StringList) bool {
     return false
 }
 
+func extractContainerNamesFromConditionMessage(msg string) (utils.StringList, bool) {
+    if !strings.HasPrefix(msg, ServerContainerConditionPrefix) {
+        return nil, false
+    }
+
+    unreadyContainers := strings.TrimPrefix(msg, ServerContainerConditionPrefix)
+
+    if !strings.HasPrefix(unreadyContainers, "[") {
+        return nil, false
+    }
+
+    if !strings.HasSuffix(unreadyContainers, "]") {
+        return nil, false
+    }
+
+    unreadyContainers = strings.TrimPrefix(strings.TrimSuffix(unreadyContainers, "]"), "[")
+
+    unreadyContainersList := utils.StringList(strings.Split(unreadyContainers, " "))
+
+    return unreadyContainersList, true
+}
+
 // GetPodByName returns pod if it exists among the pods' list
 // Returns false if not found.
 func GetPodByName(pods []core.Pod, podName string) (core.Pod, bool) {

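Note: the helper parses kubelet's ContainersNotReady message, which has the shape "containers with unready status: [name1 name2 ...]", into a proper list. This replaces the earlier substring check, which could false-positive when one container name was a prefix of another; the new test below guards exactly that case ("sidecar" must not match "sidecar2"). A standalone sketch of the same parse using only the standard library instead of utils.StringList:

package main

import (
    "fmt"
    "strings"
)

const prefix = "containers with unready status: "

// extractNames mirrors extractContainerNamesFromConditionMessage
// from the hunk above, returning ok=false for any unexpected shape.
func extractNames(msg string) ([]string, bool) {
    if !strings.HasPrefix(msg, prefix) {
        return nil, false
    }
    rest := strings.TrimPrefix(msg, prefix)
    if !strings.HasPrefix(rest, "[") || !strings.HasSuffix(rest, "]") {
        return nil, false
    }
    return strings.Split(strings.Trim(rest, "[]"), " "), true
}

func main() {
    names, ok := extractNames("containers with unready status: [sidecar2 sidecar3]")
    fmt.Println(ok, names) // true [sidecar2 sidecar3]
}
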
@@ -27,6 +27,7 @@ import (
     v1 "k8s.io/api/core/v1"
 
+    "github.com/arangodb/kube-arangodb/pkg/handlers/utils"
     "github.com/stretchr/testify/require"
 )
 
 // TestIsPodReady tests IsPodReady.

@@ -318,3 +319,14 @@ func TestIsPodSucceeded(t *testing.T) {
         })
     }
 }
+
+func Test_extractContainerNamesFromConditionMessage(t *testing.T) {
+    t.Run("Valid name", func(t *testing.T) {
+        c, ok := extractContainerNamesFromConditionMessage("containers with unready status: [sidecar2 sidecar3]")
+        require.True(t, ok)
+        require.Len(t, c, 2)
+        require.Contains(t, c, "sidecar2")
+        require.Contains(t, c, "sidecar3")
+        require.NotContains(t, c, "sidecar")
+    })
+}