1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] GT-351 | Add maxBackups option to ArangoBackupPolicy (#1321)

This commit is contained in:
Nikita Vaniasin 2023-09-19 16:28:48 +02:00 committed by GitHub
parent 5c5fda4727
commit 35c6e58783
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 195 additions and 28 deletions

View file

@ -21,6 +21,7 @@
- (Feature) Add ArangoMember overrides - (Feature) Add ArangoMember overrides
- (Feature) ArangoMember Removal Priority - (Feature) ArangoMember Removal Priority
- (Feature) Add --deployment.feature.init-containers-copy-resources (default enabled) - (Feature) Add --deployment.feature.init-containers-copy-resources (default enabled)
- (Feature) Add maxBackups option to ArangoBackupPolicy
## [1.2.32](https://github.com/arangodb/kube-arangodb/tree/1.2.32) (2023-08-07) ## [1.2.32](https://github.com/arangodb/kube-arangodb/tree/1.2.32) (2023-08-07)
- (Feature) Backup lifetime - remove Backup once its lifetime has been reached - (Feature) Backup lifetime - remove Backup once its lifetime has been reached

View file

@ -33,6 +33,9 @@ type ArangoBackupPolicySpec struct {
AllowConcurrent *bool `json:"allowConcurrent,omitempty"` AllowConcurrent *bool `json:"allowConcurrent,omitempty"`
// DeploymentSelector specifies which deployments should get a backup // DeploymentSelector specifies which deployments should get a backup
DeploymentSelector *meta.LabelSelector `json:"selector,omitempty"` DeploymentSelector *meta.LabelSelector `json:"selector,omitempty"`
// MaxBackups defines how many backups should be kept in history (per deployment). Oldest Backups will be deleted.
// If not specified or 0 then no limit is applied
MaxBackups int `json:"maxBackups,omitempty"`
// ArangoBackupTemplate specifies additional options for newly created ArangoBackup // ArangoBackupTemplate specifies additional options for newly created ArangoBackup
BackupTemplate ArangoBackupTemplate `json:"template"` BackupTemplate ArangoBackupTemplate `json:"template"`
} }

View file

@ -37,16 +37,19 @@ import (
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2" operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event" "github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/operation" "github.com/arangodb/kube-arangodb/pkg/operatorV2/operation"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals" "github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
) )
const ( const (
backupCreated = "ArangoBackupCreated" backupCreated = "ArangoBackupCreated"
policyError = "Error" policyError = "Error"
rescheduled = "Rescheduled" rescheduled = "Rescheduled"
scheduleSkipped = "ScheduleSkipped" scheduleSkipped = "ScheduleSkipped"
cleanedUpOldBackups = "CleanedUpOldBackups"
) )
type handler struct { type handler struct {
@ -138,7 +141,6 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back
// Schedule new deployments // Schedule new deployments
listOptions := meta.ListOptions{} listOptions := meta.ListOptions{}
if policy.Spec.DeploymentSelector != nil && if policy.Spec.DeploymentSelector != nil &&
(policy.Spec.DeploymentSelector.MatchLabels != nil && (policy.Spec.DeploymentSelector.MatchLabels != nil &&
len(policy.Spec.DeploymentSelector.MatchLabels) > 0 || len(policy.Spec.DeploymentSelector.MatchLabels) > 0 ||
@ -147,7 +149,6 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back
} }
deployments, err := h.client.DatabaseV1().ArangoDeployments(policy.Namespace).List(context.Background(), listOptions) deployments, err := h.client.DatabaseV1().ArangoDeployments(policy.Namespace).List(context.Background(), listOptions)
if err != nil { if err != nil {
h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error()) h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error())
@ -157,11 +158,13 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back
} }
} }
needToListBackups := !policy.Spec.GetAllowConcurrent() || policy.Spec.MaxBackups > 0
for _, deployment := range deployments.Items { for _, deployment := range deployments.Items {
depl := deployment.DeepCopy() depl := deployment.DeepCopy()
ctx := context.Background()
if !policy.Spec.GetAllowConcurrent() { if needToListBackups {
previousBackupInProgress, err := h.isPreviousBackupInProgress(context.Background(), depl, policy.Name) backups, err := h.listAllBackupsForPolicy(ctx, depl, policy.Name)
if err != nil { if err != nil {
h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error()) h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error())
return backupApi.ArangoBackupPolicyStatus{ return backupApi.ArangoBackupPolicyStatus{
@ -169,7 +172,17 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back
Message: fmt.Sprintf("backup creation failed: %s", err.Error()), Message: fmt.Sprintf("backup creation failed: %s", err.Error()),
} }
} }
if previousBackupInProgress { if numRemoved, err := h.removeOldHealthyBackups(ctx, policy.Spec.MaxBackups, backups); err != nil {
h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error())
return backupApi.ArangoBackupPolicyStatus{
Scheduled: policy.Status.Scheduled,
Message: fmt.Sprintf("automatic backup cleanup failed: %s", err.Error()),
}
} else if numRemoved > 0 {
eventMsg := fmt.Sprintf("Cleaned up %d old backups due to maxBackups setting %s/%s", numRemoved, deployment.Namespace, deployment.Name)
h.eventRecorder.Normal(policy, cleanedUpOldBackups, eventMsg)
}
if !policy.Spec.GetAllowConcurrent() && h.isPreviousBackupInProgress(backups) {
eventMsg := fmt.Sprintf("Skipping ArangoBackup creation because earlier backup still running %s/%s", deployment.Namespace, deployment.Name) eventMsg := fmt.Sprintf("Skipping ArangoBackup creation because earlier backup still running %s/%s", deployment.Namespace, deployment.Name)
h.eventRecorder.Normal(policy, scheduleSkipped, eventMsg) h.eventRecorder.Normal(policy, scheduleSkipped, eventMsg)
continue continue
@ -177,7 +190,7 @@ func (h *handler) processBackupPolicy(policy *backupApi.ArangoBackupPolicy) back
} }
b := policy.NewBackup(depl) b := policy.NewBackup(depl)
if _, err := h.client.BackupV1().ArangoBackups(b.Namespace).Create(context.Background(), b, meta.CreateOptions{}); err != nil { if _, err := h.client.BackupV1().ArangoBackups(b.Namespace).Create(ctx, b, meta.CreateOptions{}); err != nil {
h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error()) h.eventRecorder.Warning(policy, policyError, "Policy Error: %s", err.Error())
return backupApi.ArangoBackupPolicyStatus{ return backupApi.ArangoBackupPolicyStatus{
@ -206,7 +219,7 @@ func (*handler) CanBeHandled(item operation.Item) bool {
item.Kind == backup.ArangoBackupPolicyResourceKind item.Kind == backup.ArangoBackupPolicyResourceKind
} }
func (h *handler) listAllBackupsForPolicy(ctx context.Context, d *deployment.ArangoDeployment, policyName string) ([]*backupApi.ArangoBackup, error) { func (h *handler) listAllBackupsForPolicy(ctx context.Context, d *deployment.ArangoDeployment, policyName string) (util.List[*backupApi.ArangoBackup], error) {
var r []*backupApi.ArangoBackup var r []*backupApi.ArangoBackup
if err := k8sutil.APIList[*backupApi.ArangoBackupList](ctx, h.client.BackupV1().ArangoBackups(d.Namespace), meta.ListOptions{ if err := k8sutil.APIList[*backupApi.ArangoBackupList](ctx, h.client.BackupV1().ArangoBackups(d.Namespace), meta.ListOptions{
@ -228,37 +241,57 @@ func (h *handler) listAllBackupsForPolicy(ctx context.Context, d *deployment.Ara
return nil return nil
}); err != nil { }); err != nil {
return nil, err return nil, errors.Wrap(err, "Failed to list ArangoBackups")
} }
return r, nil return r, nil
} }
func (h *handler) isPreviousBackupInProgress(ctx context.Context, d *deployment.ArangoDeployment, policyName string) (bool, error) { func (h *handler) isPreviousBackupInProgress(backups util.List[*backupApi.ArangoBackup]) bool {
// It would be nice to List CRs with fieldSelector set, but this is not supported: inProgressBackups := backups.Count(func(b *backupApi.ArangoBackup) bool {
// https://github.com/kubernetes/kubernetes/issues/53459
// Instead we fetch all ArangoBackups:
backups, err := h.listAllBackupsForPolicy(ctx, d, policyName)
if err != nil {
return false, errors.Wrap(err, "Failed to list ArangoBackups")
}
for _, b := range backups {
// Check if we are in the failed state
switch b.Status.State { switch b.Status.State {
case backupApi.ArangoBackupStateFailed: case backupApi.ArangoBackupStateFailed:
continue return false
} }
if b.Spec.Download != nil { if b.Spec.Download != nil {
continue return false
} }
// Backup is not yet done // Backup is not yet done
if b.Status.Backup == nil { if b.Status.Backup == nil {
return true, nil return true
} }
return false
})
return inProgressBackups > 0
}
func (h *handler) removeOldHealthyBackups(ctx context.Context, limit int, backups util.List[*backupApi.ArangoBackup]) (int, error) {
if limit <= 0 {
// no limit set
return 0, nil
} }
return false, nil healthyBackups := backups.Filter(func(b *backupApi.ArangoBackup) bool {
return b.Status.State == backupApi.ArangoBackupStateReady
}).Sort(func(a *backupApi.ArangoBackup, b *backupApi.ArangoBackup) bool {
// newest first
return a.CreationTimestamp.After(b.CreationTimestamp.Time)
})
if len(healthyBackups) < limit {
return 0, nil
}
toDelete := healthyBackups[limit-1:]
numDeleted := 0
for _, b := range toDelete {
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
return h.client.BackupV1().ArangoBackups(b.Namespace).Delete(ctx, b.Name, meta.DeleteOptions{})
})
if err != nil && !kerrors.IsNotFound(err) {
return numDeleted, errors.Wrapf(err, "could not trigger deletion of backup %s", b.Name)
}
numDeleted++
}
return numDeleted, nil
} }

View file

@ -104,7 +104,7 @@ func (o *operator) processObject(obj interface{}) error {
if err = o.processItem(item); err != nil { if err = o.processItem(item); err != nil {
o.workqueue.AddRateLimited(key) o.workqueue.AddRateLimited(key)
return errors.Newf("error syncing '%s': %s, requeuing", key, err.Error()) return errors.Newf("error syncing '%s': %s, re-queuing", key, err.Error())
} }
loggerWorker.Trace("Processed Item Action: %s, Type: %s/%s/%s, Namespace: %s, Name: %s", loggerWorker.Trace("Processed Item Action: %s, Type: %s/%s/%s, Namespace: %s, Name: %s",

61
pkg/util/list.go Normal file
View file

@ -0,0 +1,61 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package util
import "sort"
type List[T any] []T
func (l List[T]) Filter(fn func(T) bool) List[T] {
if l == nil {
return nil
}
result := make([]T, 0)
for _, item := range l {
if fn(item) {
result = append(result, item)
}
}
return result
}
func (l List[T]) Count(fn func(T) bool) int {
return len(l.Filter(fn))
}
func (l List[T]) Sort(fn func(T, T) bool) List[T] {
clone := l
sort.Slice(clone, func(i, j int) bool {
return fn(clone[i], clone[j])
})
return clone
}
func MapList[T, V any](in List[T], fn func(T) V) List[V] {
if in == nil {
return nil
}
result := make(List[V], 0, len(in))
for _, em := range in {
result = append(result, fn(em))
}
return result
}

69
pkg/util/list_test.go Normal file
View file

@ -0,0 +1,69 @@
//
// DISCLAIMER
//
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package util
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func Test_List_Sort(t *testing.T) {
type obj struct {
creationDate time.Time
}
now := time.Now()
l := List[*obj]{
&obj{now},
&obj{now.Add(time.Second)},
&obj{now.Add(-time.Second)},
&obj{now.Add(time.Hour)},
&obj{now.Add(-time.Hour)},
}
expected := List[*obj]{
&obj{now.Add(time.Hour)},
&obj{now.Add(time.Second)},
&obj{now},
&obj{now.Add(-time.Second)},
&obj{now.Add(-time.Hour)},
}
sorted := l.Sort(func(a *obj, b *obj) bool {
return a.creationDate.After(b.creationDate)
})
require.EqualValues(t, expected, sorted)
}
func Test_MapList(t *testing.T) {
type obj struct {
name string
}
l := List[*obj]{
&obj{"a"},
&obj{"b"},
&obj{"c"},
}
expected := List[string]{"a", "b", "c"}
require.Equal(t, expected, MapList(l, func(o *obj) string {
return o.name
}))
}