diff --git a/CHANGELOG.md b/CHANGELOG.md index 1266b9ec8..08b47bd51 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ - (Feature) Add ArangoMember overrides - (Feature) ArangoMember Removal Priority - (Feature) Add --deployment.feature.init-containers-copy-resources (default enabled) +- (Feature) Add maxBackups option to ArangoBackupPolicy ## [1.2.32](https://github.com/arangodb/kube-arangodb/tree/1.2.32) (2023-08-07) - (Feature) Backup lifetime - remove Backup once its lifetime has been reached diff --git a/pkg/apis/backup/v1/backup_policy_spec.go b/pkg/apis/backup/v1/backup_policy_spec.go index 762ce42e2..a4ba123cf 100644 --- a/pkg/apis/backup/v1/backup_policy_spec.go +++ b/pkg/apis/backup/v1/backup_policy_spec.go @@ -33,6 +33,9 @@ type ArangoBackupPolicySpec struct { AllowConcurrent *bool `json:"allowConcurrent,omitempty"` // DeploymentSelector specifies which deployments should get a backup DeploymentSelector *meta.LabelSelector `json:"selector,omitempty"` + // MaxBackups defines how many backups should be kept in history (per deployment). Oldest Backups will be deleted. + // If not specified or 0 then no limit is applied + MaxBackups int `json:"maxBackups,omitempty"` // ArangoBackupTemplate specifies additional options for newly created ArangoBackup BackupTemplate ArangoBackupTemplate `json:"template"` } diff --git a/pkg/handlers/policy/handler.go b/pkg/handlers/policy/handler.go index 1fa8565d2..1c90306cc 100644 --- a/pkg/handlers/policy/handler.go +++ b/pkg/handlers/policy/handler.go @@ -37,16 +37,19 @@ import ( operator "github.com/arangodb/kube-arangodb/pkg/operatorV2" "github.com/arangodb/kube-arangodb/pkg/operatorV2/event" "github.com/arangodb/kube-arangodb/pkg/operatorV2/operation" + "github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/globals" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors" ) const ( - backupCreated = "ArangoBackupCreated" - policyError = "Error" - rescheduled = "Rescheduled" - scheduleSkipped = "ScheduleSkipped" + backupCreated = "ArangoBackupCreated" + policyError = "Error" + rescheduled = "Rescheduled" + scheduleSkipped = "ScheduleSkipped" + cleanedUpOldBackups = "CleanedUpOldBackups" ) type handler struct { @@ -138,7 +141,6 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back // Schedule new deployments listOptions := meta.ListOptions{} - if policy.Spec.DeploymentSelector != nil && (policy.Spec.DeploymentSelector.MatchLabels != nil && len(policy.Spec.DeploymentSelector.MatchLabels) > 0 || @@ -147,7 +149,6 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back } deployments, err := h.client.DatabaseV1().ArangoDeployments(policy.Namespace).List(context.Background(), listOptions) - if err != nil { h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error()) @@ -157,11 +158,13 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back } } + needToListBackups := !policy.Spec.GetAllowConcurrent() || policy.Spec.MaxBackups > 0 for _, deployment := range deployments.Items { depl := deployment.DeepCopy() + ctx := context.Background() - if !policy.Spec.GetAllowConcurrent() { - previousBackupInProgress, err := h.isPreviousBackupInProgress(context.Background(), depl, policy.Name) + if needToListBackups { + backups, err := h.listAllBackupsForPolicy(ctx, depl, policy.Name) if err != nil { h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error()) return backupApi.ArangoBackupPolicyStatus{ @@ -169,7 +172,17 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back Message: fmt.Sprintf("backup creation failed: %s", err.Error()), } } - if previousBackupInProgress { + if numRemoved, err := h.removeOldHealthyBackups(ctx, policy.Spec.MaxBackups, backups); err != nil { + h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error()) + return backupApi.ArangoBackupPolicyStatus{ + Scheduled: policy.Status.Scheduled, + Message: fmt.Sprintf("automatic backup cleanup failed: %s", err.Error()), + } + } else if numRemoved > 0 { + eventMsg := fmt.Sprintf("Cleaned up %d old backups due to maxBackups setting %s/%s", numRemoved, deployment.Namespace, deployment.Name) + h.eventRecorder.Normal(policy, cleanedUpOldBackups, eventMsg) + } + if !policy.Spec.GetAllowConcurrent() && h.isPreviousBackupInProgress(backups) { eventMsg := fmt.Sprintf("Skipping ArangoBackup creation because earlier backup still running %s/%s", deployment.Namespace, deployment.Name) h.eventRecorder.Normal(policy, scheduleSkipped, eventMsg) continue @@ -177,7 +190,7 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back } b := policy.NewBackup(depl) - if _, err := h.client.BackupV1().ArangoBackups(b.Namespace).Create(context.Background(), b, meta.CreateOptions{}); err != nil { + if _, err := h.client.BackupV1().ArangoBackups(b.Namespace).Create(ctx, b, meta.CreateOptions{}); err != nil { h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error()) return backupApi.ArangoBackupPolicyStatus{ @@ -206,7 +219,7 @@ func (*handler) CanBeHandled(item operation.Item) bool { item.Kind == backup.ArangoBackupPolicyResourceKind } -func (h *handler) listAllBackupsForPolicy(ctx context.Context, d *deployment.ArangoDeployment, policyName string) ([]*backupApi.ArangoBackup, error) { +func (h *handler) listAllBackupsForPolicy(ctx context.Context, d *deployment.ArangoDeployment, policyName string) (util.List[*backupApi.ArangoBackup], error) { var r []*backupApi.ArangoBackup if err := k8sutil.APIList[*backupApi.ArangoBackupList](ctx, h.client.BackupV1().ArangoBackups(d.Namespace), meta.ListOptions{ @@ -228,37 +241,57 @@ func (h *handler) listAllBackupsForPolicy(ctx context.Context, d *deployment.Ara return nil }); err != nil { - return nil, err + return nil, errors.Wrap(err, "Failed to list ArangoBackups") } return r, nil } -func (h *handler) isPreviousBackupInProgress(ctx context.Context, d *deployment.ArangoDeployment, policyName string) (bool, error) { - // It would be nice to List CRs with fieldSelector set, but this is not supported: - // https://github.com/kubernetes/kubernetes/issues/53459 - // Instead we fetch all ArangoBackups: - backups, err := h.listAllBackupsForPolicy(ctx, d, policyName) - if err != nil { - return false, errors.Wrap(err, "Failed to list ArangoBackups") - } - - for _, b := range backups { - // Check if we are in the failed state +func (h *handler) isPreviousBackupInProgress(backups util.List[*backupApi.ArangoBackup]) bool { + inProgressBackups := backups.Count(func(b *backupApi.ArangoBackup) bool { switch b.Status.State { case backupApi.ArangoBackupStateFailed: - continue + return false } if b.Spec.Download != nil { - continue + return false } // Backup is not yet done if b.Status.Backup == nil { - return true, nil + return true } + return false + }) + return inProgressBackups > 0 +} + +func (h *handler) removeOldHealthyBackups(ctx context.Context, limit int, backups util.List[*backupApi.ArangoBackup]) (int, error) { + if limit <= 0 { + // no limit set + return 0, nil } - return false, nil + healthyBackups := backups.Filter(func(b *backupApi.ArangoBackup) bool { + return b.Status.State == backupApi.ArangoBackupStateReady + }).Sort(func(a *backupApi.ArangoBackup, b *backupApi.ArangoBackup) bool { + // newest first + return a.CreationTimestamp.After(b.CreationTimestamp.Time) + }) + if len(healthyBackups) < limit { + return 0, nil + } + toDelete := healthyBackups[limit-1:] + numDeleted := 0 + for _, b := range toDelete { + err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error { + return h.client.BackupV1().ArangoBackups(b.Namespace).Delete(ctx, b.Name, meta.DeleteOptions{}) + }) + if err != nil && !kerrors.IsNotFound(err) { + return numDeleted, errors.Wrapf(err, "could not trigger deletion of backup %s", b.Name) + } + numDeleted++ + } + return numDeleted, nil } diff --git a/pkg/operatorV2/operator_worker.go b/pkg/operatorV2/operator_worker.go index 15086c109..b9b507b35 100644 --- a/pkg/operatorV2/operator_worker.go +++ b/pkg/operatorV2/operator_worker.go @@ -104,7 +104,7 @@ func (o *operator) processObject(obj interface{}) error { if err = o.processItem(item); err != nil { o.workqueue.AddRateLimited(key) - return errors.Newf("error syncing '%s': %s, requeuing", key, err.Error()) + return errors.Newf("error syncing '%s': %s, re-queuing", key, err.Error()) } loggerWorker.Trace("Processed Item Action: %s, Type: %s/%s/%s, Namespace: %s, Name: %s", diff --git a/pkg/util/list.go b/pkg/util/list.go new file mode 100644 index 000000000..6fa207dd2 --- /dev/null +++ b/pkg/util/list.go @@ -0,0 +1,61 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package util + +import "sort" + +type List[T any] []T + +func (l List[T]) Filter(fn func(T) bool) List[T] { + if l == nil { + return nil + } + result := make([]T, 0) + for _, item := range l { + if fn(item) { + result = append(result, item) + } + } + return result +} + +func (l List[T]) Count(fn func(T) bool) int { + return len(l.Filter(fn)) +} + +func (l List[T]) Sort(fn func(T, T) bool) List[T] { + clone := l + sort.Slice(clone, func(i, j int) bool { + return fn(clone[i], clone[j]) + }) + return clone +} + +func MapList[T, V any](in List[T], fn func(T) V) List[V] { + if in == nil { + return nil + } + result := make(List[V], 0, len(in)) + for _, em := range in { + result = append(result, fn(em)) + } + return result +} diff --git a/pkg/util/list_test.go b/pkg/util/list_test.go new file mode 100644 index 000000000..0f728da1f --- /dev/null +++ b/pkg/util/list_test.go @@ -0,0 +1,69 @@ +// +// DISCLAIMER +// +// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package util + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func Test_List_Sort(t *testing.T) { + type obj struct { + creationDate time.Time + } + now := time.Now() + + l := List[*obj]{ + &obj{now}, + &obj{now.Add(time.Second)}, + &obj{now.Add(-time.Second)}, + &obj{now.Add(time.Hour)}, + &obj{now.Add(-time.Hour)}, + } + expected := List[*obj]{ + &obj{now.Add(time.Hour)}, + &obj{now.Add(time.Second)}, + &obj{now}, + &obj{now.Add(-time.Second)}, + &obj{now.Add(-time.Hour)}, + } + sorted := l.Sort(func(a *obj, b *obj) bool { + return a.creationDate.After(b.creationDate) + }) + require.EqualValues(t, expected, sorted) +} + +func Test_MapList(t *testing.T) { + type obj struct { + name string + } + l := List[*obj]{ + &obj{"a"}, + &obj{"b"}, + &obj{"c"}, + } + expected := List[string]{"a", "b", "c"} + require.Equal(t, expected, MapList(l, func(o *obj) string { + return o.name + })) +}