1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Bugfix] [LocalStorage] Add feature to pass ReclaimPolicy from StorageClass to PersistentVolumes (#1326)

This commit is contained in:
Adam Janikowski 2023-06-07 00:53:56 +02:00 committed by GitHub
parent 6d4b879450
commit 953f930465
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 381 additions and 78 deletions

View file

@ -3,6 +3,7 @@
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
- (Maintenance) Add govulncheck to pipeline, update golangci-linter
- (Feature) Agency Cache memory usage reduction
- (Bugfix) (LocalStorage) Add feature to pass ReclaimPolicy from StorageClass to PersistentVolumes
## [1.2.28](https://github.com/arangodb/kube-arangodb/tree/1.2.28) (2023-06-05)
- (Feature) ArangoBackup create retries and MaxIterations limit

View file

@ -280,7 +280,7 @@ linter-fix:
.PHONY: vulncheck
vulncheck:
@echo ">> Checking for known vulnerabilities"
@$(GOPATH)/bin/govulncheck --tags $(RELEASE_MODE) ./...
@-$(GOPATH)/bin/govulncheck --tags $(RELEASE_MODE) ./...
.PHONY: build
build: docker manifests

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -29,22 +29,22 @@ import (
// Test creation of local storage spec
func TestLocalStorageSpecCreation(t *testing.T) {
class := StorageClassSpec{"SpecName", true}
class := StorageClassSpec{"SpecName", true, nil}
local := LocalStorageSpec{StorageClass: class, LocalPath: []string{""}}
assert.Error(t, local.Validate())
class = StorageClassSpec{"spec-name", true}
class = StorageClassSpec{"spec-name", true, nil}
local = LocalStorageSpec{StorageClass: class, LocalPath: []string{""}}
assert.Error(t, local.Validate(), "should fail as the empty sting is not a valid path")
class = StorageClassSpec{"spec-name", true}
class = StorageClassSpec{"spec-name", true, nil}
local = LocalStorageSpec{StorageClass: class, LocalPath: []string{}}
assert.True(t, IsValidation(local.Validate()))
}
// Test reset of local storage spec
func TestLocalStorageSpecReset(t *testing.T) {
class := StorageClassSpec{"spec-name", true}
class := StorageClassSpec{"spec-name", true, nil}
source := LocalStorageSpec{StorageClass: class, LocalPath: []string{"/a/path", "/another/path"}}
target := LocalStorageSpec{}
resetImmutableFieldsResult := source.ResetImmutableFields(&target)

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -21,7 +21,10 @@
package v1alpha
import (
core "k8s.io/api/core/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)
@ -29,6 +32,7 @@ import (
type StorageClassSpec struct {
Name string `json:"name,omitempty"`
IsDefault bool `json:"isDefault,omitempty"`
ReclaimPolicy *core.PersistentVolumeReclaimPolicy `json:"reclaimPolicy,omitempty"`
}
// Validate the given spec, returning an error on validation
@ -37,6 +41,13 @@ func (s StorageClassSpec) Validate() error {
if err := shared.ValidateResourceName(s.Name); err != nil {
return errors.WithStack(err)
}
switch r := s.GetReclaimPolicy(); r {
case core.PersistentVolumeReclaimRetain, core.PersistentVolumeReclaimDelete:
default:
return errors.Newf("Unsupported ReclaimPolicy: %s", r)
}
return nil
}
@ -47,6 +58,11 @@ func (s *StorageClassSpec) SetDefaults(localStorageName string) {
}
}
// GetReclaimPolicy returns StorageClass Reclaim Policy
func (s *StorageClassSpec) GetReclaimPolicy() core.PersistentVolumeReclaimPolicy {
return util.TypeOrDefault(s.ReclaimPolicy, core.PersistentVolumeReclaimRetain)
}
// ResetImmutableFields replaces all immutable fields in the given target with values from the source spec.
// It returns a list of fields that have been reset.
// Field names are relative to `spec.`.
@ -56,5 +72,9 @@ func (s StorageClassSpec) ResetImmutableFields(fieldPrefix string, target *Stora
target.Name = s.Name
result = append(result, fieldPrefix+"name")
}
if s.GetReclaimPolicy() != target.GetReclaimPolicy() {
target.ReclaimPolicy = s.ReclaimPolicy
result = append(result, fieldPrefix+"reclaimPolicy")
}
return result
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -25,6 +25,9 @@ import (
"testing"
"github.com/stretchr/testify/assert"
core "k8s.io/api/core/v1"
"github.com/arangodb/kube-arangodb/pkg/util"
)
// test creation of storage class spec
@ -35,7 +38,16 @@ func TestStorageClassSpecCreation(t *testing.T) {
storageClassSpec = StorageClassSpec{Name: "TheSpecName", IsDefault: true}
assert.Error(t, storageClassSpec.Validate(), "upper case letters are not allowed in resources")
storageClassSpec = StorageClassSpec{"the-spec-name", true}
storageClassSpec = StorageClassSpec{Name: "the-spec-name", IsDefault: true, ReclaimPolicy: util.NewType[core.PersistentVolumeReclaimPolicy]("Random")}
assert.Error(t, storageClassSpec.Validate(), "upper case letters are not allowed in resources")
storageClassSpec = StorageClassSpec{"the-spec-name", true, util.NewType(core.PersistentVolumeReclaimRetain)}
assert.NoError(t, storageClassSpec.Validate())
storageClassSpec = StorageClassSpec{"the-spec-name", true, util.NewType(core.PersistentVolumeReclaimDelete)}
assert.NoError(t, storageClassSpec.Validate())
storageClassSpec = StorageClassSpec{"the-spec-name", true, nil}
assert.NoError(t, storageClassSpec.Validate())
storageClassSpec = StorageClassSpec{} // no proper name -> invalid
@ -45,11 +57,22 @@ func TestStorageClassSpecCreation(t *testing.T) {
// test reset of storage class spec
func TestStorageClassSpecResetImmutableFileds(t *testing.T) {
specSource := StorageClassSpec{"source", true}
specTarget := StorageClassSpec{"target", true}
t.Run("Name", func(t *testing.T) {
specSource := StorageClassSpec{"source", true, nil}
specTarget := StorageClassSpec{"target", true, nil}
assert.Equal(t, "target", specTarget.Name)
rv := specSource.ResetImmutableFields("fieldPrefix-", &specTarget)
assert.Equal(t, "fieldPrefix-name", strings.Join(rv, ", "))
assert.Equal(t, "source", specTarget.Name)
})
t.Run("ReclaimPolicy", func(t *testing.T) {
specSource := StorageClassSpec{"source", true, util.NewType(core.PersistentVolumeReclaimRetain)}
specTarget := StorageClassSpec{"source", true, util.NewType(core.PersistentVolumeReclaimDelete)}
assert.Equal(t, core.PersistentVolumeReclaimDelete, *specTarget.ReclaimPolicy)
rv := specSource.ResetImmutableFields("fieldPrefix-", &specTarget)
assert.Equal(t, "fieldPrefix-reclaimPolicy", strings.Join(rv, ", "))
assert.Equal(t, core.PersistentVolumeReclaimRetain, *specTarget.ReclaimPolicy)
})
}

View file

@ -115,7 +115,7 @@ func (in *LocalStoragePodCustomization) DeepCopy() *LocalStoragePodCustomization
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *LocalStorageSpec) DeepCopyInto(out *LocalStorageSpec) {
*out = *in
out.StorageClass = in.StorageClass
in.StorageClass.DeepCopyInto(&out.StorageClass)
if in.LocalPath != nil {
in, out := &in.LocalPath, &out.LocalPath
*out = make([]string, len(*in))
@ -177,6 +177,11 @@ func (in *LocalStorageStatus) DeepCopy() *LocalStorageStatus {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *StorageClassSpec) DeepCopyInto(out *StorageClassSpec) {
*out = *in
if in.ReclaimPolicy != nil {
in, out := &in.ReclaimPolicy, &out.ReclaimPolicy
*out = new(v1.PersistentVolumeReclaimPolicy)
**out = **in
}
return
}

View file

@ -29,7 +29,7 @@ import (
)
func (c *cache) loadState(ctx context.Context, connection conn.Connection) (StateRoot, error) {
resp, code, err := conn.NewExecutor[ReadRequest, StateRoots](connection).Execute(ctx, http.MethodPost, "/_api/agency/config", GetAgencyReadRequestFields())
resp, code, err := conn.NewExecutor[ReadRequest, StateRoots](connection).Execute(ctx, http.MethodPost, "/_api/agency/read", GetAgencyReadRequestFields())
if err != nil {
return StateRoot{}, err
}

View file

@ -79,7 +79,7 @@ func (cc *cache) GetRaw(group api.ServerGroup, id string) (conn.Connection, erro
return nil, err
}
return cc.factory.RawConnection(endpoint)
return cc.factory.RawConnection(cc.extendHost(m.GetEndpoint(endpoint)))
}
func (cc *cache) Connection(ctx context.Context, host string) (driver.Connection, error) {

View file

@ -22,6 +22,7 @@ package features
func init() {
registerFeature(localVolumeReplacementCheck)
registerFeature(localStorageReclaimPolicyPass)
}
var localVolumeReplacementCheck Feature = &feature{
@ -32,6 +33,18 @@ var localVolumeReplacementCheck Feature = &feature{
enabledByDefault: false,
}
var localStorageReclaimPolicyPass Feature = &feature{
name: "local-storage.pass-reclaim-policy",
description: "[LocalStorage] Pass ReclaimPolicy from StorageClass instead of using hardcoded Retain",
version: "3.6.0",
enterpriseRequired: false,
enabledByDefault: false,
}
func LocalStorageReclaimPolicyPass() Feature {
return localStorageReclaimPolicyPass
}
func LocalVolumeReplacementCheck() Feature {
return localVolumeReplacementCheck
}

View file

@ -22,6 +22,7 @@ package reconcile
import (
"context"
"time"
core "k8s.io/api/core/v1"
@ -35,6 +36,10 @@ import (
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
)
const (
persistentVolumeClaimPostCreationDelay = time.Minute
)
func (r *Reconciler) volumeMemberReplacement(ctx context.Context, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
context PlanBuilderContext) api.Plan {
@ -68,6 +73,23 @@ func (r *Reconciler) volumeMemberReplacement(ctx context.Context, apiObject k8su
client, ok := context.ACS().ClusterCache(member.Member.ClusterID)
if ok {
if pvc, ok := client.PersistentVolumeClaim().V1().GetSimple(n); ok {
if pvc.Status.Phase == core.ClaimPending {
continue
}
// Check if pvc was not created too recently
if t := pvc.GetCreationTimestamp(); !t.IsZero() {
if time.Since(t.Time) < persistentVolumeClaimPostCreationDelay {
// PVC was recreated recently, continue
continue
}
}
if t := pvc.GetDeletionTimestamp(); t != nil {
// PVC already under deletion
return nil
}
// Server is not part of plan and is not ready
return api.Plan{actions.NewAction(api.ActionTypeRemoveMemberPVC, member.Group, member.Member, "PVC is unschedulable").AddParam("pvc", string(pvc.GetUID()))}
}
@ -87,25 +109,8 @@ func (r *Reconciler) updateMemberConditionTypeMemberVolumeUnschedulableCondition
cache := context.ACS().CurrentClusterCache()
volumeClient, err := cache.PersistentVolume().V1()
if err != nil {
// We cant fetch volumes, continue
return nil
}
for _, e := range status.Members.AsList() {
if pvcStatus := e.Member.PersistentVolumeClaim; pvcStatus != nil {
if pvc, ok := context.ACS().CurrentClusterCache().PersistentVolumeClaim().V1().GetSimple(pvcStatus.GetName()); ok {
if volumeName := pvc.Spec.VolumeName; volumeName != "" {
if pv, ok := volumeClient.GetSimple(volumeName); ok {
// We have volume and volumeclaim, lets calculate condition
unschedulable := memberConditionTypeMemberVolumeUnschedulableCalculate(cache, pv, pvc,
memberConditionTypeMemberVolumeUnschedulableLocalStorageGone)
if e.Member.Conditions.IsTrue(api.ConditionTypeScheduled) {
// We are scheduled, above checks can be ignored
unschedulable = false
}
unschedulable := memberConditionTypeMemberVolumeUnschedulableRoot(cache, e)
if unschedulable == e.Member.Conditions.IsTrue(api.ConditionTypeMemberVolumeUnschedulable) {
continue
@ -116,16 +121,39 @@ func (r *Reconciler) updateMemberConditionTypeMemberVolumeUnschedulableCondition
plan = append(plan, shared.RemoveMemberConditionActionV2("PV Schedulable", api.ConditionTypeMemberVolumeUnschedulable, e.Group, e.Member.ID))
}
}
}
}
}
}
return plan
}
type memberConditionTypeMemberVolumeUnschedulableCalculateFunc func(cache inspectorInterface.Inspector, pv *core.PersistentVolume, pvc *core.PersistentVolumeClaim) bool
func memberConditionTypeMemberVolumeUnschedulableRoot(cache inspectorInterface.Inspector, member api.DeploymentStatusMemberElement) bool {
volumeClient, err := cache.PersistentVolume().V1()
if err != nil {
// We cant fetch volumes, remove condition as it cannot be evaluated
return false
}
if member.Member.Conditions.IsTrue(api.ConditionTypeScheduled) {
// Scheduled member ignore PV Unschedulable condition
return false
}
if pvcStatus := member.Member.PersistentVolumeClaim; pvcStatus != nil {
if pvc, ok := cache.PersistentVolumeClaim().V1().GetSimple(pvcStatus.GetName()); ok {
if volumeName := pvc.Spec.VolumeName; volumeName != "" {
if pv, ok := volumeClient.GetSimple(volumeName); ok {
// We have volume and volumeclaim, lets calculate condition
return memberConditionTypeMemberVolumeUnschedulableCalculate(cache, pv, pvc,
memberConditionTypeMemberVolumeUnschedulableLocalStorageGone)
}
}
}
}
return false
}
func memberConditionTypeMemberVolumeUnschedulableCalculate(cache inspectorInterface.Inspector, pv *core.PersistentVolume, pvc *core.PersistentVolumeClaim, funcs ...memberConditionTypeMemberVolumeUnschedulableCalculateFunc) bool {
for _, f := range funcs {
if f(cache, pv, pvc) {

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -161,6 +161,5 @@ func setPath(uri, uriPath string) (*url.URL, error) {
return u, err
}
u.Path = path.Join(uriPath)
u.Scheme = "https"
return u, nil
}

27
pkg/storage/consts.go Normal file
View file

@ -0,0 +1,27 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package storage
const (
FinalizerNamespace = "localstorage.arangodb.com"
FinalizerPersistentVolumeCleanup = FinalizerNamespace + "/cleanup"
)

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -44,7 +44,7 @@ type pvCleaner struct {
mutex sync.Mutex
log logging.Logger
cli kubernetes.Interface
items []core.PersistentVolume
items []*core.PersistentVolume
trigger trigger.Trigger
clientGetter func(ctx context.Context, nodeName string) (provisioner.API, error)
}
@ -86,7 +86,7 @@ func (c *pvCleaner) Run(stopCh <-chan struct{}) {
}
// Add the given volume to the list of items to clean.
func (c *pvCleaner) Add(pv core.PersistentVolume) {
func (c *pvCleaner) Add(pv *core.PersistentVolume) {
c.mutex.Lock()
defer c.mutex.Unlock()
@ -98,7 +98,7 @@ func (c *pvCleaner) Add(pv core.PersistentVolume) {
}
// Is new, add it
c.items = append(c.items, pv)
c.items = append(c.items, pv.DeepCopy())
c.trigger.Trigger()
}
@ -108,7 +108,7 @@ func (c *pvCleaner) cleanFirst() (bool, error) {
var first *core.PersistentVolume
c.mutex.Lock()
if len(c.items) > 0 {
first = &c.items[0]
first = c.items[0]
}
c.mutex.Unlock()

View file

@ -39,8 +39,10 @@ import (
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
resources "github.com/arangodb/kube-arangodb/pkg/storage/resources"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
@ -63,12 +65,17 @@ var (
func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLocalStorage, unboundClaims []core.PersistentVolumeClaim) (bool, error) {
// Fetch StorageClass name
var bm = storage.VolumeBindingImmediate
var reclaimPolicy = core.PersistentVolumeReclaimRetain
if sc, err := ls.deps.Client.Kubernetes().StorageV1().StorageClasses().Get(ctx, ls.apiObject.Spec.StorageClass.Name, meta.GetOptions{}); err == nil {
// We are able to fetch storageClass
if b := sc.VolumeBindingMode; b != nil {
bm = *b
}
if c := sc.ReclaimPolicy; c != nil {
reclaimPolicy = *c
}
}
// Find provisioner clients
@ -142,7 +149,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
}
// Create PV
if err := ls.createPV(ctx, apiObject, allowedClients, i, volSize, claim, deplName, role); err != nil {
if err := ls.createPV(ctx, apiObject, allowedClients, i, volSize, claim, reclaimPolicy, deplName, role); err != nil {
ls.log.Err(err).Error("Failed to create PersistentVolume")
}
@ -153,7 +160,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
}
// createPV creates a PersistentVolume.
func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients Clients, clientsOffset int, volSize int64, claim core.PersistentVolumeClaim, deploymentName, role string) error {
func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients Clients, clientsOffset int, volSize int64, claim core.PersistentVolumeClaim, storageClassReclaimPolicy core.PersistentVolumeReclaimPolicy, deploymentName, role string) error {
// Try clients
keys := clients.Keys()
@ -180,6 +187,13 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
log.Err(err).Error("Failed to prepare local path")
continue
}
reclaimPolicy := core.PersistentVolumeReclaimRetain
if features.LocalStorageReclaimPolicyPass().Enabled() {
reclaimPolicy = storageClassReclaimPolicy
}
// Create a volume
pvName := strings.ToLower(apiObject.GetName() + "-" + shortHash(info.NodeName) + "-" + name)
volumeMode := core.PersistentVolumeFilesystem
@ -195,12 +209,15 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
k8sutil.LabelKeyArangoDeployment: deploymentName,
k8sutil.LabelKeyRole: role,
},
Finalizers: util.BoolSwitch(features.LocalStorageReclaimPolicyPass().Enabled(), []string{
FinalizerPersistentVolumeCleanup,
}, nil),
},
Spec: core.PersistentVolumeSpec{
Capacity: core.ResourceList{
core.ResourceStorage: *resource.NewQuantity(volSize, resource.BinarySI),
},
PersistentVolumeReclaimPolicy: core.PersistentVolumeReclaimRetain,
PersistentVolumeReclaimPolicy: reclaimPolicy,
PersistentVolumeSource: core.PersistentVolumeSource{
Local: &core.LocalVolumeSource{
Path: localPath,
@ -237,9 +254,13 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
// Bind claim to volume
if err := ls.bindClaimToVolume(claim, pv.GetName()); err != nil {
// Try to delete the PV now
if features.LocalStorageReclaimPolicyPass().Enabled() {
ls.removePVObjectWithLog(pv)
} else {
if err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().Delete(context.Background(), pv.GetName(), meta.DeleteOptions{}); err != nil {
log.Err(err).Error("Failed to delete PV after binding PVC failed")
}
}
return errors.WithStack(err)
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -25,36 +25,67 @@ import (
"time"
core "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// inspectPVs queries all PersistentVolume's and triggers a cleanup for
// released volumes.
// Returns the number of available PV's.
func (ls *LocalStorage) inspectPVs() (int, error) {
list, err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().List(context.Background(), meta.ListOptions{})
var volumes []*core.PersistentVolume
if err := k8sutil.APIList[*core.PersistentVolumeList](context.Background(), ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes(), meta.ListOptions{}, func(result *core.PersistentVolumeList, err error) error {
for _, r := range result.Items {
volumes = append(volumes, r.DeepCopy())
}
return nil
}); err != nil {
if err != nil {
return 0, errors.WithStack(err)
}
}
spec := ls.apiObject.Spec
availableVolumes := 0
cleanupBeforeTimestamp := time.Now().Add(time.Hour * -24)
for _, pv := range list.Items {
for _, pv := range volumes {
if pv.Spec.StorageClassName != spec.StorageClass.Name {
// Not our storage class
continue
}
// We are under deletion
if pv.DeletionTimestamp != nil {
// Do not remove object if we are not the owner
if ls.isOwnerOf(pv) {
ls.log.Str("name", pv.GetName()).Warn("PV is being deleted")
if err := ls.inspectPVFinalizer(pv); err != nil {
ls.log.Str("name", pv.GetName()).Warn("Unable to remove finalizers")
}
}
continue
}
switch pv.Status.Phase {
case core.VolumeAvailable:
// Is this an old volume?
if pv.GetObjectMeta().GetCreationTimestamp().Time.Before(cleanupBeforeTimestamp) {
// Let's clean it up
if ls.isOwnerOf(&pv) {
if ls.isOwnerOf(pv) {
// Cleanup this volume
if features.LocalStorageReclaimPolicyPass().Enabled() {
ls.removePVObjectWithLog(pv)
} else {
ls.log.Str("name", pv.GetName()).Debug("Added PersistentVolume to cleaner")
ls.pvCleaner.Add(pv)
}
} else {
ls.log.Str("name", pv.GetName()).Debug("PersistentVolume is not owned by us")
availableVolumes++
@ -63,10 +94,18 @@ func (ls *LocalStorage) inspectPVs() (int, error) {
availableVolumes++
}
case core.VolumeReleased:
if ls.isOwnerOf(&pv) {
if ls.isOwnerOf(pv) {
// Cleanup this volume
if !features.LocalStorageReclaimPolicyPass().Enabled() {
ls.log.Str("name", pv.GetName()).Debug("Added PersistentVolume to cleaner")
ls.pvCleaner.Add(pv)
} else {
if pv.Spec.PersistentVolumeReclaimPolicy == core.PersistentVolumeReclaimDelete {
// We have released PV, now delete it
ls.log.Str("name", pv.GetName()).Info("PV With ReclaimPolicy Delete in state Released found, deleting")
ls.removePVObjectWithLog(pv)
}
}
} else {
ls.log.Str("name", pv.GetName()).Debug("PersistentVolume is not owned by us")
}
@ -74,3 +113,125 @@ func (ls *LocalStorage) inspectPVs() (int, error) {
}
return availableVolumes, nil
}
func (ls *LocalStorage) inspectPVFinalizer(pv *core.PersistentVolume) error {
currentFinalizers := pv.GetFinalizers()
if len(currentFinalizers) == 0 {
// No finalizers, nothing to do
return nil
}
finalizers := make([]string, 0, len(currentFinalizers))
for _, finalizer := range pv.GetFinalizers() {
switch finalizer {
case FinalizerPersistentVolumeCleanup:
ls.log.Str("name", pv.GetName()).Str("finalizer", FinalizerPersistentVolumeCleanup).Info("Removing finalizer")
if err := ls.removePVFinalizerPersistentVolumeCleanup(pv); err != nil {
ls.log.Err(err).Str("name", pv.GetName()).Warn("Unable to remove finalizer")
finalizers = append(finalizers, finalizer)
}
default:
finalizers = append(finalizers, finalizer)
}
}
// No change in finalizers, all good
if len(finalizers) == len(currentFinalizers) {
return nil
}
p := patch.NewPatch()
if len(finalizers) == 0 {
// Remove them all
p.Add(patch.ItemRemove(patch.NewPath("metadata", "finalizers")))
} else {
p.Add(patch.ItemReplace(patch.NewPath("metadata", "finalizers"), finalizers))
}
data, err := p.Marshal()
if err != nil {
return err
}
if _, err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().Patch(context.Background(), pv.GetName(), types.JSONPatchType, data, meta.PatchOptions{}); err != nil {
return err
}
return nil
}
func (ls *LocalStorage) removePVFinalizerPersistentVolumeCleanup(pv *core.PersistentVolume) error {
if !features.LocalStorageReclaimPolicyPass().Enabled() {
return nil
}
// Find local path
localSource := pv.Spec.PersistentVolumeSource.Local
if localSource == nil {
return errors.WithStack(errors.Newf("PersistentVolume has no local source"))
}
localPath := localSource.Path
// Find client that serves the node
nodeName := pv.GetAnnotations()[nodeNameAnnotation]
if nodeName == "" {
return errors.WithStack(errors.Newf("PersistentVolume has no node-name annotation"))
}
client, err := ls.GetClientByNodeName(context.Background(), nodeName)
if err != nil {
ls.log.Err(err).Str("node", nodeName).Debug("Failed to get client for node")
return errors.WithStack(err)
}
// Clean volume through client
ctx := context.Background()
if err := client.Remove(ctx, localPath); err != nil {
ls.log.Err(err).
Str("node", nodeName).
Str("local-path", localPath).
Debug("Failed to remove local path")
return errors.WithStack(err)
}
return nil
}
func (ls *LocalStorage) removePVObjectWithLog(pv *core.PersistentVolume) {
if pv == nil {
return
}
if pv.DeletionTimestamp != nil {
// Already deleting. nothing to do
return
}
ls.removePVWithLog(pv.GetName(), string(pv.GetUID()))
}
func (ls *LocalStorage) removePVWithLog(name, uid string) {
if err := ls.removePV(name, uid); err != nil {
ls.log.Str("name", name).Err(err).Warn("PersistentVolume cannot be removed")
}
}
func (ls *LocalStorage) removePV(name, uid string) error {
if err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().Delete(context.Background(), name, meta.DeleteOptions{
Preconditions: meta.NewUIDPreconditions(uid),
}); err != nil {
if apiErrors.IsNotFound(err) {
// Do not remove if not found
return nil
}
if apiErrors.IsConflict(err) {
// Do not throw error if uid changed
return nil
}
return err
}
return nil
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -23,11 +23,11 @@ package storage
import (
"context"
core "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
@ -42,12 +42,11 @@ var (
func (l *LocalStorage) ensureStorageClass(apiObject *api.ArangoLocalStorage) error {
spec := apiObject.Spec.StorageClass
bindingMode := storage.VolumeBindingWaitForFirstConsumer
reclaimPolicy := core.PersistentVolumeReclaimRetain
sc := &storage.StorageClass{
ObjectMeta: meta.ObjectMeta{
Name: spec.Name,
},
ReclaimPolicy: &reclaimPolicy,
ReclaimPolicy: util.NewType(apiObject.Spec.StorageClass.GetReclaimPolicy()),
VolumeBindingMode: &bindingMode,
Provisioner: storageClassProvisioner,
}

View file

@ -46,7 +46,7 @@ func (e executor[IN, OUT]) ExecuteGet(ctx context.Context, endpoint string) (*OU
func (e executor[IN, OUT]) Execute(ctx context.Context, method string, endpoint string, in IN) (*OUT, int, error) {
var reader io.Reader
if q := reflect.ValueOf(in); q.IsValid() && q.IsZero() && q.IsNil() {
if q := reflect.ValueOf(in); q.IsValid() && !q.IsZero() && !q.IsNil() {
data, err := json.Marshal(in)
if err != nil {
return nil, 0, err
@ -64,6 +64,12 @@ func (e executor[IN, OUT]) Execute(ctx context.Context, method string, endpoint
return nil, code, nil
}
defer resp.Close()
if err := resp.Close(); err != nil {
return nil, 0, err
}
var out OUT
if err := json.NewDecoder(resp).Decode(&out); err != nil {