mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
[Bugfix] Add unreachable condition (#902)
This commit is contained in:
parent
58b2ff94c4
commit
b74beb59bb
15 changed files with 208 additions and 12 deletions
|
@ -7,6 +7,7 @@
|
|||
- Remove pod immediately when annotation is turned on
|
||||
- (ARM64) Add support for ARM64 enablement
|
||||
- (Cleanup) Reorganize main reconciliation context
|
||||
- (Bugfix) Unreachable condition
|
||||
|
||||
## [1.2.7](https://github.com/arangodb/kube-arangodb/tree/1.2.7) (2022-01-17)
|
||||
- Add Plan BackOff functionality
|
||||
|
|
|
@ -131,6 +131,7 @@ var (
|
|||
operatorTimeouts struct {
|
||||
k8s time.Duration
|
||||
arangoD time.Duration
|
||||
arangoDCheck time.Duration
|
||||
reconciliation time.Duration
|
||||
}
|
||||
chaosOptions struct {
|
||||
|
@ -170,6 +171,7 @@ func init() {
|
|||
f.StringVar(&operatorOptions.scope, "scope", scope.DefaultScope.String(), "Define scope on which Operator works. Legacy - pre 1.1.0 scope with limited cluster access")
|
||||
f.DurationVar(&operatorTimeouts.k8s, "timeout.k8s", globals.DefaultKubernetesTimeout, "The request timeout to the kubernetes")
|
||||
f.DurationVar(&operatorTimeouts.arangoD, "timeout.arangod", globals.DefaultArangoDTimeout, "The request timeout to the ArangoDB")
|
||||
f.DurationVar(&operatorTimeouts.arangoDCheck, "timeout.arangod-check", globals.DefaultArangoDCheckTimeout, "The version check request timeout to the ArangoDB")
|
||||
f.DurationVar(&operatorTimeouts.reconciliation, "timeout.reconciliation", globals.DefaultReconciliationTimeout, "The reconciliation timeout to the ArangoDB CR")
|
||||
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", true, "Enable Scaling Integration")
|
||||
f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read")
|
||||
|
@ -206,6 +208,7 @@ func executeMain(cmd *cobra.Command, args []string) {
|
|||
|
||||
globals.GetGlobalTimeouts().Kubernetes().Set(operatorTimeouts.k8s)
|
||||
globals.GetGlobalTimeouts().ArangoD().Set(operatorTimeouts.arangoD)
|
||||
globals.GetGlobalTimeouts().ArangoDCheck().Set(operatorTimeouts.arangoDCheck)
|
||||
globals.GetGlobalTimeouts().Reconciliation().Set(operatorTimeouts.reconciliation)
|
||||
globals.GetGlobals().Kubernetes().RequestBatchSize().Set(operatorKubernetesOptions.maxBatchSize)
|
||||
globals.GetGlobals().Backup().ConcurrentUploads().Set(operatorBackup.concurrentUploads)
|
||||
|
|
|
@ -38,6 +38,8 @@ const (
|
|||
ConditionTypeReady ConditionType = "Ready"
|
||||
// ConditionTypeStarted indicates that the member was ready at least once.
|
||||
ConditionTypeStarted ConditionType = "Started"
|
||||
// ConditionTypeReachable indicates that the member is reachable.
|
||||
ConditionTypeReachable ConditionType = "Reachable"
|
||||
// ConditionTypeServing indicates that the member core services are running.
|
||||
ConditionTypeServing ConditionType = "Serving"
|
||||
// ConditionTypeTerminated indicates that the member has terminated and will not restart.
|
||||
|
@ -158,6 +160,15 @@ func (list ConditionList) IsTrue(conditionType ConditionType) bool {
|
|||
return found && c.IsTrue()
|
||||
}
|
||||
|
||||
// GetValue returns *bool value in case if condition exists, nil otherwise
|
||||
func (list ConditionList) GetValue(conditionType ConditionType) *bool {
|
||||
c, found := list.Get(conditionType)
|
||||
if found {
|
||||
return util.NewBool(c.IsTrue())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get a condition by type.
|
||||
// Returns true if found, false if not found.
|
||||
func (list ConditionList) Get(conditionType ConditionType) (Condition, bool) {
|
||||
|
|
|
@ -20,13 +20,36 @@
|
|||
|
||||
package v1
|
||||
|
||||
import "sort"
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type DeploymentStatusMemberElementsSortFunc func(a, b DeploymentStatusMemberElement) bool
|
||||
type DeploymentStatusMemberElementsCondFunc func(a DeploymentStatusMemberElement) bool
|
||||
|
||||
type DeploymentStatusMemberElements []DeploymentStatusMemberElement
|
||||
|
||||
func (d DeploymentStatusMemberElements) ForEach(f func(id int)) {
|
||||
if f == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(len(d))
|
||||
|
||||
for id := range d {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
f(i)
|
||||
}(id)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (d DeploymentStatusMemberElements) Filter(f DeploymentStatusMemberElementsCondFunc) DeploymentStatusMemberElements {
|
||||
var l DeploymentStatusMemberElements
|
||||
|
||||
|
|
|
@ -38,6 +38,8 @@ const (
|
|||
ConditionTypeReady ConditionType = "Ready"
|
||||
// ConditionTypeStarted indicates that the member was ready at least once.
|
||||
ConditionTypeStarted ConditionType = "Started"
|
||||
// ConditionTypeReachable indicates that the member is reachable.
|
||||
ConditionTypeReachable ConditionType = "Reachable"
|
||||
// ConditionTypeServing indicates that the member core services are running.
|
||||
ConditionTypeServing ConditionType = "Serving"
|
||||
// ConditionTypeTerminated indicates that the member has terminated and will not restart.
|
||||
|
@ -158,6 +160,15 @@ func (list ConditionList) IsTrue(conditionType ConditionType) bool {
|
|||
return found && c.IsTrue()
|
||||
}
|
||||
|
||||
// GetValue returns *bool value in case if condition exists, nil otherwise
|
||||
func (list ConditionList) GetValue(conditionType ConditionType) *bool {
|
||||
c, found := list.Get(conditionType)
|
||||
if found {
|
||||
return util.NewBool(c.IsTrue())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get a condition by type.
|
||||
// Returns true if found, false if not found.
|
||||
func (list ConditionList) Get(conditionType ConditionType) (Condition, bool) {
|
||||
|
|
|
@ -20,13 +20,36 @@
|
|||
|
||||
package v2alpha1
|
||||
|
||||
import "sort"
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type DeploymentStatusMemberElementsSortFunc func(a, b DeploymentStatusMemberElement) bool
|
||||
type DeploymentStatusMemberElementsCondFunc func(a DeploymentStatusMemberElement) bool
|
||||
|
||||
type DeploymentStatusMemberElements []DeploymentStatusMemberElement
|
||||
|
||||
func (d DeploymentStatusMemberElements) ForEach(f func(id int)) {
|
||||
if f == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
wg.Add(len(d))
|
||||
|
||||
for id := range d {
|
||||
go func(i int) {
|
||||
defer wg.Done()
|
||||
|
||||
f(i)
|
||||
}(id)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func (d DeploymentStatusMemberElements) Filter(f DeploymentStatusMemberElementsCondFunc) DeploymentStatusMemberElements {
|
||||
var l DeploymentStatusMemberElements
|
||||
|
||||
|
|
|
@ -58,6 +58,7 @@ import (
|
|||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/chaos"
|
||||
memberState "github.com/arangodb/kube-arangodb/pkg/deployment/member"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/reconcile"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/resilience"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
|
||||
|
@ -142,6 +143,8 @@ type Deployment struct {
|
|||
chaosMonkey *chaos.Monkey
|
||||
syncClientCache client.ClientCache
|
||||
haveServiceMonitorCRD bool
|
||||
|
||||
memberState.StateInspector
|
||||
}
|
||||
|
||||
func (d *Deployment) GetAgencyCache() (agency.State, bool) {
|
||||
|
@ -219,6 +222,8 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
|
|||
agencyCache: agency.NewCache(apiObject.Spec.Mode),
|
||||
}
|
||||
|
||||
d.StateInspector = memberState.NewStateInspector(d)
|
||||
|
||||
d.clientCache = deploymentClient.NewClientCache(d, conn.NewFactory(d.getAuth, d.getConnConfig))
|
||||
|
||||
d.status.last = *(apiObject.Status.DeepCopy())
|
||||
|
|
|
@ -115,6 +115,8 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval
|
|||
|
||||
d.apiObject = updated
|
||||
|
||||
d.RefreshState(ctxReconciliation, updated.Status.Members.AsList())
|
||||
|
||||
inspectNextInterval, err := d.inspectDeploymentWithError(ctxReconciliation, nextInterval, cachedStatus)
|
||||
if err != nil {
|
||||
if !operatorErrors.IsReconcile(err) {
|
||||
|
|
|
@ -71,6 +71,7 @@ func removeMemberConditionsMapFunc(m *api.MemberStatus) {
|
|||
// Clean conditions
|
||||
m.Conditions.Remove(api.ConditionTypeReady)
|
||||
m.Conditions.Remove(api.ConditionTypeStarted)
|
||||
m.Conditions.Remove(api.ConditionTypeReachable)
|
||||
m.Conditions.Remove(api.ConditionTypeServing)
|
||||
m.Conditions.Remove(api.ConditionTypeTerminated)
|
||||
m.Conditions.Remove(api.ConditionTypeTerminating)
|
||||
|
|
99
pkg/deployment/member/state.go
Normal file
99
pkg/deployment/member/state.go
Normal file
|
@ -0,0 +1,99 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package member
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/globals"
|
||||
)
|
||||
|
||||
type StateInspector interface {
|
||||
RefreshState(ctx context.Context, members api.DeploymentStatusMemberElements)
|
||||
MemberState(id string) (State, bool)
|
||||
}
|
||||
|
||||
func NewStateInspector(client reconciler.DeploymentMemberClient) StateInspector {
|
||||
return &stateInspector{
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
type stateInspector struct {
|
||||
lock sync.Mutex
|
||||
|
||||
members map[string]State
|
||||
|
||||
client reconciler.DeploymentMemberClient
|
||||
}
|
||||
|
||||
func (s *stateInspector) RefreshState(ctx context.Context, members api.DeploymentStatusMemberElements) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
results := make([]State, len(members))
|
||||
|
||||
nctx, cancel := globals.GetGlobalTimeouts().ArangoDCheck().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
|
||||
members.ForEach(func(id int) {
|
||||
c, err := s.client.GetServerClient(nctx, members[id].Group, members[id].Member.ID)
|
||||
if err != nil {
|
||||
results[id].Reachable = false
|
||||
return
|
||||
}
|
||||
|
||||
if _, err := c.Version(nctx); err != nil {
|
||||
results[id].Reachable = false
|
||||
return
|
||||
}
|
||||
|
||||
results[id].Reachable = true
|
||||
})
|
||||
|
||||
current := map[string]State{}
|
||||
|
||||
for id := range members {
|
||||
current[members[id].Member.ID] = results[id]
|
||||
}
|
||||
|
||||
s.members = current
|
||||
}
|
||||
|
||||
func (s *stateInspector) MemberState(id string) (State, bool) {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
if s.members == nil {
|
||||
return State{}, false
|
||||
}
|
||||
|
||||
v, ok := s.members[id]
|
||||
|
||||
return v, ok
|
||||
}
|
||||
|
||||
type State struct {
|
||||
Reachable bool
|
||||
}
|
|
@ -28,6 +28,7 @@ import (
|
|||
|
||||
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/member"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
|
||||
"github.com/arangodb/kube-arangodb/pkg/operator/scope"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
|
||||
|
@ -50,6 +51,8 @@ type Context interface {
|
|||
reconciler.DeploymentSyncClient
|
||||
reconciler.KubernetesEventGenerator
|
||||
|
||||
member.StateInspector
|
||||
|
||||
// GetServerGroupIterator returns the deployment as ServerGroupIterator.
|
||||
GetServerGroupIterator() reconciler.ServerGroupIterator
|
||||
// UpdateStatus replaces the status of the deployment with the given status and
|
||||
|
|
|
@ -51,7 +51,8 @@ func (r *Resources) runPodFinalizers(ctx context.Context, p *v1.Pod, memberStatu
|
|||
|
||||
// When the main container is terminated, then the whole pod should be terminated,
|
||||
// so sidecar core containers' names should not be checked here.
|
||||
isServerContainerDead := !k8sutil.IsPodServerContainerRunning(p)
|
||||
// If Member is not reachable finalizers should be also removed
|
||||
isServerContainerDead := !k8sutil.IsPodServerContainerRunning(p) || util.BoolOrDefault(memberStatus.Conditions.GetValue(api.ConditionTypeReachable), true)
|
||||
|
||||
for _, f := range p.ObjectMeta.GetFinalizers() {
|
||||
switch f {
|
||||
|
|
|
@ -218,6 +218,20 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
|
|||
}
|
||||
// End of Topology labels
|
||||
|
||||
if state, ok := r.context.MemberState(memberStatus.ID); ok {
|
||||
if state.Reachable {
|
||||
if memberStatus.Conditions.Update(api.ConditionTypeReachable, true, "ArangoDB is reachable", "") {
|
||||
updateMemberStatusNeeded = true
|
||||
nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
|
||||
}
|
||||
} else {
|
||||
if memberStatus.Conditions.Update(api.ConditionTypeReachable, false, "ArangoDB is not reachable", "") {
|
||||
updateMemberStatusNeeded = true
|
||||
nextInterval = nextInterval.ReduceTo(recheckSoonPodInspectorInterval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if k8sutil.IsPodReady(pod) && k8sutil.AreContainersReady(pod, coreContainers) {
|
||||
// Pod is now ready
|
||||
if anyOf(memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", ""),
|
||||
|
|
|
@ -25,6 +25,7 @@ import "time"
|
|||
const (
|
||||
DefaultKubernetesTimeout = 2 * time.Second
|
||||
DefaultArangoDTimeout = time.Second * 10
|
||||
DefaultArangoDCheckTimeout = time.Second * 2
|
||||
DefaultReconciliationTimeout = time.Minute
|
||||
|
||||
DefaultKubernetesRequestBatchSize = 256
|
||||
|
@ -36,6 +37,7 @@ var globalObj = &globals{
|
|||
timeouts: &globalTimeouts{
|
||||
requests: NewTimeout(DefaultKubernetesTimeout),
|
||||
arangod: NewTimeout(DefaultArangoDTimeout),
|
||||
arangodCheck: NewTimeout(DefaultArangoDCheckTimeout),
|
||||
reconciliation: NewTimeout(DefaultReconciliationTimeout),
|
||||
},
|
||||
kubernetes: &globalKubernetes{
|
||||
|
@ -107,10 +109,15 @@ type GlobalTimeouts interface {
|
|||
|
||||
Kubernetes() Timeout
|
||||
ArangoD() Timeout
|
||||
ArangoDCheck() Timeout
|
||||
}
|
||||
|
||||
type globalTimeouts struct {
|
||||
requests, arangod, reconciliation Timeout
|
||||
requests, arangod, reconciliation, arangodCheck Timeout
|
||||
}
|
||||
|
||||
func (g *globalTimeouts) ArangoDCheck() Timeout {
|
||||
return g.arangodCheck
|
||||
}
|
||||
|
||||
func (g *globalTimeouts) Reconciliation() Timeout {
|
||||
|
|
|
@ -45,10 +45,6 @@ func RemovePodFinalizers(ctx context.Context, cachedStatus pod.Inspector, log ze
|
|||
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := cachedStatus.Refresh(ctxChild); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, err := cachedStatus.PodReadInterface().Get(ctxChild, p.GetName(), metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
|
@ -80,10 +76,6 @@ func RemovePVCFinalizers(ctx context.Context, cachedStatus persistentvolumeclaim
|
|||
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
|
||||
defer cancel()
|
||||
|
||||
if err := cachedStatus.Refresh(ctxChild); err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
result, err := cachedStatus.PersistentVolumeClaimReadInterface().Get(ctxChild, p.GetName(), metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
|
|
Loading…
Reference in a new issue