mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
[Improvement] Use stanradized k8s object handlers (#1659)
This commit is contained in:
parent
ccdf309abf
commit
df737355d4
8 changed files with 462 additions and 161 deletions
|
@ -44,7 +44,7 @@ linters-settings:
|
|||
- pkg: github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1beta1
|
||||
alias: schedulerApi
|
||||
- pkg: github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1beta1/profiles
|
||||
alias: schedulerProfilesv1beta1
|
||||
alias: schedulerProfiles
|
||||
- pkg: github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1beta1/container
|
||||
alias: schedulerContainerApi
|
||||
- pkg: github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1beta1/container/resources
|
||||
|
|
52
pkg/util/compare/k8s/filter.go
Normal file
52
pkg/util/compare/k8s/filter.go
Normal file
|
@ -0,0 +1,52 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package k8s
|
||||
|
||||
import (
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
type FilterFunc[T any] func(in T) T
|
||||
|
||||
func FilterP[T any](in *T, filters ...FilterFunc[*T]) *T {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return Filter(in, filters...)
|
||||
}
|
||||
|
||||
func Filter[T any](in T, filters ...FilterFunc[T]) T {
|
||||
for _, f := range filters {
|
||||
in = f(in)
|
||||
}
|
||||
|
||||
return in
|
||||
}
|
||||
|
||||
func ObjectMetaFilter(in meta.ObjectMeta) meta.ObjectMeta {
|
||||
return meta.ObjectMeta{
|
||||
Labels: in.Labels,
|
||||
Annotations: in.Annotations,
|
||||
OwnerReferences: in.OwnerReferences,
|
||||
Finalizers: in.Finalizers,
|
||||
}
|
||||
}
|
|
@ -1,7 +1,7 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
|
||||
// Copyright 2023-2024 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -21,21 +21,56 @@
|
|||
package k8s
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
core "k8s.io/api/core/v1"
|
||||
|
||||
"github.com/arangodb/kube-arangodb/pkg/logging"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/helpers"
|
||||
)
|
||||
|
||||
func ChecksumService(s *core.Service) (string, error) {
|
||||
return checksumServiceSpec(&s.Spec)
|
||||
func CoreService(in *core.Service) *core.Service {
|
||||
return FilterP(in, func(in *core.Service) *core.Service {
|
||||
return &core.Service{
|
||||
ObjectMeta: ObjectMetaFilter(in.ObjectMeta),
|
||||
Spec: Filter(in.Spec, func(in core.ServiceSpec) core.ServiceSpec {
|
||||
return core.ServiceSpec{
|
||||
Type: in.Type,
|
||||
Ports: in.Ports,
|
||||
Selector: in.Selector,
|
||||
}
|
||||
}),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func checksumServiceSpec(s *core.ServiceSpec) (string, error) {
|
||||
parts := map[string]interface{}{
|
||||
"type": s.Type,
|
||||
"ports": s.Ports,
|
||||
"selector": s.Selector,
|
||||
// add here more fields when needed
|
||||
}
|
||||
return util.SHA256FromJSON(parts)
|
||||
func CoreServiceChecksum(in *core.Service) (string, error) {
|
||||
return util.SHA256FromJSON(CoreService(in))
|
||||
}
|
||||
|
||||
func CoreServiceImmutableRecreate(logger logging.Logger) helpers.Decision[*core.Service] {
|
||||
return helpers.NewImmutableFields[*core.Service](func(a, b *core.Service, changes map[string]string) helpers.Action {
|
||||
if len(changes) > 0 {
|
||||
s := logger
|
||||
|
||||
for k, v := range changes {
|
||||
s = s.Str(fmt.Sprintf("field%s", k), v)
|
||||
}
|
||||
|
||||
s.Info("Replace of Service %s required", a.GetName())
|
||||
return helpers.ActionReplace
|
||||
}
|
||||
|
||||
return helpers.ActionOK
|
||||
}).Evaluate(
|
||||
helpers.SubCompare[*core.Service, core.ServiceSpec](".spec", func(in *core.Service) core.ServiceSpec {
|
||||
if in == nil {
|
||||
return core.ServiceSpec{}
|
||||
}
|
||||
return in.Spec
|
||||
}, helpers.FromSimpleCompareStack(".type", func(a, b core.ServiceSpec) (string, bool) {
|
||||
return "ServiceType change requires object recreation", a.Type != b.Type
|
||||
})),
|
||||
)
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
|
||||
// Copyright 2023-2024 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -21,23 +21,49 @@
|
|||
package k8s
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
apps "k8s.io/api/apps/v1"
|
||||
|
||||
"github.com/arangodb/kube-arangodb/pkg/logging"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/helpers"
|
||||
)
|
||||
|
||||
func ChecksumStatefulSet(s *apps.StatefulSet) (string, error) {
|
||||
return checksumStatefulSetSpec(&s.Spec)
|
||||
func AppsStatefulSet(in *apps.StatefulSet) *apps.StatefulSet {
|
||||
return FilterP(in, func(in *apps.StatefulSet) *apps.StatefulSet {
|
||||
return &apps.StatefulSet{
|
||||
ObjectMeta: ObjectMetaFilter(in.ObjectMeta),
|
||||
Spec: Filter(in.Spec, func(in apps.StatefulSetSpec) apps.StatefulSetSpec {
|
||||
return apps.StatefulSetSpec{
|
||||
Template: in.Template,
|
||||
Replicas: in.Replicas,
|
||||
MinReadySeconds: in.MinReadySeconds,
|
||||
Selector: in.Selector,
|
||||
ServiceName: in.ServiceName,
|
||||
}
|
||||
}),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func checksumStatefulSetSpec(s *apps.StatefulSetSpec) (string, error) {
|
||||
parts := map[string]interface{}{
|
||||
"replicas": s.Replicas,
|
||||
"serviceName": s.ServiceName,
|
||||
"minReadySeconds": s.MinReadySeconds,
|
||||
"selector": s.Selector,
|
||||
"template": s.Template,
|
||||
// add here more fields when needed
|
||||
}
|
||||
return util.SHA256FromJSON(parts)
|
||||
func AppsStatefulSetChecksum(in *apps.StatefulSet) (string, error) {
|
||||
return util.SHA256FromJSON(AppsStatefulSet(in))
|
||||
}
|
||||
|
||||
func AppsStatefulSetRecreate(logger logging.Logger) helpers.Decision[*apps.StatefulSet] {
|
||||
return helpers.NewImmutableFields[*apps.StatefulSet](func(a, b *apps.StatefulSet, changes map[string]string) helpers.Action {
|
||||
if len(changes) > 0 {
|
||||
s := logger
|
||||
|
||||
for k, v := range changes {
|
||||
s = s.Str(fmt.Sprintf("field%s", k), v)
|
||||
}
|
||||
|
||||
s.Info("Replace of StatefulSet %s required", a.GetName())
|
||||
return helpers.ActionReplace
|
||||
}
|
||||
|
||||
return helpers.ActionOK
|
||||
}).Evaluate()
|
||||
}
|
||||
|
|
|
@ -22,12 +22,14 @@ package helpers
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
sharedApi "github.com/arangodb/kube-arangodb/pkg/apis/shared/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/logging"
|
||||
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
|
||||
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
|
||||
|
@ -54,6 +56,8 @@ type Object interface {
|
|||
meta.Object
|
||||
}
|
||||
|
||||
type ClientFactory[T Object] func(namespace string) Client[T]
|
||||
|
||||
type Client[T Object] interface {
|
||||
Get(ctx context.Context, name string, options meta.GetOptions) (T, error)
|
||||
Update(ctx context.Context, object T, options meta.UpdateOptions) (T, error)
|
||||
|
@ -63,24 +67,48 @@ type Client[T Object] interface {
|
|||
|
||||
type Generate[T Object] func(ctx context.Context, ref *sharedApi.Object) (T, bool, string, error)
|
||||
|
||||
func OperatorUpdate[T Object](ctx context.Context, logger logging.Logger, client Client[T], ref **sharedApi.Object, generator Generate[T], decisions ...Decision[T]) (bool, error) {
|
||||
changed, err := Update[T](ctx, logger, client, ref, generator, decisions...)
|
||||
type Config[T Object] struct {
|
||||
Events event.RecorderInstance
|
||||
Logger logging.Logger
|
||||
Factory ClientFactory[T]
|
||||
Kind string
|
||||
}
|
||||
|
||||
func NewUpdator[T Object](config Config[T]) Updator[T] {
|
||||
return updator[T]{
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
type Updator[T Object] interface {
|
||||
OperatorUpdate(ctx context.Context, namespace string, parent meta.Object, ref **sharedApi.Object, generator Generate[T], decisions ...Decision[T]) (T, bool, error)
|
||||
Update(ctx context.Context, namespace string, parent meta.Object, ref **sharedApi.Object, generator Generate[T], decisions ...Decision[T]) (T, bool, error)
|
||||
}
|
||||
|
||||
type updator[T Object] struct {
|
||||
config Config[T]
|
||||
}
|
||||
|
||||
func (u updator[T]) OperatorUpdate(ctx context.Context, namespace string, parent meta.Object, ref **sharedApi.Object, generator Generate[T], decisions ...Decision[T]) (T, bool, error) {
|
||||
obj, changed, err := u.Update(ctx, namespace, parent, ref, generator, decisions...)
|
||||
if err != nil {
|
||||
return false, err
|
||||
return util.Default[T](), false, err
|
||||
}
|
||||
|
||||
if changed {
|
||||
return true, operator.Reconcile("Change in resources")
|
||||
return obj, true, operator.Reconcile("Change in resources")
|
||||
}
|
||||
|
||||
return false, nil
|
||||
return obj, false, nil
|
||||
}
|
||||
|
||||
func Update[T Object](ctx context.Context, logger logging.Logger, client Client[T], ref **sharedApi.Object, generator Generate[T], decisions ...Decision[T]) (bool, error) {
|
||||
func (u updator[T]) Update(ctx context.Context, namespace string, parent meta.Object, ref **sharedApi.Object, generator Generate[T], decisions ...Decision[T]) (T, bool, error) {
|
||||
decision := Decision[T](EmptyDecision[T]).With(decisions...)
|
||||
|
||||
client := u.config.Factory(namespace)
|
||||
|
||||
if ref == nil {
|
||||
return false, errors.Errorf("Reference is nil")
|
||||
return util.Default[T](), false, errors.Errorf("Reference is nil")
|
||||
}
|
||||
|
||||
currentRef := *ref
|
||||
|
@ -92,43 +120,52 @@ func Update[T Object](ctx context.Context, logger logging.Logger, client Client[
|
|||
object, err := util.WithKubernetesContextTimeoutP2A2(ctx, client.Get, currentRef.GetName(), meta.GetOptions{})
|
||||
if err != nil {
|
||||
if !kerrors.Is(err, kerrors.NotFound) {
|
||||
return false, err
|
||||
return util.Default[T](), false, err
|
||||
}
|
||||
|
||||
*ref = nil
|
||||
logger.
|
||||
u.config.Logger.
|
||||
Str("name", currentRef.GetName()).
|
||||
Str("checksum", currentRef.GetChecksum()).
|
||||
Str("uid", string(currentRef.GetUID())).
|
||||
Debug("Object has been removed")
|
||||
|
||||
return true, nil
|
||||
if events := u.config.Events; events != nil {
|
||||
events.Normal(parent, fmt.Sprintf("%sDeleted", u.config.Kind), "Deleted kubernetes %s %s", u.config.Kind, currentRef.GetName())
|
||||
}
|
||||
|
||||
*ref = nil
|
||||
|
||||
return util.Default[T](), true, nil
|
||||
}
|
||||
|
||||
if object.GetDeletionTimestamp() != nil {
|
||||
// Object is currently deleting
|
||||
logger.
|
||||
u.config.Logger.
|
||||
Str("name", currentRef.GetName()).
|
||||
Str("checksum", currentRef.GetChecksum()).
|
||||
Str("uid", string(currentRef.GetUID())).
|
||||
Debug("Object is currently deleting")
|
||||
return true, nil
|
||||
return object, true, nil
|
||||
}
|
||||
|
||||
if object.GetUID() != currentRef.GetUID() {
|
||||
logger.
|
||||
u.config.Logger.
|
||||
Str("name", currentRef.GetName()).
|
||||
Str("checksum", currentRef.GetChecksum()).
|
||||
Str("uid", string(currentRef.GetUID())).
|
||||
Warn("Recreation Required as UID changed")
|
||||
|
||||
if events := u.config.Events; events != nil {
|
||||
events.Warning(parent, fmt.Sprintf("%sForceDelete", u.config.Kind), "Deletion of kubernetes %s %s requested as UID changed", u.config.Kind, currentRef.GetName())
|
||||
}
|
||||
|
||||
if err := util.WithKubernetesContextTimeoutP1A2(ctx, client.Delete, currentRef.GetName(), meta.DeleteOptions{}); err != nil {
|
||||
if !kerrors.Is(err, kerrors.NotFound) {
|
||||
return false, err
|
||||
return util.Default[T](), false, err
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
return util.Default[T](), true, nil
|
||||
}
|
||||
|
||||
discoveredObject = object
|
||||
|
@ -137,53 +174,58 @@ func Update[T Object](ctx context.Context, logger logging.Logger, client Client[
|
|||
|
||||
object, skip, checksum, err := generator(ctx, currentRef.DeepCopy())
|
||||
if err != nil {
|
||||
return false, err
|
||||
return util.Default[T](), false, err
|
||||
}
|
||||
|
||||
if skip {
|
||||
// Skip update as it is not required
|
||||
return false, nil
|
||||
return util.Default[T](), false, nil
|
||||
}
|
||||
|
||||
if object == util.Default[T]() {
|
||||
// Object is supposed to be removed
|
||||
if currentRef == nil {
|
||||
// Nothing to do
|
||||
return false, nil
|
||||
return util.Default[T](), false, nil
|
||||
}
|
||||
|
||||
// Remove object
|
||||
if err := util.WithKubernetesContextTimeoutP1A2(ctx, client.Delete, currentRef.GetName(), meta.DeleteOptions{}); err != nil {
|
||||
if !kerrors.Is(err, kerrors.NotFound) {
|
||||
return false, err
|
||||
return util.Default[T](), false, err
|
||||
}
|
||||
}
|
||||
|
||||
logger.
|
||||
u.config.Logger.
|
||||
Str("name", currentRef.GetName()).
|
||||
Str("checksum", currentRef.GetChecksum()).
|
||||
Str("uid", string(currentRef.GetUID())).
|
||||
Info("Object deletion has been requested")
|
||||
|
||||
return true, nil
|
||||
return util.Default[T](), true, nil
|
||||
}
|
||||
|
||||
if !discoveredObjectExists {
|
||||
// Let's create Object
|
||||
newObject, err := util.WithKubernetesContextTimeoutP2A2(ctx, client.Create, object, meta.CreateOptions{})
|
||||
if err != nil {
|
||||
return false, err
|
||||
return util.Default[T](), false, err
|
||||
}
|
||||
|
||||
currentRef = util.NewType(sharedApi.NewObjectWithChecksum(newObject, checksum))
|
||||
*ref = currentRef
|
||||
logger.
|
||||
|
||||
u.config.Logger.
|
||||
Str("name", currentRef.GetName()).
|
||||
Str("checksum", currentRef.GetChecksum()).
|
||||
Str("uid", string(currentRef.GetUID())).
|
||||
Info("Object has been created")
|
||||
|
||||
return true, nil
|
||||
if events := u.config.Events; events != nil {
|
||||
events.Normal(parent, fmt.Sprintf("%sCreated", u.config.Kind), "Created kubernetes %s %s", u.config.Kind, currentRef.GetName())
|
||||
}
|
||||
|
||||
return newObject, true, nil
|
||||
}
|
||||
|
||||
// Object exists, lets check if update is required
|
||||
|
@ -195,16 +237,16 @@ func Update[T Object](ctx context.Context, logger logging.Logger, client Client[
|
|||
Object: object,
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
return util.Default[T](), false, err
|
||||
}
|
||||
|
||||
switch action {
|
||||
case ActionOK:
|
||||
// Nothing to do
|
||||
return false, nil
|
||||
return discoveredObject, false, nil
|
||||
case ActionReplace:
|
||||
// Object needs to be removed
|
||||
logger.
|
||||
u.config.Logger.
|
||||
Str("name", currentRef.GetName()).
|
||||
Str("checksum", currentRef.GetChecksum()).
|
||||
Str("uid", string(currentRef.GetUID())).
|
||||
|
@ -212,13 +254,17 @@ func Update[T Object](ctx context.Context, logger logging.Logger, client Client[
|
|||
|
||||
if err := util.WithKubernetesContextTimeoutP1A2(ctx, client.Delete, currentRef.GetName(), meta.DeleteOptions{}); err != nil {
|
||||
if !kerrors.Is(err, kerrors.NotFound) {
|
||||
return false, err
|
||||
return util.Default[T](), false, err
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
if events := u.config.Events; events != nil {
|
||||
events.Normal(parent, fmt.Sprintf("%sReplaced", u.config.Kind), "Replaced kubernetes %s %s", u.config.Kind, currentRef.GetName())
|
||||
}
|
||||
|
||||
return util.Default[T](), true, nil
|
||||
case ActionUpdate:
|
||||
logger.
|
||||
u.config.Logger.
|
||||
Str("name", currentRef.GetName()).
|
||||
Str("checksum", currentRef.GetChecksum()).
|
||||
Str("uid", string(currentRef.GetUID())).
|
||||
|
@ -227,18 +273,22 @@ func Update[T Object](ctx context.Context, logger logging.Logger, client Client[
|
|||
newObject, err := util.WithKubernetesContextTimeoutP2A2(ctx, client.Update, object, meta.UpdateOptions{})
|
||||
if err != nil {
|
||||
if !kerrors.Is(err, kerrors.NotFound) {
|
||||
return false, err
|
||||
return util.Default[T](), false, err
|
||||
}
|
||||
|
||||
// Reconcile if object was not found
|
||||
return true, nil
|
||||
return util.Default[T](), true, nil
|
||||
}
|
||||
|
||||
if events := u.config.Events; events != nil {
|
||||
events.Normal(parent, fmt.Sprintf("%sUpdated", u.config.Kind), "Updated kubernetes %s %s", u.config.Kind, currentRef.GetName())
|
||||
}
|
||||
|
||||
*ref = util.NewType(sharedApi.NewObjectWithChecksum(newObject, checksum))
|
||||
|
||||
return true, nil
|
||||
return newObject, true, nil
|
||||
|
||||
default:
|
||||
return false, errors.Errorf("Unknown action returned")
|
||||
return util.Default[T](), false, errors.Errorf("Unknown action returned")
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ package helpers
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
|
@ -76,6 +77,14 @@ func ReplaceChecksum[T Object](ctx context.Context, current, expected DecisionOb
|
|||
return ActionOK, nil
|
||||
}
|
||||
|
||||
func UpdateChecksum[T Object](ctx context.Context, current, expected DecisionObject[T]) (Action, error) {
|
||||
if current.Checksum != expected.Checksum {
|
||||
return ActionUpdate, nil
|
||||
}
|
||||
|
||||
return ActionOK, nil
|
||||
}
|
||||
|
||||
func UpdateOwnerReference[T Object](ctx context.Context, current, expected DecisionObject[T]) (Action, error) {
|
||||
if !reflect.DeepEqual(current.Object.GetOwnerReferences(), expected.Object.GetOwnerReferences()) {
|
||||
return ActionUpdate, nil
|
||||
|
@ -83,3 +92,64 @@ func UpdateOwnerReference[T Object](ctx context.Context, current, expected Decis
|
|||
|
||||
return ActionOK, nil
|
||||
}
|
||||
|
||||
type CompareStack[T any] func(a, b T) map[string]string
|
||||
|
||||
type SimpleCompareStack[T any] func(a, b T) (string, bool)
|
||||
|
||||
func FromSimpleCompareStack[T any](name string, c SimpleCompareStack[T]) CompareStack[T] {
|
||||
return func(a, b T) map[string]string {
|
||||
if message, changed := c(a, b); changed {
|
||||
return map[string]string{
|
||||
name: message,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func Compare[T any](a, b T, ins ...CompareStack[T]) map[string]string {
|
||||
r := make(map[string]string)
|
||||
|
||||
for _, in := range ins {
|
||||
for k, v := range in(a, b) {
|
||||
r[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
type SubCompareExtract[T, X any] func(in T) X
|
||||
|
||||
func SubCompare[T, X any](prefix string, extract SubCompareExtract[T, X], ins ...CompareStack[X]) CompareStack[T] {
|
||||
return func(a, b T) map[string]string {
|
||||
res := Compare[X](extract(a), extract(b), ins...)
|
||||
r := make(map[string]string, len(res))
|
||||
for k := range res {
|
||||
r[fmt.Sprintf("%s%s", prefix, k)] = res[k]
|
||||
}
|
||||
return res
|
||||
}
|
||||
}
|
||||
|
||||
func NewImmutableFields[T Object](in ImmutableFields[T]) ImmutableFields[T] {
|
||||
return in
|
||||
}
|
||||
|
||||
type ImmutableFields[T Object] func(a, b T, changes map[string]string) Action
|
||||
|
||||
func (i ImmutableFields[T]) Evaluate(ins ...CompareStack[T]) Decision[T] {
|
||||
return func(ctx context.Context, current, expected DecisionObject[T]) (Action, error) {
|
||||
r := make(map[string]string)
|
||||
|
||||
for _, in := range ins {
|
||||
for k, v := range in(current.Object, expected.Object) {
|
||||
r[k] = v
|
||||
}
|
||||
}
|
||||
|
||||
return i(current.Object, expected.Object, r), nil
|
||||
}
|
||||
}
|
||||
|
|
|
@ -44,15 +44,26 @@ func init() {
|
|||
logging.Global().SetRoot(zerolog.New(os.Stdout).With().Timestamp().Logger())
|
||||
}
|
||||
|
||||
func runUpdate[T Object](t *testing.T, iterations int, client Client[T], ref **sharedApi.Object, generator Generate[T], decisions ...Decision[T]) {
|
||||
func runUpdate[T Object](t *testing.T, iterations int, factory ClientFactory[T], ref **sharedApi.Object, generator Generate[T], decisions ...Decision[T]) {
|
||||
logger := logging.Global().Get("test")
|
||||
|
||||
i := 0
|
||||
|
||||
var obj T
|
||||
|
||||
o := tests.GVK(t, obj)
|
||||
|
||||
update := NewUpdator[T](Config[T]{
|
||||
Events: nil,
|
||||
Logger: logger,
|
||||
Factory: factory,
|
||||
Kind: o.Kind,
|
||||
})
|
||||
|
||||
for i = 1; i < 1024; i++ {
|
||||
var changed bool
|
||||
t.Run(fmt.Sprintf("Iteration %d", i), func(t *testing.T) {
|
||||
ok, err := Update[T](context.Background(), logger, client, ref, generator, decisions...)
|
||||
_, ok, err := update.Update(context.Background(), tests.FakeNamespace, nil, ref, generator, decisions...)
|
||||
require.NoError(t, err)
|
||||
changed = ok
|
||||
})
|
||||
|
@ -81,7 +92,7 @@ func get[T Object](t *testing.T, client Client[T], in T) (T, bool) {
|
|||
func Test_Updator(t *testing.T) {
|
||||
logging.Global().RegisterLogger("test", logging.Trace)
|
||||
|
||||
client := fake.NewSimpleClientset().CoreV1().Secrets(tests.FakeNamespace)
|
||||
factory := fake.NewSimpleClientset()
|
||||
|
||||
var secret = core.Secret{
|
||||
ObjectMeta: meta.ObjectMeta{
|
||||
|
@ -98,22 +109,27 @@ func Test_Updator(t *testing.T) {
|
|||
return secret.DeepCopy(), false, checksum, nil
|
||||
}
|
||||
|
||||
client := factory.CoreV1().Secrets(tests.FakeNamespace)
|
||||
clientF := func(namespace string) Client[*core.Secret] {
|
||||
return factory.CoreV1().Secrets(namespace)
|
||||
}
|
||||
|
||||
t.Run("Ensure default is handled", func(t *testing.T) {
|
||||
runUpdate[*core.Secret](t, 2, client, &ref, retSecret)
|
||||
runUpdate[*core.Secret](t, 2, clientF, &ref, retSecret)
|
||||
|
||||
_, ok := get[*core.Secret](t, client, &secret)
|
||||
require.True(t, ok)
|
||||
})
|
||||
|
||||
t.Run("Ensure rerun is handled", func(t *testing.T) {
|
||||
runUpdate[*core.Secret](t, 1, client, &ref, retSecret)
|
||||
runUpdate[*core.Secret](t, 1, clientF, &ref, retSecret)
|
||||
|
||||
_, ok := get[*core.Secret](t, client, &secret)
|
||||
require.True(t, ok)
|
||||
})
|
||||
|
||||
t.Run("Ensure delete is not handled when skip is requested", func(t *testing.T) {
|
||||
runUpdate[*core.Secret](t, 1, client, &ref, func(ctx context.Context, _ *sharedApi.Object) (*core.Secret, bool, string, error) {
|
||||
runUpdate[*core.Secret](t, 1, clientF, &ref, func(ctx context.Context, _ *sharedApi.Object) (*core.Secret, bool, string, error) {
|
||||
return nil, true, "", nil
|
||||
})
|
||||
|
||||
|
@ -122,7 +138,7 @@ func Test_Updator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Ensure delete is handled", func(t *testing.T) {
|
||||
runUpdate[*core.Secret](t, 3, client, &ref, func(ctx context.Context, _ *sharedApi.Object) (*core.Secret, bool, string, error) {
|
||||
runUpdate[*core.Secret](t, 3, clientF, &ref, func(ctx context.Context, _ *sharedApi.Object) (*core.Secret, bool, string, error) {
|
||||
return nil, false, "", nil
|
||||
})
|
||||
|
||||
|
@ -131,7 +147,7 @@ func Test_Updator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Recreate", func(t *testing.T) {
|
||||
runUpdate[*core.Secret](t, 2, client, &ref, retSecret)
|
||||
runUpdate[*core.Secret](t, 2, clientF, &ref, retSecret)
|
||||
|
||||
_, ok := get[*core.Secret](t, client, &secret)
|
||||
require.True(t, ok)
|
||||
|
@ -140,7 +156,7 @@ func Test_Updator(t *testing.T) {
|
|||
t.Run("Change checksum without handler", func(t *testing.T) {
|
||||
checksum = util.SHA256FromString("NEW")
|
||||
|
||||
runUpdate[*core.Secret](t, 1, client, &ref, retSecret)
|
||||
runUpdate[*core.Secret](t, 1, clientF, &ref, retSecret)
|
||||
|
||||
require.NotEqual(t, checksum, ref.GetChecksum())
|
||||
|
||||
|
@ -149,7 +165,7 @@ func Test_Updator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Change checksum with recreate handler", func(t *testing.T) {
|
||||
runUpdate[*core.Secret](t, 4, client, &ref, retSecret, ReplaceChecksum[*core.Secret])
|
||||
runUpdate[*core.Secret](t, 4, clientF, &ref, retSecret, ReplaceChecksum[*core.Secret])
|
||||
|
||||
require.Equal(t, checksum, ref.GetChecksum())
|
||||
|
||||
|
@ -160,7 +176,7 @@ func Test_Updator(t *testing.T) {
|
|||
t.Run("UUID Changed", func(t *testing.T) {
|
||||
ref.UID = util.NewType(uuid.NewUUID())
|
||||
|
||||
runUpdate[*core.Secret](t, 4, client, &ref, retSecret)
|
||||
runUpdate[*core.Secret](t, 4, clientF, &ref, retSecret)
|
||||
|
||||
s, ok := get[*core.Secret](t, client, &secret)
|
||||
require.True(t, ok)
|
||||
|
@ -174,7 +190,7 @@ func Test_Updator(t *testing.T) {
|
|||
},
|
||||
})
|
||||
|
||||
runUpdate[*core.Secret](t, 1, client, &ref, retSecret)
|
||||
runUpdate[*core.Secret](t, 1, clientF, &ref, retSecret)
|
||||
|
||||
s, ok := get[*core.Secret](t, client, &secret)
|
||||
require.True(t, ok)
|
||||
|
@ -182,7 +198,7 @@ func Test_Updator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Owner Added With Handler", func(t *testing.T) {
|
||||
runUpdate[*core.Secret](t, 2, client, &ref, retSecret, UpdateOwnerReference[*core.Secret])
|
||||
runUpdate[*core.Secret](t, 2, clientF, &ref, retSecret, UpdateOwnerReference[*core.Secret])
|
||||
|
||||
s, ok := get[*core.Secret](t, client, &secret)
|
||||
require.True(t, ok)
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
core "k8s.io/api/core/v1"
|
||||
rbac "k8s.io/api/rbac/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
|
||||
|
@ -1053,6 +1054,140 @@ func IsNamespaced(in meta.Object) bool {
|
|||
}
|
||||
}
|
||||
|
||||
func GVK(t *testing.T, object meta.Object) schema.GroupVersionKind {
|
||||
switch v := object.(type) {
|
||||
case *batch.CronJob:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "batch",
|
||||
Version: "v1",
|
||||
Kind: "CronJob",
|
||||
}
|
||||
case *batch.Job:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "batch",
|
||||
Version: "v1",
|
||||
Kind: "Job",
|
||||
}
|
||||
case *core.Pod:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "",
|
||||
Version: "v1",
|
||||
Kind: "Pod",
|
||||
}
|
||||
case *core.Secret:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "",
|
||||
Version: "v1",
|
||||
Kind: "Secret",
|
||||
}
|
||||
case *core.Service:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "",
|
||||
Version: "v1",
|
||||
Kind: "Service",
|
||||
}
|
||||
case *core.ServiceAccount:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "",
|
||||
Version: "v1",
|
||||
Kind: "ServiceAccount",
|
||||
}
|
||||
case *apps.StatefulSet:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "apps",
|
||||
Version: "v1",
|
||||
Kind: "StatefulSet",
|
||||
}
|
||||
case *api.ArangoDeployment:
|
||||
return schema.GroupVersionKind{
|
||||
Group: deployment.ArangoDeploymentGroupName,
|
||||
Version: api.ArangoDeploymentVersion,
|
||||
Kind: deployment.ArangoDeploymentResourceKind,
|
||||
}
|
||||
case *api.ArangoClusterSynchronization:
|
||||
return schema.GroupVersionKind{
|
||||
Group: deployment.ArangoDeploymentGroupName,
|
||||
Version: api.ArangoDeploymentVersion,
|
||||
Kind: deployment.ArangoClusterSynchronizationResourceKind,
|
||||
}
|
||||
case *backupApi.ArangoBackup:
|
||||
return schema.GroupVersionKind{
|
||||
Group: backup.ArangoBackupGroupName,
|
||||
Version: backupApi.ArangoBackupVersion,
|
||||
Kind: backup.ArangoBackupResourceKind,
|
||||
}
|
||||
case *mlApi.ArangoMLExtension:
|
||||
return schema.GroupVersionKind{
|
||||
Group: ml.ArangoMLGroupName,
|
||||
Version: mlApi.ArangoMLVersion,
|
||||
Kind: ml.ArangoMLExtensionResourceKind,
|
||||
}
|
||||
case *mlApi.ArangoMLStorage:
|
||||
return schema.GroupVersionKind{
|
||||
Group: ml.ArangoMLGroupName,
|
||||
Version: mlApi.ArangoMLVersion,
|
||||
Kind: ml.ArangoMLStorageResourceKind,
|
||||
}
|
||||
case *mlApiv1alpha1.ArangoMLExtension:
|
||||
return schema.GroupVersionKind{
|
||||
Group: ml.ArangoMLGroupName,
|
||||
Version: mlApiv1alpha1.ArangoMLVersion,
|
||||
Kind: ml.ArangoMLExtensionResourceKind,
|
||||
}
|
||||
case *mlApiv1alpha1.ArangoMLStorage:
|
||||
return schema.GroupVersionKind{
|
||||
Group: ml.ArangoMLGroupName,
|
||||
Version: mlApiv1alpha1.ArangoMLVersion,
|
||||
Kind: ml.ArangoMLStorageResourceKind,
|
||||
}
|
||||
case *mlApiv1alpha1.ArangoMLBatchJob:
|
||||
return schema.GroupVersionKind{
|
||||
Group: ml.ArangoMLGroupName,
|
||||
Version: mlApiv1alpha1.ArangoMLVersion,
|
||||
Kind: ml.ArangoMLBatchJobResourceKind,
|
||||
}
|
||||
case *mlApiv1alpha1.ArangoMLCronJob:
|
||||
return schema.GroupVersionKind{
|
||||
Group: ml.ArangoMLGroupName,
|
||||
Version: mlApiv1alpha1.ArangoMLVersion,
|
||||
Kind: ml.ArangoMLCronJobResourceKind,
|
||||
}
|
||||
case *rbac.ClusterRole:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "rbac.authorization.k8s.io",
|
||||
Version: "v1",
|
||||
Kind: "ClusterRole",
|
||||
}
|
||||
case *rbac.ClusterRoleBinding:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "rbac.authorization.k8s.io",
|
||||
Version: "v1",
|
||||
Kind: "ClusterRoleBinding",
|
||||
}
|
||||
case *rbac.Role:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "rbac.authorization.k8s.io",
|
||||
Version: "v1",
|
||||
Kind: "Role",
|
||||
}
|
||||
case *rbac.RoleBinding:
|
||||
return schema.GroupVersionKind{
|
||||
Group: "rbac.authorization.k8s.io",
|
||||
Version: "v1",
|
||||
Kind: "RoleBinding",
|
||||
}
|
||||
case *schedulerApi.ArangoProfile:
|
||||
return schema.GroupVersionKind{
|
||||
Group: scheduler.ArangoSchedulerGroupName,
|
||||
Version: schedulerApi.ArangoSchedulerVersion,
|
||||
Kind: scheduler.ArangoProfileResourceKind,
|
||||
}
|
||||
default:
|
||||
require.Fail(t, fmt.Sprintf("Unable to create object: %s", reflect.TypeOf(v).String()))
|
||||
return schema.GroupVersionKind{}
|
||||
}
|
||||
}
|
||||
|
||||
func NewItem(t *testing.T, o operation.Operation, object meta.Object) operation.Item {
|
||||
item := operation.Item{
|
||||
Operation: o,
|
||||
|
@ -1061,94 +1196,11 @@ func NewItem(t *testing.T, o operation.Operation, object meta.Object) operation.
|
|||
Name: object.GetName(),
|
||||
}
|
||||
|
||||
switch v := object.(type) {
|
||||
case *batch.CronJob:
|
||||
item.Group = "batch"
|
||||
item.Version = "v1"
|
||||
item.Kind = "CronJob"
|
||||
case *batch.Job:
|
||||
item.Group = "batch"
|
||||
item.Version = "v1"
|
||||
item.Kind = "Job"
|
||||
case *core.Pod:
|
||||
item.Group = ""
|
||||
item.Version = "v1"
|
||||
item.Kind = "Pod"
|
||||
case *core.Secret:
|
||||
item.Group = ""
|
||||
item.Version = "v1"
|
||||
item.Kind = "Secret"
|
||||
case *core.Service:
|
||||
item.Group = ""
|
||||
item.Version = "v1"
|
||||
item.Kind = "Service"
|
||||
case *core.ServiceAccount:
|
||||
item.Group = ""
|
||||
item.Version = "v1"
|
||||
item.Kind = "ServiceAccount"
|
||||
case *apps.StatefulSet:
|
||||
item.Group = "apps"
|
||||
item.Version = "v1"
|
||||
item.Kind = "StatefulSet"
|
||||
case *api.ArangoDeployment:
|
||||
item.Group = deployment.ArangoDeploymentGroupName
|
||||
item.Version = api.ArangoDeploymentVersion
|
||||
item.Kind = deployment.ArangoDeploymentResourceKind
|
||||
case *api.ArangoClusterSynchronization:
|
||||
item.Group = deployment.ArangoDeploymentGroupName
|
||||
item.Version = api.ArangoDeploymentVersion
|
||||
item.Kind = deployment.ArangoClusterSynchronizationResourceKind
|
||||
case *backupApi.ArangoBackup:
|
||||
item.Group = backup.ArangoBackupGroupName
|
||||
item.Version = backupApi.ArangoBackupVersion
|
||||
item.Kind = backup.ArangoBackupResourceKind
|
||||
case *mlApi.ArangoMLExtension:
|
||||
item.Group = ml.ArangoMLGroupName
|
||||
item.Version = mlApi.ArangoMLVersion
|
||||
item.Kind = ml.ArangoMLExtensionResourceKind
|
||||
case *mlApi.ArangoMLStorage:
|
||||
item.Group = ml.ArangoMLGroupName
|
||||
item.Version = mlApi.ArangoMLVersion
|
||||
item.Kind = ml.ArangoMLStorageResourceKind
|
||||
case *mlApiv1alpha1.ArangoMLExtension:
|
||||
item.Group = ml.ArangoMLGroupName
|
||||
item.Version = mlApiv1alpha1.ArangoMLVersion
|
||||
item.Kind = ml.ArangoMLExtensionResourceKind
|
||||
case *mlApiv1alpha1.ArangoMLStorage:
|
||||
item.Group = ml.ArangoMLGroupName
|
||||
item.Version = mlApiv1alpha1.ArangoMLVersion
|
||||
item.Kind = ml.ArangoMLStorageResourceKind
|
||||
case *mlApiv1alpha1.ArangoMLBatchJob:
|
||||
item.Group = ml.ArangoMLGroupName
|
||||
item.Version = mlApiv1alpha1.ArangoMLVersion
|
||||
item.Kind = ml.ArangoMLBatchJobResourceKind
|
||||
case *mlApiv1alpha1.ArangoMLCronJob:
|
||||
item.Group = ml.ArangoMLGroupName
|
||||
item.Version = mlApiv1alpha1.ArangoMLVersion
|
||||
item.Kind = ml.ArangoMLCronJobResourceKind
|
||||
case *rbac.ClusterRole:
|
||||
item.Group = "rbac.authorization.k8s.io"
|
||||
item.Version = "v1"
|
||||
item.Kind = "ClusterRole"
|
||||
case *rbac.ClusterRoleBinding:
|
||||
item.Group = "rbac.authorization.k8s.io"
|
||||
item.Version = "v1"
|
||||
item.Kind = "ClusterRoleBinding"
|
||||
case *rbac.Role:
|
||||
item.Group = "rbac.authorization.k8s.io"
|
||||
item.Version = "v1"
|
||||
item.Kind = "Role"
|
||||
case *rbac.RoleBinding:
|
||||
item.Group = "rbac.authorization.k8s.io"
|
||||
item.Version = "v1"
|
||||
item.Kind = "RoleBinding"
|
||||
case *schedulerApi.ArangoProfile:
|
||||
item.Group = scheduler.ArangoSchedulerGroupName
|
||||
item.Version = schedulerApi.ArangoSchedulerVersion
|
||||
item.Kind = scheduler.ArangoProfileResourceKind
|
||||
default:
|
||||
require.Fail(t, fmt.Sprintf("Unable to create object: %s", reflect.TypeOf(v).String()))
|
||||
}
|
||||
gvk := GVK(t, object)
|
||||
|
||||
item.Group = gvk.Group
|
||||
item.Version = gvk.Version
|
||||
item.Kind = gvk.Kind
|
||||
|
||||
return item
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue