1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Bugfix] Fix LocalStorage WaitForFirstConsumer mode (#1219)

This commit is contained in:
Adam Janikowski 2022-12-31 14:04:26 +01:00 committed by GitHub
parent a9f82d28b3
commit 46b519a1d1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 716 additions and 103 deletions

View file

@ -7,6 +7,7 @@
- (Feature) Add tolerations runtime rotation - (Feature) Add tolerations runtime rotation
- (Feature) Promote Version Check Feature - (Feature) Promote Version Check Feature
- (Bugfix) Ensure PDBs Consistency - (Bugfix) Ensure PDBs Consistency
- (Bugfix) Fix LocalStorage WaitForFirstConsumer mode
## [1.2.22](https://github.com/arangodb/kube-arangodb/tree/1.2.22) (2022-12-13) ## [1.2.22](https://github.com/arangodb/kube-arangodb/tree/1.2.22) (2022-12-13)
- (Bugfix) Do not manage ports in managed ExternalAccess mode - (Bugfix) Do not manage ports in managed ExternalAccess mode

View file

@ -20,7 +20,7 @@ rules:
verbs: ["get", "list", "watch"] verbs: ["get", "list", "watch"]
- apiGroups: [""] - apiGroups: [""]
resources: ["namespaces", "nodes"] resources: ["namespaces", "nodes"]
verbs: ["get", "list"] verbs: ["get", "list", "watch"]
- apiGroups: ["storage.k8s.io"] - apiGroups: ["storage.k8s.io"]
resources: ["storageclasses"] resources: ["storageclasses"]
verbs: ["*"] verbs: ["*"]

View file

@ -15,7 +15,7 @@ metadata:
rules: rules:
- apiGroups: [""] - apiGroups: [""]
resources: ["pods"] resources: ["pods"]
verbs: ["get", "update"] verbs: ["get", "update", "watch", "list"]
- apiGroups: [""] - apiGroups: [""]
resources: ["secrets"] resources: ["secrets"]
verbs: ["get"] verbs: ["get"]

View file

@ -23,6 +23,8 @@ package v1alpha
import ( import (
"strings" "strings"
core "k8s.io/api/core/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/errors"
) )
@ -32,6 +34,7 @@ type LocalStorageSpec struct {
StorageClass StorageClassSpec `json:"storageClass"` StorageClass StorageClassSpec `json:"storageClass"`
LocalPath []string `json:"localPath,omitempty"` LocalPath []string `json:"localPath,omitempty"`
Tolerations []core.Toleration `json:"tolerations,omitempty"`
NodeSelector map[string]string `json:"nodeSelector,omitempty"` NodeSelector map[string]string `json:"nodeSelector,omitempty"`
Privileged *bool `json:"privileged,omitempty"` Privileged *bool `json:"privileged,omitempty"`

View file

@ -26,6 +26,7 @@
package v1alpha package v1alpha
import ( import (
v1 "k8s.io/api/core/v1"
runtime "k8s.io/apimachinery/pkg/runtime" runtime "k8s.io/apimachinery/pkg/runtime"
) )
@ -120,6 +121,13 @@ func (in *LocalStorageSpec) DeepCopyInto(out *LocalStorageSpec) {
*out = make([]string, len(*in)) *out = make([]string, len(*in))
copy(*out, *in) copy(*out, *in)
} }
if in.Tolerations != nil {
in, out := &in.Tolerations, &out.Tolerations
*out = make([]v1.Toleration, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.NodeSelector != nil { if in.NodeSelector != nil {
in, out := &in.NodeSelector, &out.NodeSelector in, out := &in.NodeSelector, &out.NodeSelector
*out = make(map[string]string, len(*in)) *out = make(map[string]string, len(*in))

View file

@ -53,14 +53,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{
// AddToScheme adds all types of this clientset into the given scheme. This allows composition // AddToScheme adds all types of this clientset into the given scheme. This allows composition
// of clientsets, like in: // of clientsets, like in:
// //
// import ( // import (
// "k8s.io/client-go/kubernetes" // "k8s.io/client-go/kubernetes"
// clientsetscheme "k8s.io/client-go/kubernetes/scheme" // clientsetscheme "k8s.io/client-go/kubernetes/scheme"
// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" // aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme"
// ) // )
// //
// kclientset, _ := kubernetes.NewForConfig(c) // kclientset, _ := kubernetes.NewForConfig(c)
// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme)
// //
// After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types
// correctly. // correctly.

View file

@ -53,14 +53,14 @@ var localSchemeBuilder = runtime.SchemeBuilder{
// AddToScheme adds all types of this clientset into the given scheme. This allows composition // AddToScheme adds all types of this clientset into the given scheme. This allows composition
// of clientsets, like in: // of clientsets, like in:
// //
// import ( // import (
// "k8s.io/client-go/kubernetes" // "k8s.io/client-go/kubernetes"
// clientsetscheme "k8s.io/client-go/kubernetes/scheme" // clientsetscheme "k8s.io/client-go/kubernetes/scheme"
// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme" // aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme"
// ) // )
// //
// kclientset, _ := kubernetes.NewForConfig(c) // kclientset, _ := kubernetes.NewForConfig(c)
// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme) // _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme)
// //
// After this, RawExtensions in Kubernetes types will serialize kube-aggregator types // After this, RawExtensions in Kubernetes types will serialize kube-aggregator types
// correctly. // correctly.

View file

@ -23,6 +23,7 @@ package storage
import ( import (
"context" "context"
"fmt" "fmt"
"time"
"github.com/arangodb/kube-arangodb/pkg/storage/provisioner" "github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
"github.com/arangodb/kube-arangodb/pkg/storage/provisioner/client" "github.com/arangodb/kube-arangodb/pkg/storage/provisioner/client"
@ -30,9 +31,43 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
) )
type Clients map[string]provisioner.API
func (c Clients) Copy() Clients {
r := make(Clients, len(c))
for k, v := range c {
r[k] = v
}
return r
}
func (c Clients) Filter(f func(node string, client provisioner.API) bool) Clients {
r := make(Clients, len(c))
for k, v := range c {
if f(k, v) {
r[k] = v
}
}
return r
}
func (c Clients) Keys() []string {
r := make([]string, 0, len(c))
for k := range c {
r = append(r, k)
}
return r
}
// createProvisionerClients creates a list of clients for all known // createProvisionerClients creates a list of clients for all known
// provisioners. // provisioners.
func (ls *LocalStorage) createProvisionerClients() ([]provisioner.API, error) { func (ls *LocalStorage) createProvisionerClients(ctx context.Context) (Clients, error) {
// Find provisioner endpoints // Find provisioner endpoints
ns := ls.apiObject.GetNamespace() ns := ls.apiObject.GetNamespace()
listOptions := k8sutil.LocalStorageListOpt(ls.apiObject.GetName(), roleProvisioner) listOptions := k8sutil.LocalStorageListOpt(ls.apiObject.GetName(), roleProvisioner)
@ -46,31 +81,38 @@ func (ls *LocalStorage) createProvisionerClients() ([]provisioner.API, error) {
return nil, nil return nil, nil
} }
// Create clients for endpoints // Create clients for endpoints
clients := make([]provisioner.API, len(addrs)) clients := make(map[string]provisioner.API, len(addrs))
for i, addr := range addrs { for _, addr := range addrs {
var err error c, err := client.New(fmt.Sprintf("http://%s", addr))
clients[i], err = client.New(fmt.Sprintf("http://%s", addr))
if err != nil { if err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
if info, err := ls.fetchClientNodeInfo(ctx, c); err == nil {
clients[info.NodeName] = c
}
} }
return clients, nil return clients, nil
} }
func (ls *LocalStorage) fetchClientNodeInfo(ctx context.Context, c provisioner.API) (provisioner.NodeInfo, error) {
nctx, cancel := context.WithTimeout(ctx, 3*time.Second)
defer cancel()
return c.GetNodeInfo(nctx)
}
// GetClientByNodeName looks for a client that serves the given node name. // GetClientByNodeName looks for a client that serves the given node name.
// Returns an error if no such client is found. // Returns an error if no such client is found.
func (ls *LocalStorage) GetClientByNodeName(nodeName string) (provisioner.API, error) { func (ls *LocalStorage) GetClientByNodeName(ctx context.Context, nodeName string) (provisioner.API, error) {
clients, err := ls.createProvisionerClients() clients, err := ls.createProvisionerClients(ctx)
if err != nil { if err != nil {
return nil, errors.WithStack(err) return nil, errors.WithStack(err)
} }
// Find matching client // Find matching client
for _, c := range clients { if c, ok := clients[nodeName]; ok {
ctx := context.Background() return c, nil
if info, err := c.GetNodeInfo(ctx); err == nil && info.NodeName == nodeName {
return c, nil
}
} }
return nil, errors.WithStack(errors.Newf("No client found for node name '%s'", nodeName)) return nil, errors.WithStack(errors.Newf("No client found for node name '%s'", nodeName))
} }

View file

@ -94,6 +94,7 @@ func (ls *LocalStorage) ensureDaemonSet(apiObject *api.ArangoLocalStorage) error
NodeSelector: apiObject.Spec.NodeSelector, NodeSelector: apiObject.Spec.NodeSelector,
ImagePullSecrets: ls.imagePullSecrets, ImagePullSecrets: ls.imagePullSecrets,
Priority: apiObject.Spec.PodCustomization.GetPriority(), Priority: apiObject.Spec.PodCustomization.GetPriority(),
Tolerations: apiObject.Spec.Tolerations,
}, },
}, },
} }
@ -153,6 +154,7 @@ func (ls *LocalStorage) ensureDaemonSet(apiObject *api.ArangoLocalStorage) error
// Update it // Update it
current.Spec = dsSpec current.Spec = dsSpec
if _, err := ls.deps.Client.Kubernetes().AppsV1().DaemonSets(ns).Update(context.Background(), current, meta.UpdateOptions{}); kerrors.IsConflict(err) && attempt < 10 { if _, err := ls.deps.Client.Kubernetes().AppsV1().DaemonSets(ns).Update(context.Background(), current, meta.UpdateOptions{}); kerrors.IsConflict(err) && attempt < 10 {
ls.log.Err(err).Debug("failed to patch DaemonSet spec")
// Failed to update, try again // Failed to update, try again
continue continue
} else if err != nil { } else if err != nil {

View file

@ -259,9 +259,11 @@ func (ls *LocalStorage) run() {
} }
if createNow { if createNow {
ctx := context.Background() ctx := context.Background()
if err := ls.createPVs(ctx, ls.apiObject, unboundPVCs); err != nil { if retry, err := ls.createPVs(ctx, ls.apiObject, unboundPVCs); err != nil {
hasError = true hasError = true
ls.createEvent(k8sutil.NewErrorEvent("PV creation failed", err, ls.apiObject)) ls.createEvent(k8sutil.NewErrorEvent("PV creation failed", err, ls.apiObject))
} else if retry {
inspectionInterval = minInspectionInterval
} }
} }
} }

View file

@ -46,11 +46,11 @@ type pvCleaner struct {
cli kubernetes.Interface cli kubernetes.Interface
items []core.PersistentVolume items []core.PersistentVolume
trigger trigger.Trigger trigger trigger.Trigger
clientGetter func(nodeName string) (provisioner.API, error) clientGetter func(ctx context.Context, nodeName string) (provisioner.API, error)
} }
// newPVCleaner creates a new cleaner of persistent volumes. // newPVCleaner creates a new cleaner of persistent volumes.
func newPVCleaner(cli kubernetes.Interface, clientGetter func(nodeName string) (provisioner.API, error)) *pvCleaner { func newPVCleaner(cli kubernetes.Interface, clientGetter func(ctx context.Context, nodeName string) (provisioner.API, error)) *pvCleaner {
c := &pvCleaner{ c := &pvCleaner{
cli: cli, cli: cli,
clientGetter: clientGetter, clientGetter: clientGetter,
@ -148,7 +148,7 @@ func (c *pvCleaner) clean(pv core.PersistentVolume) error {
if nodeName == "" { if nodeName == "" {
return errors.WithStack(errors.Newf("PersistentVolume has no node-name annotation")) return errors.WithStack(errors.Newf("PersistentVolume has no node-name annotation"))
} }
client, err := c.clientGetter(nodeName) client, err := c.clientGetter(context.Background(), nodeName)
if err != nil { if err != nil {
log.Err(err).Str("node", nodeName).Debug("Failed to get client for node") log.Err(err).Str("node", nodeName).Debug("Failed to get client for node")
return errors.WithStack(err) return errors.WithStack(err)

View file

@ -24,7 +24,6 @@ import (
"context" "context"
"crypto/sha1" "crypto/sha1"
"fmt" "fmt"
"math/rand"
"net" "net"
"path/filepath" "path/filepath"
"sort" "sort"
@ -34,11 +33,13 @@ import (
"github.com/dchest/uniuri" "github.com/dchest/uniuri"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha" api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/storage/provisioner" "github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
resources "github.com/arangodb/kube-arangodb/pkg/storage/resources"
"github.com/arangodb/kube-arangodb/pkg/util/constants" "github.com/arangodb/kube-arangodb/pkg/util/constants"
"github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
@ -58,33 +59,35 @@ var (
) )
// createPVs creates a given number of PersistentVolume's. // createPVs creates a given number of PersistentVolume's.
func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLocalStorage, unboundClaims []core.PersistentVolumeClaim) error { func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLocalStorage, unboundClaims []core.PersistentVolumeClaim) (bool, error) {
// Fetch StorageClass name
var bm = storage.VolumeBindingImmediate
if sc, err := ls.deps.Client.Kubernetes().StorageV1().StorageClasses().Get(ctx, ls.apiObject.Spec.StorageClass.Name, meta.GetOptions{}); err == nil {
// We are able to fetch storageClass
if b := sc.VolumeBindingMode; b != nil {
bm = *b
}
}
// Find provisioner clients // Find provisioner clients
clients, err := ls.createProvisionerClients() clients, err := ls.createProvisionerClients(ctx)
if err != nil { if err != nil {
return errors.WithStack(err) return false, errors.WithStack(err)
} }
if len(clients) == 0 { if len(clients) == 0 {
// No provisioners available // No provisioners available
return errors.WithStack(errors.Newf("No ready provisioner endpoints found")) return false, errors.WithStack(errors.Newf("No ready provisioner endpoints found"))
} }
// Randomize list
rand.Shuffle(len(clients), func(i, j int) {
clients[i], clients[j] = clients[j], clients[i]
})
var nodeClientMap map[string]provisioner.API
for i, claim := range unboundClaims { for i, claim := range unboundClaims {
// Find deployment name & role in the claim (if any) // Find deployment name & role in the claim (if any)
deplName, role, enforceAniAffinity := getDeploymentInfo(claim) deplName, role, enforceAniAffinity := getDeploymentInfo(claim)
allowedClients := clients allowedClients := clients
if deplName != "" { if deplName != "" {
// Select nodes to choose from such that no volume in group lands on the same node // Select nodes to choose from such that no volume in group lands on the same node
if nodeClientMap == nil {
nodeClientMap = createNodeClientMap(ctx, clients)
}
var err error var err error
allowedClients, err = ls.filterAllowedNodes(nodeClientMap, deplName, role) allowedClients, err = ls.filterAllowedNodes(clients, deplName, role)
if err != nil { if err != nil {
ls.log.Err(err).Warn("Failed to filter allowed nodes") ls.log.Err(err).Warn("Failed to filter allowed nodes")
continue // We'll try this claim again later continue // We'll try this claim again later
@ -103,20 +106,58 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLoca
volSize = v volSize = v
} }
} }
if bm == storage.VolumeBindingWaitForFirstConsumer {
podList, err := resources.ListPods(ctx, ls.deps.Client.Kubernetes().CoreV1().Pods(claim.GetNamespace()))
if err != nil {
ls.log.Err(err).Warn("Unable to list pods")
continue
}
podList = podList.FilterByPVCName(claim.GetName())
nodeList, err := resources.ListNodes(ctx, ls.deps.Client.Kubernetes().CoreV1().Nodes())
if err != nil {
ls.log.Err(err).Warn("Unable to list nodes")
continue
}
nodeList = nodeList.FilterSchedulable().FilterPodsTaints(podList)
allowedClients = allowedClients.Filter(func(node string, client provisioner.API) bool {
for _, n := range nodeList {
if n.GetName() == node {
return true
}
}
return false
})
}
if len(allowedClients) == 0 {
ls.log.Info("PVC Cannot be created on any node")
continue
}
// Create PV // Create PV
if err := ls.createPV(ctx, apiObject, allowedClients, i, volSize, claim, deplName, role); err != nil { if err := ls.createPV(ctx, apiObject, allowedClients, i, volSize, claim, deplName, role); err != nil {
ls.log.Err(err).Error("Failed to create PersistentVolume") ls.log.Err(err).Error("Failed to create PersistentVolume")
} }
return true, nil
} }
return nil return false, nil
} }
// createPV creates a PersistentVolume. // createPV creates a PersistentVolume.
func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients []provisioner.API, clientsOffset int, volSize int64, claim core.PersistentVolumeClaim, deploymentName, role string) error { func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients Clients, clientsOffset int, volSize int64, claim core.PersistentVolumeClaim, deploymentName, role string) error {
// Try clients // Try clients
for clientIdx := 0; clientIdx < len(clients); clientIdx++ { keys := clients.Keys()
client := clients[(clientsOffset+clientIdx)%len(clients)]
for clientIdx := 0; clientIdx < len(keys); clientIdx++ {
client := clients[keys[(clientsOffset+clientIdx)%len(keys)]]
// Try local path within client // Try local path within client
for _, localPathRoot := range apiObject.Spec.LocalPath { for _, localPathRoot := range apiObject.Spec.LocalPath {
@ -240,19 +281,6 @@ func createNodeSelector(nodeName string) *core.NodeSelector {
} }
} }
// createNodeClientMap creates a map from node name to API.
// Clients that do not respond properly on a GetNodeInfo request are
// ignored.
func createNodeClientMap(ctx context.Context, clients []provisioner.API) map[string]provisioner.API {
result := make(map[string]provisioner.API)
for _, c := range clients {
if info, err := c.GetNodeInfo(ctx); err == nil {
result[info.NodeName] = c
}
}
return result
}
// getDeploymentInfo returns the name of the deployment that created the given claim, // getDeploymentInfo returns the name of the deployment that created the given claim,
// the role of the server that the claim is used for and the value for `enforceAntiAffinity`. // the role of the server that the claim is used for and the value for `enforceAntiAffinity`.
// If not found, empty strings are returned. // If not found, empty strings are returned.
@ -265,7 +293,7 @@ func getDeploymentInfo(pvc core.PersistentVolumeClaim) (string, string, bool) {
} }
// filterAllowedNodes returns those clients that do not yet have a volume for the given deployment name & role. // filterAllowedNodes returns those clients that do not yet have a volume for the given deployment name & role.
func (ls *LocalStorage) filterAllowedNodes(clients map[string]provisioner.API, deploymentName, role string) ([]provisioner.API, error) { func (ls *LocalStorage) filterAllowedNodes(clients Clients, deploymentName, role string) (Clients, error) {
// Find all PVs for given deployment & role // Find all PVs for given deployment & role
list, err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().List(context.Background(), meta.ListOptions{ list, err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().List(context.Background(), meta.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s,%s=%s", k8sutil.LabelKeyArangoDeployment, deploymentName, k8sutil.LabelKeyRole, role), LabelSelector: fmt.Sprintf("%s=%s,%s=%s", k8sutil.LabelKeyArangoDeployment, deploymentName, k8sutil.LabelKeyRole, role),
@ -278,13 +306,11 @@ func (ls *LocalStorage) filterAllowedNodes(clients map[string]provisioner.API, d
nodeName := pv.GetAnnotations()[nodeNameAnnotation] nodeName := pv.GetAnnotations()[nodeNameAnnotation]
excludedNodes[nodeName] = struct{}{} excludedNodes[nodeName] = struct{}{}
} }
result := make([]provisioner.API, 0, len(clients))
for nodeName, c := range clients { return clients.Filter(func(node string, client provisioner.API) bool {
if _, found := excludedNodes[nodeName]; !found { _, ok := excludedNodes[node]
result = append(result, c) return !ok
} }), nil
}
return result, nil
} }
// bindClaimToVolume tries to bind the given claim to the volume with given name. // bindClaimToVolume tries to bind the given claim to the volume with given name.

View file

@ -21,16 +21,12 @@
package storage package storage
import ( import (
"context"
"encoding/json" "encoding/json"
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
core "k8s.io/api/core/v1" core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
"github.com/arangodb/kube-arangodb/pkg/storage/provisioner/mocks"
) )
// TestCreateValidEndpointList tests createValidEndpointList. // TestCreateValidEndpointList tests createValidEndpointList.
@ -96,34 +92,6 @@ func TestCreateNodeSelector(t *testing.T) {
} }
} }
// TestCreateNodeClientMap tests createNodeClientMap.
func TestCreateNodeClientMap(t *testing.T) {
GB := int64(1024 * 1024 * 1024)
foo := mocks.NewProvisioner("foo", 100*GB, 200*GB)
bar := mocks.NewProvisioner("bar", 300*GB, 400*GB)
tests := []struct {
Input []provisioner.API
Expected map[string]provisioner.API
}{
{
Input: nil,
Expected: map[string]provisioner.API{},
},
{
Input: []provisioner.API{foo, bar},
Expected: map[string]provisioner.API{
"bar": bar,
"foo": foo,
},
},
}
ctx := context.Background()
for _, test := range tests {
output := createNodeClientMap(ctx, test.Input)
assert.Equal(t, test.Expected, output)
}
}
// TestGetDeploymentInfo tests getDeploymentInfo. // TestGetDeploymentInfo tests getDeploymentInfo.
func TestGetDeploymentInfo(t *testing.T) { func TestGetDeploymentInfo(t *testing.T) {
tests := []struct { tests := []struct {

View file

@ -0,0 +1,122 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package resources
import (
"context"
"math/rand"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
typedCore "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/arangodb/kube-arangodb/pkg/storage/utils"
)
type Nodes []*core.Node
func (p Nodes) Filter(f func(node *core.Node) bool) Nodes {
var r = make(Nodes, 0, len(p))
for _, c := range p {
if f(c) {
r = append(r, c)
}
}
return r
}
func (p Nodes) FilterPodsTaints(pods Pods) Nodes {
return p.Filter(func(node *core.Node) bool {
for _, pod := range pods {
if utils.IsNodeSchedulableForPod(node, pod) {
return true
}
}
return false
})
}
func (p Nodes) FilterTaints(pod *core.Pod) Nodes {
return p.Filter(func(node *core.Node) bool {
return utils.IsNodeSchedulableForPod(node, pod)
})
}
func (p Nodes) FilterSchedulable() Nodes {
return p.Filter(func(node *core.Node) bool {
return !node.Spec.Unschedulable
})
}
func (p Nodes) PickAny() *core.Node {
if len(p) == 0 {
return nil
}
rand.Shuffle(len(p), func(i, j int) {
p[i], p[j] = p[j], p[i]
})
return p[0]
}
func ListNodes(ctx context.Context, in typedCore.NodeInterface) (Nodes, error) {
var nodes Nodes
cont := ""
for {
nextNodes, c, err := listNodes(ctx, in, cont)
if err != nil {
return nil, err
}
nodes = append(nodes, nextNodes...)
if c == "" {
return nodes, nil
}
cont = c
}
}
func listNodes(ctx context.Context, in typedCore.NodeInterface, next string) (Nodes, string, error) {
opts := meta.ListOptions{}
opts.Continue = next
nodes, err := in.List(ctx, opts)
if err != nil {
return nil, "", err
}
nodesPointers := make(Nodes, len(nodes.Items))
for id := range nodes.Items {
nodesPointers[id] = nodes.Items[id].DeepCopy()
}
return nodesPointers, nodes.Continue, nil
}

View file

@ -0,0 +1,116 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package resources
import (
"context"
"math/rand"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
typedCore "k8s.io/client-go/kubernetes/typed/core/v1"
)
type Pods []*core.Pod
func (p Pods) Filter(f func(pod *core.Pod) bool) Pods {
var r = make(Pods, 0, len(p))
for _, c := range p {
if f(c) {
r = append(r, c)
}
}
return r
}
func (p Pods) FilterByScheduled() Pods {
return p.Filter(func(pod *core.Pod) bool {
return pod.Status.NominatedNodeName != "" || pod.Spec.NodeName != ""
})
}
func (p Pods) FilterByPVCName(pvc string) Pods {
return p.Filter(func(pod *core.Pod) bool {
for _, v := range pod.Spec.Volumes {
if p := v.PersistentVolumeClaim; p != nil {
if p.ClaimName == pvc {
return true
}
}
}
return false
})
}
func (p Pods) PickAny() *core.Pod {
if len(p) == 0 {
return nil
}
rand.Shuffle(len(p), func(i, j int) {
p[i], p[j] = p[j], p[i]
})
return p[0]
}
func ListPods(ctx context.Context, in typedCore.PodInterface) (Pods, error) {
var pods Pods
cont := ""
for {
nextPods, c, err := listPods(ctx, in, cont)
if err != nil {
return nil, err
}
pods = append(pods, nextPods...)
if c == "" {
return pods, nil
}
cont = c
}
}
func listPods(ctx context.Context, in typedCore.PodInterface, next string) (Pods, string, error) {
opts := meta.ListOptions{}
opts.Continue = next
pods, err := in.List(ctx, opts)
if err != nil {
return nil, "", err
}
podsPointers := make(Pods, len(pods.Items))
for id := range pods.Items {
podsPointers[id] = pods.Items[id].DeepCopy()
}
return podsPointers, pods.Continue, nil
}

View file

@ -0,0 +1,87 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package utils
import (
"time"
core "k8s.io/api/core/v1"
)
func IsNodeSchedulableForPod(node *core.Node, pod *core.Pod) bool {
return AreTaintsTolerated(pod.Spec.Tolerations, node.Spec.Taints)
}
func AreTaintsTolerated(tolerations []core.Toleration, taints []core.Taint) bool {
for _, taint := range taints {
if !IsTaintTolerated(tolerations, taint) {
return false
}
}
return true
}
func IsTaintTolerated(tolerations []core.Toleration, taint core.Taint) bool {
for _, toleration := range tolerations {
if toleration.Effect != "" && toleration.Effect != taint.Effect {
// Not same effect
continue
}
if toleration.Key != "" && toleration.Key != taint.Key {
// Not same toleration key
continue
}
switch toleration.Operator {
case core.TolerationOpExists:
// We accept all values
case core.TolerationOpEqual:
if toleration.Value != taint.Value {
// If value does not match check next one
continue
}
}
if ts := toleration.TolerationSeconds; ts != nil {
if taint.Effect == core.TaintEffectNoExecute {
// NoExecute taint cant be tolerated for period of time
continue
}
if s := taint.TimeAdded; s != nil {
if start := s.Time; !start.IsZero() {
since := time.Since(start)
if since > time.Duration(*ts)*time.Second {
// We tolerate particular duration for short period of time
continue
}
}
}
}
return true
}
return false
}

View file

@ -0,0 +1,236 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package utils
import (
"testing"
"time"
"github.com/stretchr/testify/require"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/kube-arangodb/pkg/util"
)
type taintsCase struct {
tolerations []core.Toleration
taints []core.Taint
schedulable bool
}
func newMetaTimeWithDiff(d time.Duration) *meta.Time {
return newMetaTime(time.Now().Add(d))
}
func newMetaTime(time time.Time) *meta.Time {
q := meta.NewTime(time)
return &q
}
func Test_Taints(t *testing.T) {
cases := map[string]taintsCase{
"No taints & tolerations": {
schedulable: true,
},
"Tainted node": {
schedulable: false,
taints: []core.Taint{
{
Key: "node.kubernetes.io/unschedulable",
Effect: core.TaintEffectNoSchedule,
},
},
},
"Custom tainted node": {
schedulable: false,
taints: []core.Taint{
{
Key: "arangodb.com/taint",
Effect: core.TaintEffectNoSchedule,
},
},
},
"Custom tainted node - tolerate all": {
schedulable: true,
tolerations: []core.Toleration{
{
Operator: core.TolerationOpExists,
},
},
taints: []core.Taint{
{
Key: "arangodb.com/taint",
Value: "test",
Effect: core.TaintEffectNoSchedule,
},
},
},
"Custom tainted node - NoSched - tolerate all for 5 minutes - in range": {
schedulable: true,
tolerations: []core.Toleration{
{
Operator: core.TolerationOpExists,
TolerationSeconds: util.NewInt64(300),
},
},
taints: []core.Taint{
{
Key: "arangodb.com/taint",
Value: "test",
Effect: core.TaintEffectNoSchedule,
TimeAdded: newMetaTimeWithDiff(0),
},
},
},
"Custom tainted node - NoSched - tolerate all for 5 minutes - out of range": {
schedulable: false,
tolerations: []core.Toleration{
{
Operator: core.TolerationOpExists,
TolerationSeconds: util.NewInt64(300),
},
},
taints: []core.Taint{
{
Key: "arangodb.com/taint",
Value: "test",
Effect: core.TaintEffectNoSchedule,
TimeAdded: newMetaTimeWithDiff(-360 * time.Second),
},
},
},
"Custom tainted node - NoExec - tolerate all for 5 minute": {
schedulable: false,
tolerations: []core.Toleration{
{
Operator: core.TolerationOpExists,
TolerationSeconds: util.NewInt64(300),
},
},
taints: []core.Taint{
{
Key: "arangodb.com/taint",
Value: "test",
Effect: core.TaintEffectNoExecute,
TimeAdded: newMetaTimeWithDiff(0),
},
},
},
"Custom tainted node - tolerate different": {
schedulable: false,
tolerations: []core.Toleration{
{
Key: "arangodb.com/taint2",
Operator: core.TolerationOpExists,
},
},
taints: []core.Taint{
{
Key: "arangodb.com/taint",
Value: "test",
Effect: core.TaintEffectNoSchedule,
},
},
},
"Custom tainted node - tolerate key": {
schedulable: true,
tolerations: []core.Toleration{
{
Key: "arangodb.com/taint",
Operator: core.TolerationOpExists,
},
},
taints: []core.Taint{
{
Key: "arangodb.com/taint",
Value: "test",
Effect: core.TaintEffectNoSchedule,
},
},
},
"Custom tainted node - tolerate key & diff value": {
schedulable: false,
tolerations: []core.Toleration{
{
Key: "arangodb.com/taint",
Value: "test2",
Operator: core.TolerationOpEqual,
},
},
taints: []core.Taint{
{
Key: "arangodb.com/taint",
Value: "test",
Effect: core.TaintEffectNoSchedule,
},
},
},
"Custom tainted node - tolerate key & same value": {
schedulable: false,
tolerations: []core.Toleration{
{
Key: "arangodb.com/taint",
Value: "test2",
Operator: core.TolerationOpEqual,
},
},
taints: []core.Taint{
{
Key: "arangodb.com/taint",
Value: "test",
Effect: core.TaintEffectNoSchedule,
},
},
},
}
for n, c := range cases {
t.Run(n, func(t *testing.T) {
schedulable := AreTaintsTolerated(c.tolerations, c.taints)
if c.schedulable {
require.True(t, schedulable)
} else {
require.False(t, schedulable)
}
})
}
}