1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Use v1 PDB in 1.21 kubernetes (#1006)

This commit is contained in:
Tomasz Mielech 2022-06-13 14:14:22 +02:00 committed by GitHub
parent 4405567deb
commit d8072eed7b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 345 additions and 280 deletions

View file

@ -10,6 +10,7 @@
- (Feature) Add `ArangoJob` CRD auto-installer
- (Feature) Add RestartPolicyAlways to ArangoDeployment in order to restart ArangoDB on failure
- (Feature) Set a leader in active fail-over mode
- (Feature) Use policy/v1 instead policy/v1beta1
## [1.2.13](https://github.com/arangodb/kube-arangodb/tree/1.2.13) (2022-06-07)
- (Bugfix) Fix arangosync members state inspection

View file

@ -23,21 +23,21 @@ package resources
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/util/collection"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
monitoring "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/util/collection"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/rs/zerolog/log"
core "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
)
type PatchFunc func(name string, d []byte) error
@ -98,8 +98,15 @@ func (r *Resources) EnsureAnnotations(ctx context.Context, cachedStatus inspecto
patchPDB := func(name string, d []byte) error {
return globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
if _, err := cachedStatus.PodDisruptionBudget().V1(); err == nil {
_, err = cachedStatus.PodDisruptionBudgetsModInterface().V1().Patch(ctxChild, name,
types.JSONPatchType, d, meta.PatchOptions{})
return err
}
_, err := cachedStatus.PodDisruptionBudgetsModInterface().V1Beta1().Patch(ctxChild, name,
types.JSONPatchType, d, meta.PatchOptions{})
return err
})
}
@ -206,15 +213,29 @@ func ensureServicesAnnotations(patch PatchFunc, cachedStatus inspectorInterface.
return nil
}
func ensurePdbsAnnotations(patch PatchFunc, cachedStatus inspectorInterface.Inspector, kind, name, namespace string, spec api.DeploymentSpec) error {
i, err := cachedStatus.PodDisruptionBudget().V1Beta1()
func ensurePdbsAnnotations(patch PatchFunc, cachedStatus inspectorInterface.Inspector, kind, name, namespace string,
spec api.DeploymentSpec) error {
if inspector, err := cachedStatus.PodDisruptionBudget().V1(); err == nil {
if err := inspector.Iterate(func(podDisruptionBudget *policyv1.PodDisruptionBudget) error {
ensureAnnotationsMap(podDisruptionBudget.Kind, podDisruptionBudget, spec, patch)
return nil
}, func(podDisruptionBudget *policyv1.PodDisruptionBudget) bool {
return k8sutil.IsChildResource(kind, name, namespace, podDisruptionBudget)
}); err != nil {
return err
}
return nil
}
inspector, err := cachedStatus.PodDisruptionBudget().V1Beta1()
if err != nil {
return err
}
if err := i.Iterate(func(podDisruptionBudget *policy.PodDisruptionBudget) error {
if err := inspector.Iterate(func(podDisruptionBudget *policyv1beta1.PodDisruptionBudget) error {
ensureAnnotationsMap(podDisruptionBudget.Kind, podDisruptionBudget, spec, patch)
return nil
}, func(podDisruptionBudget *policy.PodDisruptionBudget) bool {
}, func(podDisruptionBudget *policyv1beta1.PodDisruptionBudget) bool {
return k8sutil.IsChildResource(kind, name, namespace, podDisruptionBudget)
}); err != nil {
return err

View file

@ -24,14 +24,15 @@ import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/throttle"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/throttle"
)
func init() {
@ -50,7 +51,7 @@ func (p podDisruptionBudgetsInspectorLoader) Component() throttle.Component {
func (p podDisruptionBudgetsInspectorLoader) Load(ctx context.Context, i *inspectorState) {
var q podDisruptionBudgetsInspector
if i.versionInfo.Major() == 128 && i.versionInfo.Minor() >= 21 { // Above 1.21, disable temporally
if i.versionInfo.CompareTo("1.21") >= 0 {
p.loadV1(ctx, i, &q)
q.v1beta1 = &podDisruptionBudgetsInspectorV1Beta1{

View file

@ -21,8 +21,9 @@
package inspector
import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/anonymous"
"k8s.io/apimachinery/pkg/runtime/schema"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/anonymous"
)
func (p *podDisruptionBudgetsInspector) Anonymous(gvk schema.GroupVersionKind) (anonymous.Interface, bool) {

View file

@ -21,13 +21,13 @@
package inspector
import (
policy "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// PodDisruptionBudget
const (
PodDisruptionBudgetGroup = policy.GroupName
PodDisruptionBudgetGroup = policyv1.GroupName
PodDisruptionBudgetResource = "poddisruptionbudgets"
PodDisruptionBudgetKind = "PodDisruptionBudget"
PodDisruptionBudgetVersionV1Beta1 = "v1beta1"

View file

@ -0,0 +1,83 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package inspector
import (
"context"
policyv1 "k8s.io/api/policy/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
policytypedv1 "k8s.io/client-go/kubernetes/typed/policy/v1"
poddisruptionbudgetv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1"
)
type podDisruptionBudgetsModV1 struct {
i *inspectorState
}
func (p podDisruptionBudgetsMod) V1() poddisruptionbudgetv1.ModInterface {
return podDisruptionBudgetsModV1(p)
}
func (p podDisruptionBudgetsModV1) client() policytypedv1.PodDisruptionBudgetInterface {
return p.i.Client().Kubernetes().PolicyV1().PodDisruptionBudgets(p.i.Namespace())
}
func (p podDisruptionBudgetsModV1) Create(ctx context.Context, podDisruptionBudget *policyv1.PodDisruptionBudget,
opts meta.CreateOptions) (*policyv1.PodDisruptionBudget, error) {
if podDisruptionBudget, err := p.client().Create(ctx, podDisruptionBudget, opts); err != nil {
return podDisruptionBudget, err
} else {
p.i.GetThrottles().PodDisruptionBudget().Invalidate()
return podDisruptionBudget, err
}
}
func (p podDisruptionBudgetsModV1) Update(ctx context.Context, podDisruptionBudget *policyv1.PodDisruptionBudget,
opts meta.UpdateOptions) (*policyv1.PodDisruptionBudget, error) {
if podDisruptionBudget, err := p.client().Update(ctx, podDisruptionBudget, opts); err != nil {
return podDisruptionBudget, err
} else {
p.i.GetThrottles().PodDisruptionBudget().Invalidate()
return podDisruptionBudget, err
}
}
func (p podDisruptionBudgetsModV1) Patch(ctx context.Context, name string, pt types.PatchType, data []byte,
opts meta.PatchOptions, subresources ...string) (result *policyv1.PodDisruptionBudget, err error) {
if podDisruptionBudget, err := p.client().Patch(ctx, name, pt, data, opts, subresources...); err != nil {
return podDisruptionBudget, err
} else {
p.i.GetThrottles().PodDisruptionBudget().Invalidate()
return podDisruptionBudget, err
}
}
func (p podDisruptionBudgetsModV1) Delete(ctx context.Context, name string, opts meta.DeleteOptions) error {
if err := p.client().Delete(ctx, name, opts); err != nil {
return err
} else {
p.i.GetThrottles().PodDisruptionBudget().Invalidate()
return err
}
}

View file

@ -23,26 +23,27 @@ package inspector
import (
"context"
podDisruptionBudgetv1beta1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1beta1"
policy "k8s.io/api/policy/v1beta1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
policyTyped "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
policytypedv1beta1 "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
podsisruptionbudgetv1beta1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1beta1"
)
func (p podDisruptionBudgetsMod) V1Beta1() podDisruptionBudgetv1beta1.ModInterface {
return podDisruptionBudgetsModV1(p)
func (p podDisruptionBudgetsMod) V1Beta1() podsisruptionbudgetv1beta1.ModInterface {
return podDisruptionBudgetsModV1Beta1(p)
}
type podDisruptionBudgetsModV1 struct {
type podDisruptionBudgetsModV1Beta1 struct {
i *inspectorState
}
func (p podDisruptionBudgetsModV1) client() policyTyped.PodDisruptionBudgetInterface {
func (p podDisruptionBudgetsModV1Beta1) client() policytypedv1beta1.PodDisruptionBudgetInterface {
return p.i.Client().Kubernetes().PolicyV1beta1().PodDisruptionBudgets(p.i.Namespace())
}
func (p podDisruptionBudgetsModV1) Create(ctx context.Context, podDisruptionBudget *policy.PodDisruptionBudget, opts meta.CreateOptions) (*policy.PodDisruptionBudget, error) {
func (p podDisruptionBudgetsModV1Beta1) Create(ctx context.Context, podDisruptionBudget *policyv1beta1.PodDisruptionBudget, opts meta.CreateOptions) (*policyv1beta1.PodDisruptionBudget, error) {
if podDisruptionBudget, err := p.client().Create(ctx, podDisruptionBudget, opts); err != nil {
return podDisruptionBudget, err
} else {
@ -51,7 +52,7 @@ func (p podDisruptionBudgetsModV1) Create(ctx context.Context, podDisruptionBudg
}
}
func (p podDisruptionBudgetsModV1) Update(ctx context.Context, podDisruptionBudget *policy.PodDisruptionBudget, opts meta.UpdateOptions) (*policy.PodDisruptionBudget, error) {
func (p podDisruptionBudgetsModV1Beta1) Update(ctx context.Context, podDisruptionBudget *policyv1beta1.PodDisruptionBudget, opts meta.UpdateOptions) (*policyv1beta1.PodDisruptionBudget, error) {
if podDisruptionBudget, err := p.client().Update(ctx, podDisruptionBudget, opts); err != nil {
return podDisruptionBudget, err
} else {
@ -60,7 +61,7 @@ func (p podDisruptionBudgetsModV1) Update(ctx context.Context, podDisruptionBudg
}
}
func (p podDisruptionBudgetsModV1) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts meta.PatchOptions, subresources ...string) (result *policy.PodDisruptionBudget, err error) {
func (p podDisruptionBudgetsModV1Beta1) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts meta.PatchOptions, subresources ...string) (result *policyv1beta1.PodDisruptionBudget, err error) {
if podDisruptionBudget, err := p.client().Patch(ctx, name, pt, data, opts, subresources...); err != nil {
return podDisruptionBudget, err
} else {
@ -69,7 +70,7 @@ func (p podDisruptionBudgetsModV1) Patch(ctx context.Context, name string, pt ty
}
}
func (p podDisruptionBudgetsModV1) Delete(ctx context.Context, name string, opts meta.DeleteOptions) error {
func (p podDisruptionBudgetsModV1Beta1) Delete(ctx context.Context, name string, opts meta.DeleteOptions) error {
if err := p.client().Delete(ctx, name, opts); err != nil {
return err
} else {

View file

@ -23,11 +23,12 @@ package inspector
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
ins "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1"
policy "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
ins "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1"
)
func (p *podDisruptionBudgetsInspector) V1() (ins.Inspector, error) {
@ -41,7 +42,7 @@ func (p *podDisruptionBudgetsInspector) V1() (ins.Inspector, error) {
type podDisruptionBudgetsInspectorV1 struct {
podDisruptionBudgetInspector *podDisruptionBudgetsInspector
podDisruptionBudgets map[string]*policy.PodDisruptionBudget
podDisruptionBudgets map[string]*policyv1.PodDisruptionBudget
err error
}
@ -65,8 +66,8 @@ func (p *podDisruptionBudgetsInspectorV1) validate() error {
return nil
}
func (p *podDisruptionBudgetsInspectorV1) PodDisruptionBudgets() []*policy.PodDisruptionBudget {
var r []*policy.PodDisruptionBudget
func (p *podDisruptionBudgetsInspectorV1) PodDisruptionBudgets() []*policyv1.PodDisruptionBudget {
var r []*policyv1.PodDisruptionBudget
for _, podDisruptionBudget := range p.podDisruptionBudgets {
r = append(r, podDisruptionBudget)
}
@ -74,7 +75,7 @@ func (p *podDisruptionBudgetsInspectorV1) PodDisruptionBudgets() []*policy.PodDi
return r
}
func (p *podDisruptionBudgetsInspectorV1) GetSimple(name string) (*policy.PodDisruptionBudget, bool) {
func (p *podDisruptionBudgetsInspectorV1) GetSimple(name string) (*policyv1.PodDisruptionBudget, bool) {
podDisruptionBudget, ok := p.podDisruptionBudgets[name]
if !ok {
return nil, false
@ -93,7 +94,7 @@ func (p *podDisruptionBudgetsInspectorV1) Iterate(action ins.Action, filters ...
return nil
}
func (p *podDisruptionBudgetsInspectorV1) iteratePodDisruptionBudget(podDisruptionBudget *policy.PodDisruptionBudget, action ins.Action, filters ...ins.Filter) error {
func (p *podDisruptionBudgetsInspectorV1) iteratePodDisruptionBudget(podDisruptionBudget *policyv1.PodDisruptionBudget, action ins.Action, filters ...ins.Filter) error {
for _, f := range filters {
if f == nil {
continue
@ -111,7 +112,7 @@ func (p *podDisruptionBudgetsInspectorV1) Read() ins.ReadInterface {
return p
}
func (p *podDisruptionBudgetsInspectorV1) Get(ctx context.Context, name string, opts meta.GetOptions) (*policy.PodDisruptionBudget, error) {
func (p *podDisruptionBudgetsInspectorV1) Get(ctx context.Context, name string, opts meta.GetOptions) (*policyv1.PodDisruptionBudget, error) {
if s, ok := p.GetSimple(name); !ok {
return nil, apiErrors.NewNotFound(PodDisruptionBudgetGR(), name)
} else {

View file

@ -23,11 +23,12 @@ package inspector
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
ins "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1beta1"
policy "k8s.io/api/policy/v1beta1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
ins "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1beta1"
)
func (p *podDisruptionBudgetsInspector) V1Beta1() (ins.Inspector, error) {
@ -41,7 +42,7 @@ func (p *podDisruptionBudgetsInspector) V1Beta1() (ins.Inspector, error) {
type podDisruptionBudgetsInspectorV1Beta1 struct {
podDisruptionBudgetInspector *podDisruptionBudgetsInspector
podDisruptionBudgets map[string]*policy.PodDisruptionBudget
podDisruptionBudgets map[string]*policyv1beta1.PodDisruptionBudget
err error
}
@ -65,8 +66,8 @@ func (p *podDisruptionBudgetsInspectorV1Beta1) validate() error {
return nil
}
func (p *podDisruptionBudgetsInspectorV1Beta1) PodDisruptionBudgets() []*policy.PodDisruptionBudget {
var r []*policy.PodDisruptionBudget
func (p *podDisruptionBudgetsInspectorV1Beta1) PodDisruptionBudgets() []*policyv1beta1.PodDisruptionBudget {
var r []*policyv1beta1.PodDisruptionBudget
for _, podDisruptionBudget := range p.podDisruptionBudgets {
r = append(r, podDisruptionBudget)
}
@ -74,7 +75,7 @@ func (p *podDisruptionBudgetsInspectorV1Beta1) PodDisruptionBudgets() []*policy.
return r
}
func (p *podDisruptionBudgetsInspectorV1Beta1) GetSimple(name string) (*policy.PodDisruptionBudget, bool) {
func (p *podDisruptionBudgetsInspectorV1Beta1) GetSimple(name string) (*policyv1beta1.PodDisruptionBudget, bool) {
podDisruptionBudget, ok := p.podDisruptionBudgets[name]
if !ok {
return nil, false
@ -93,7 +94,7 @@ func (p *podDisruptionBudgetsInspectorV1Beta1) Iterate(action ins.Action, filter
return nil
}
func (p *podDisruptionBudgetsInspectorV1Beta1) iteratePodDisruptionBudget(podDisruptionBudget *policy.PodDisruptionBudget, action ins.Action, filters ...ins.Filter) error {
func (p *podDisruptionBudgetsInspectorV1Beta1) iteratePodDisruptionBudget(podDisruptionBudget *policyv1beta1.PodDisruptionBudget, action ins.Action, filters ...ins.Filter) error {
for _, f := range filters {
if f == nil {
continue
@ -111,7 +112,7 @@ func (p *podDisruptionBudgetsInspectorV1Beta1) Read() ins.ReadInterface {
return p
}
func (p *podDisruptionBudgetsInspectorV1Beta1) Get(ctx context.Context, name string, opts meta.GetOptions) (*policy.PodDisruptionBudget, error) {
func (p *podDisruptionBudgetsInspectorV1Beta1) Get(ctx context.Context, name string, opts meta.GetOptions) (*policyv1beta1.PodDisruptionBudget, error) {
if s, ok := p.GetSimple(name); !ok {
return nil, apiErrors.NewNotFound(PodDisruptionBudgetGR(), name)
} else {

View file

@ -25,14 +25,16 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/globals"
monitoring "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
core "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
monitoring "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
core "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
func (r *Resources) EnsureLabels(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
@ -240,14 +242,13 @@ func (r *Resources) EnsurePersistentVolumeClaimsLabels(ctx context.Context, cach
func (r *Resources) EnsurePodDisruptionBudgetsLabels(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
changed := false
i, err := cachedStatus.PodDisruptionBudget().V1Beta1()
if err != nil {
return err
}
if err := i.Iterate(func(budget *policy.PodDisruptionBudget) error {
if inspector, err := cachedStatus.PodDisruptionBudget().V1(); err == nil {
if err := inspector.Iterate(func(budget *policyv1.PodDisruptionBudget) error {
if ensureLabelsMap(budget.Kind, budget, r.context.GetSpec(), func(name string, d []byte) error {
return globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := cachedStatus.PodDisruptionBudgetsModInterface().V1Beta1().Patch(ctxChild, name, types.JSONPatchType, d, meta.PatchOptions{})
_, err := cachedStatus.PodDisruptionBudgetsModInterface().V1().Patch(ctxChild, name,
types.JSONPatchType, d, meta.PatchOptions{})
return err
})
}) {
@ -255,11 +256,34 @@ func (r *Resources) EnsurePodDisruptionBudgetsLabels(ctx context.Context, cached
}
return nil
}, func(budget *policy.PodDisruptionBudget) bool {
}, func(budget *policyv1.PodDisruptionBudget) bool {
return r.isChildResource(budget)
}); err != nil {
return err
}
} else {
inspector, err := cachedStatus.PodDisruptionBudget().V1Beta1()
if err != nil {
return err
}
if err := inspector.Iterate(func(budget *policyv1beta1.PodDisruptionBudget) error {
if ensureLabelsMap(budget.Kind, budget, r.context.GetSpec(), func(name string, d []byte) error {
return globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := cachedStatus.PodDisruptionBudgetsModInterface().V1Beta1().Patch(ctxChild, name,
types.JSONPatchType, d, meta.PatchOptions{})
return err
})
}) {
changed = true
}
return nil
}, func(budget *policyv1beta1.PodDisruptionBudget) bool {
return r.isChildResource(budget)
}); err != nil {
return err
}
}
if changed {
return errors.Reconcile()

View file

@ -25,15 +25,16 @@ import (
"fmt"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
func min(a int, b int) int {
@ -89,7 +90,7 @@ func PDBNameForGroup(depl string, group api.ServerGroup) string {
return fmt.Sprintf("%s-%s-pdb", depl, group.AsRole())
}
func newPDB(minAvail int, deplname string, group api.ServerGroup, owner meta.OwnerReference) *policyv1beta1.PodDisruptionBudget {
func newPDBV1Beta1(minAvail int, deplname string, group api.ServerGroup, owner meta.OwnerReference) *policyv1beta1.PodDisruptionBudget {
return &policyv1beta1.PodDisruptionBudget{
ObjectMeta: meta.ObjectMeta{
Name: PDBNameForGroup(deplname, group),
@ -104,56 +105,105 @@ func newPDB(minAvail int, deplname string, group api.ServerGroup, owner meta.Own
}
}
func newPDBV1(minAvail int, deplname string, group api.ServerGroup, owner meta.OwnerReference) *policyv1.PodDisruptionBudget {
return &policyv1.PodDisruptionBudget{
ObjectMeta: meta.ObjectMeta{
Name: PDBNameForGroup(deplname, group),
OwnerReferences: []meta.OwnerReference{owner},
},
Spec: policyv1.PodDisruptionBudgetSpec{
MinAvailable: newFromInt(minAvail),
Selector: &meta.LabelSelector{
MatchLabels: k8sutil.LabelsForDeployment(deplname, group.AsRole()),
},
},
}
}
// ensurePDBForGroup ensure pdb for a specific server group, if wantMinAvail is zero, the PDB is removed and not recreated
func (r *Resources) ensurePDBForGroup(ctx context.Context, group api.ServerGroup, wantedMinAvail int) error {
i, err := r.context.ACS().CurrentClusterCache().PodDisruptionBudget().V1Beta1()
if err != nil {
return err
}
deplname := r.context.GetAPIObject().GetName()
pdbname := PDBNameForGroup(deplname, group)
deplName := r.context.GetAPIObject().GetName()
pdbName := PDBNameForGroup(deplName, group)
log := r.log.With().Str("group", group.AsRole()).Logger()
pdbMod := r.context.ACS().CurrentClusterCache().PodDisruptionBudgetsModInterface()
for {
var pdb *policyv1beta1.PodDisruptionBudget
var minAvailable *intstr.IntOrString
var deletionTimestamp *meta.Time
var isV1 bool
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
var err error
pdb, err = i.Read().Get(ctxChild, pdbname, meta.GetOptions{})
if inspector, err := r.context.ACS().CurrentClusterCache().PodDisruptionBudget().V1(); err == nil {
if pdb, err := inspector.Read().Get(ctxChild, pdbName, meta.GetOptions{}); err != nil {
return err
} else {
isV1 = true
minAvailable = pdb.Spec.MinAvailable
deletionTimestamp = pdb.GetDeletionTimestamp()
}
} else if inspector, err := r.context.ACS().CurrentClusterCache().PodDisruptionBudget().V1Beta1(); err == nil {
if pdb, err := inspector.Read().Get(ctxChild, pdbName, meta.GetOptions{}); err != nil {
return err
} else {
minAvailable = pdb.Spec.MinAvailable
deletionTimestamp = pdb.GetDeletionTimestamp()
}
} else {
return errors.WithStack(err)
}
return nil
})
if k8sutil.IsNotFound(err) {
if wantedMinAvail != 0 {
// No PDB found - create new
pdb := newPDB(wantedMinAvail, deplname, group, r.context.GetAPIObject().AsOwner())
// No PDB found - create new.
log.Debug().Msg("Creating new PDB")
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := r.context.ACS().CurrentClusterCache().PodDisruptionBudgetsModInterface().V1Beta1().Create(ctxChild, pdb, meta.CreateOptions{})
return err
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
var errInternal error
if isV1 {
pdb := newPDBV1(wantedMinAvail, deplName, group, r.context.GetAPIObject().AsOwner())
_, errInternal = pdbMod.V1().Create(ctxChild, pdb, meta.CreateOptions{})
} else {
pdb := newPDBV1Beta1(wantedMinAvail, deplName, group, r.context.GetAPIObject().AsOwner())
_, errInternal = pdbMod.V1Beta1().Create(ctxChild, pdb, meta.CreateOptions{})
}
return errInternal
})
if err != nil {
log.Error().Err(err).Msg("failed to create PDB")
return errors.WithStack(err)
}
}
return nil
} else if err == nil {
// PDB is there
if pdb.Spec.MinAvailable.IntValue() == wantedMinAvail && wantedMinAvail != 0 {
} else if err != nil {
// Some other error than not found.
return errors.WithStack(err)
}
// PDB v1 or v1beta1 is here.
if minAvailable.IntValue() == wantedMinAvail && wantedMinAvail != 0 {
return nil
}
// Update for PDBs is forbidden, thus one has to delete it and then create it again
// Otherwise delete it if wantedMinAvail is zero
log.Debug().Int("wanted-min-avail", wantedMinAvail).
Int("current-min-avail", pdb.Spec.MinAvailable.IntValue()).
Int("current-min-avail", minAvailable.IntValue()).
Msg("Recreating PDB")
pdb.Spec.MinAvailable = newFromInt(wantedMinAvail)
// Trigger deletion only if not already deleted
if pdb.GetDeletionTimestamp() == nil {
// Update the PDB
// Trigger deletion only if not already deleted.
if deletionTimestamp == nil {
// Update the PDB.
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
return r.context.ACS().CurrentClusterCache().PodDisruptionBudgetsModInterface().V1Beta1().Delete(ctxChild, pdbname, meta.DeleteOptions{})
if isV1 {
return pdbMod.V1().Delete(ctxChild, pdbName, meta.DeleteOptions{})
}
return pdbMod.V1Beta1().Delete(ctxChild, pdbName, meta.DeleteOptions{})
})
if err != nil && !k8sutil.IsNotFound(err) {
log.Error().Err(err).Msg("PDB deletion failed")
@ -166,9 +216,7 @@ func (r *Resources) ensurePDBForGroup(ctx context.Context, group api.ServerGroup
if wantedMinAvail == 0 {
return nil
}
} else {
return errors.WithStack(err)
}
log.Debug().Msg("Retry loop for PDB")
select {
case <-ctx.Done():

View file

@ -21,13 +21,7 @@
package k8sutil
import (
"context"
core "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
typedCore "k8s.io/client-go/kubernetes/typed/core/v1"
policyTyped "k8s.io/client-go/kubernetes/typed/policy/v1beta1"
)
type OwnerRefObj interface {
@ -77,129 +71,3 @@ func IsChildResource(kind, name, namespace string, resource meta.Object) bool {
return false
}
func GetSecretsForParent(client typedCore.SecretInterface, kind, name, namespace string) ([]*core.Secret, error) {
secrets, err := client.List(context.Background(), meta.ListOptions{})
if err != nil {
return nil, err
}
if len(secrets.Items) == 0 {
return []*core.Secret{}, nil
}
childSecrets := make([]*core.Secret, 0, len(secrets.Items))
for _, secret := range secrets.Items {
if IsChildResource(kind, name, namespace, &secret) {
childSecrets = append(childSecrets, secret.DeepCopy())
}
}
return childSecrets, nil
}
func GetPDBForParent(client policyTyped.PodDisruptionBudgetInterface, kind, name, namespace string) ([]*policy.PodDisruptionBudget, error) {
pdbs, err := client.List(context.Background(), meta.ListOptions{})
if err != nil {
return nil, err
}
if len(pdbs.Items) == 0 {
return []*policy.PodDisruptionBudget{}, nil
}
childPdbs := make([]*policy.PodDisruptionBudget, 0, len(pdbs.Items))
for _, pdb := range pdbs.Items {
if IsChildResource(kind, name, namespace, &pdb) {
childPdbs = append(childPdbs, pdb.DeepCopy())
}
}
return childPdbs, nil
}
func GetPVCForParent(client typedCore.PersistentVolumeClaimInterface, kind, name, namespace string) ([]*core.PersistentVolumeClaim, error) {
pvcs, err := client.List(context.Background(), meta.ListOptions{})
if err != nil {
return nil, err
}
if len(pvcs.Items) == 0 {
return []*core.PersistentVolumeClaim{}, nil
}
childPvcs := make([]*core.PersistentVolumeClaim, 0, len(pvcs.Items))
for _, pvc := range pvcs.Items {
if IsChildResource(kind, name, namespace, &pvc) {
childPvcs = append(childPvcs, pvc.DeepCopy())
}
}
return childPvcs, nil
}
func GetServicesForParent(client typedCore.ServiceInterface, kind, name, namespace string) ([]*core.Service, error) {
services, err := client.List(context.Background(), meta.ListOptions{})
if err != nil {
return nil, err
}
if len(services.Items) == 0 {
return []*core.Service{}, nil
}
childServices := make([]*core.Service, 0, len(services.Items))
for _, service := range services.Items {
if IsChildResource(kind, name, namespace, &service) {
childServices = append(childServices, service.DeepCopy())
}
}
return childServices, nil
}
func GetServiceAccountsForParent(client typedCore.ServiceAccountInterface, kind, name, namespace string) ([]*core.ServiceAccount, error) {
serviceAccounts, err := client.List(context.Background(), meta.ListOptions{})
if err != nil {
return nil, err
}
if len(serviceAccounts.Items) == 0 {
return []*core.ServiceAccount{}, nil
}
childServiceAccounts := make([]*core.ServiceAccount, 0, len(serviceAccounts.Items))
for _, serviceAccount := range serviceAccounts.Items {
if IsChildResource(kind, name, namespace, &serviceAccount) {
childServiceAccounts = append(childServiceAccounts, serviceAccount.DeepCopy())
}
}
return childServiceAccounts, nil
}
func GetPodsForParent(client typedCore.PodInterface, kind, name, namespace string) ([]*core.Pod, error) {
podList, err := client.List(context.Background(), meta.ListOptions{})
if err != nil {
return nil, err
}
if len(podList.Items) == 0 {
return []*core.Pod{}, nil
}
pods := make([]*core.Pod, 0, len(podList.Items))
for _, pod := range podList.Items {
if IsChildResource(kind, name, namespace, &pod) {
pods = append(pods, pod.DeepCopy())
}
}
return pods, nil
}

View file

@ -24,6 +24,7 @@ import (
endpointsv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/endpoints/v1"
persistentvolumeclaimv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/persistentvolumeclaim/v1"
podv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/pod/v1"
poddisruptionbudgetv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget/v1beta1"
secretv1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/secret/v1"
servicev1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service/v1"
@ -60,6 +61,7 @@ type ServiceMonitorsMods interface {
}
type PodDisruptionBudgetsMods interface {
V1() poddisruptionbudgetv1.ModInterface
V1Beta1() v1beta1.ModInterface
}

View file

@ -18,20 +18,21 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1beta1
package v1
import (
policyv1 "k8s.io/api/policy/v1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/gvk"
policy "k8s.io/api/policy/v1"
)
type Inspector interface {
gvk.GVK
GetSimple(name string) (*policy.PodDisruptionBudget, bool)
GetSimple(name string) (*policyv1.PodDisruptionBudget, bool)
Iterate(action Action, filters ...Filter) error
Read() ReadInterface
}
type Filter func(podDisruptionBudget *policy.PodDisruptionBudget) bool
type Action func(podDisruptionBudget *policy.PodDisruptionBudget) error
type Filter func(podDisruptionBudget *policyv1.PodDisruptionBudget) bool
type Action func(podDisruptionBudget *policyv1.PodDisruptionBudget) error

View file

@ -18,21 +18,21 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1beta1
package v1
import (
"context"
policy "k8s.io/api/policy/v1"
policyv1 "k8s.io/api/policy/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
// ModInterface has methods to work with PodDisruptionBudget resources only for creation
type ModInterface interface {
Create(ctx context.Context, poddisruptionbudget *policy.PodDisruptionBudget, opts meta.CreateOptions) (*policy.PodDisruptionBudget, error)
Update(ctx context.Context, poddisruptionbudget *policy.PodDisruptionBudget, opts meta.UpdateOptions) (*policy.PodDisruptionBudget, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts meta.PatchOptions, subresources ...string) (result *policy.PodDisruptionBudget, err error)
Create(ctx context.Context, poddisruptionbudget *policyv1.PodDisruptionBudget, opts meta.CreateOptions) (*policyv1.PodDisruptionBudget, error)
Update(ctx context.Context, poddisruptionbudget *policyv1.PodDisruptionBudget, opts meta.UpdateOptions) (*policyv1.PodDisruptionBudget, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts meta.PatchOptions, subresources ...string) (result *policyv1.PodDisruptionBudget, err error)
Delete(ctx context.Context, name string, opts meta.DeleteOptions) error
}
@ -44,5 +44,5 @@ type Interface interface {
// ReadInterface has methods to work with PodDisruptionBudget resources with ReadOnly mode.
type ReadInterface interface {
Get(ctx context.Context, name string, opts meta.GetOptions) (*policy.PodDisruptionBudget, error)
Get(ctx context.Context, name string, opts meta.GetOptions) (*policyv1.PodDisruptionBudget, error)
}

View file

@ -21,17 +21,18 @@
package v1beta1
import (
policyv1beta1 "k8s.io/api/policy/v1beta1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/gvk"
policy "k8s.io/api/policy/v1beta1"
)
type Inspector interface {
gvk.GVK
GetSimple(name string) (*policy.PodDisruptionBudget, bool)
GetSimple(name string) (*policyv1beta1.PodDisruptionBudget, bool)
Iterate(action Action, filters ...Filter) error
Read() ReadInterface
}
type Filter func(podDisruptionBudget *policy.PodDisruptionBudget) bool
type Action func(podDisruptionBudget *policy.PodDisruptionBudget) error
type Filter func(podDisruptionBudget *policyv1beta1.PodDisruptionBudget) bool
type Action func(podDisruptionBudget *policyv1beta1.PodDisruptionBudget) error

View file

@ -23,16 +23,16 @@ package v1beta1
import (
"context"
policy "k8s.io/api/policy/v1beta1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
// ModInterface has methods to work with PodDisruptionBudget resources only for creation
type ModInterface interface {
Create(ctx context.Context, poddisruptionbudget *policy.PodDisruptionBudget, opts meta.CreateOptions) (*policy.PodDisruptionBudget, error)
Update(ctx context.Context, poddisruptionbudget *policy.PodDisruptionBudget, opts meta.UpdateOptions) (*policy.PodDisruptionBudget, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts meta.PatchOptions, subresources ...string) (result *policy.PodDisruptionBudget, err error)
Create(ctx context.Context, poddisruptionbudget *policyv1beta1.PodDisruptionBudget, opts meta.CreateOptions) (*policyv1beta1.PodDisruptionBudget, error)
Update(ctx context.Context, poddisruptionbudget *policyv1beta1.PodDisruptionBudget, opts meta.UpdateOptions) (*policyv1beta1.PodDisruptionBudget, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts meta.PatchOptions, subresources ...string) (result *policyv1beta1.PodDisruptionBudget, err error)
Delete(ctx context.Context, name string, opts meta.DeleteOptions) error
}
@ -44,5 +44,5 @@ type Interface interface {
// ReadInterface has methods to work with PodDisruptionBudget resources with ReadOnly mode.
type ReadInterface interface {
Get(ctx context.Context, name string, opts meta.GetOptions) (*policy.PodDisruptionBudget, error)
Get(ctx context.Context, name string, opts meta.GetOptions) (*policyv1beta1.PodDisruptionBudget, error)
}

View file

@ -23,16 +23,18 @@ package kclient
import (
"sync"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
versionedFake "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned/fake"
monitoring "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
monitoringFake "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/fake"
core "k8s.io/api/core/v1"
policy "k8s.io/api/policy/v1beta1"
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
apiextensionsclientFake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubernetesFake "k8s.io/client-go/kubernetes/fake"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
versionedFake "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned/fake"
)
func NewFakeClient() Client {
@ -101,7 +103,8 @@ type FakeDataInput struct {
Services map[string]*core.Service
PVCS map[string]*core.PersistentVolumeClaim
ServiceAccounts map[string]*core.ServiceAccount
PDBS map[string]*policy.PodDisruptionBudget
PDBSV1Beta1 map[string]*policyv1beta1.PodDisruptionBudget
PDBSV1 map[string]*policyv1.PodDisruptionBudget
ServiceMonitors map[string]*monitoring.ServiceMonitor
ArangoMembers map[string]*api.ArangoMember
Nodes map[string]*core.Node
@ -152,7 +155,15 @@ func (f FakeDataInput) asList() []runtime.Object {
}
r = append(r, c)
}
for k, v := range f.PDBS {
for k, v := range f.PDBSV1Beta1 {
c := v.DeepCopy()
c.SetName(k)
if c.GetNamespace() == "" && f.Namespace != "" {
c.SetNamespace(f.Namespace)
}
r = append(r, c)
}
for k, v := range f.PDBSV1 {
c := v.DeepCopy()
c.SetName(k)
if c.GetNamespace() == "" && f.Namespace != "" {