Commit df09665069: Merged master
22 changed files with 326 additions and 24 deletions
@@ -39,18 +39,18 @@ If you want to create external access services manually, follow the instructions
 ### Single server

 For a single server deployment, the operator creates a single
-`Service` named `<cluster-name>`. This service has a normal cluster IP
+`Service` named `<deployment-name>`. This service has a normal cluster IP
 address.

 ### Full cluster

 For a full cluster deployment, the operator creates two `Services`.

-- `<cluster-name>_servers` a headless `Service` intended to provide
+- `<deployment-name>-int` a headless `Service` intended to provide
   DNS names for all pods created by the operator.
   It selects all ArangoDB & ArangoSync servers in the cluster.

-- `<cluster-name>` a normal `Service` that selects only the coordinators
+- `<deployment-name>` a normal `Service` that selects only the coordinators
   of the cluster. This `Service` is configured with `ClientIP` session
   affinity. This is needed for cursor requests, since they are bound to
   a specific coordinator.
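
For context, the coordinator `Service` described above would look roughly like the following sketch; the field worth noting is `sessionAffinity: ClientIP`, which pins each client to one coordinator so cursors keep working. Names and ports here are illustrative assumptions, not operator output:

```yaml
# Sketch of the coordinator Service (assumed shape, not generated output).
kind: Service
apiVersion: v1
metadata:
  name: my-deployment            # takes the name of the ArangoDeployment
spec:
  selector:
    arango_deployment: my-deployment
    role: coordinator
  sessionAffinity: ClientIP      # cursor requests must reach the same coordinator
  ports:
    - protocol: TCP
      port: 8529
      targetPort: 8529
```
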
@@ -58,7 +58,7 @@ For a full cluster deployment, the operator creates two `Services`.
 When the coordinators are asked to provide endpoints of the cluster
 (e.g. when calling `client.SynchronizeEndpoints()` in the go driver)
 the DNS names of the individual `Pods` will be returned
-(`<pod>.<cluster-name>_servers.<namespace>.svc`)
+(`<pod>.<deployment-name>-int.<namespace>.svc`)

 ### Full cluster with DC2DC

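
A minimal sketch of that driver call, assuming a client running inside the same Kubernetes cluster; the endpoint URL is an assumption and TLS configuration is omitted for brevity:

```go
package main

import (
	"context"
	"log"

	driver "github.com/arangodb/go-driver"
	"github.com/arangodb/go-driver/http"
)

func main() {
	// Connect via the coordinator Service; assumed in-cluster DNS name.
	conn, err := http.NewConnection(http.ConnectionConfig{
		Endpoints: []string{"https://my-deployment.default.svc:8529"},
	})
	if err != nil {
		log.Fatal(err)
	}
	c, err := driver.NewClient(driver.ClientConfig{Connection: conn})
	if err != nil {
		log.Fatal(err)
	}
	// Refresh the endpoint list; the returned names look like
	// <pod>.<deployment-name>-int.<namespace>.svc and only resolve in-cluster.
	if err := c.SynchronizeEndpoints(context.Background()); err != nil {
		log.Fatal(err)
	}
}
```
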
@@ -66,23 +66,26 @@ For a full cluster with datacenter replication deployment,
 the same `Services` are created as for a Full cluster, with the following
 additions:

-- `<cluster-name>_sync` a normal `Service` that selects only the syncmasters
+- `<deployment-name>-sync` a normal `Service` that selects only the syncmasters
   of the cluster.

 ## Load balancer

-To reach the ArangoDB servers from outside the Kubernetes cluster, you
-have to deploy additional services.
+If you want full control of the `Services` needed to access the ArangoDB deployment
+from outside your Kubernetes cluster, set `spec.externalAccess.Type` of the `ArangoDeployment` to `None`
+and create a `Service` as specified below.

-You can use `LoadBalancer` or `NodePort` services, depending on your
+Create a `Service` of type `LoadBalancer` or `NodePort`, depending on your
 Kubernetes deployment.

 This service should select:

-- `arangodb_cluster_name: <cluster-name>`
+- `arango_deployment: <deployment-name>`
 - `role: coordinator`

-For example:
+The following example yields a service of type `LoadBalancer` with a specific
+load balancer IP address.
+With this service, the ArangoDB cluster can now be reached on `https://1.2.3.4:8529`.

 ```yaml
 kind: Service
@@ -91,7 +94,27 @@ metadata:
   name: arangodb-cluster-exposed
 spec:
   selector:
-    arangodb_cluster_name: arangodb-cluster
+    arango_deployment: arangodb-cluster
     role: coordinator
   type: LoadBalancer
   loadBalancerIP: 1.2.3.4
+  ports:
+  - protocol: TCP
+    port: 8529
+    targetPort: 8529
+```
+
+The following example yields a service of type `NodePort` with the ArangoDB
+cluster exposed on port 30529 of all nodes of the Kubernetes cluster.
+
+```yaml
+kind: Service
+apiVersion: v1
+metadata:
+  name: arangodb-cluster-exposed
+spec:
+  selector:
+    arango_deployment: arangodb-cluster
+    role: coordinator
+  type: NodePort
+  ports:
examples/production-cluster.yaml (new file, 8 lines)
@@ -0,0 +1,8 @@
+apiVersion: "database.arangodb.com/v1alpha"
+kind: "ArangoDeployment"
+metadata:
+  name: "production-cluster"
+spec:
+  mode: Cluster
+  image: arangodb/arangodb:3.3.10
+  environment: Production
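
Note that `environment: Production` is what later drives the anti-affinity behaviour added in this commit (see `IsProduction()` and `EnsurePVCs` below): in production mode the operator tries to avoid placing volumes of the same role on the same node. Presumably the example is deployed like any other manifest:

```bash
kubectl apply -f examples/production-cluster.yaml
```
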
@@ -47,6 +47,11 @@ func (e Environment) Validate() error {
 	}
 }

+// IsProduction returns true when the given environment is a production environment.
+func (e Environment) IsProduction() bool {
+	return e == EnvironmentProduction
+}
+
 // NewEnvironment returns a reference to a string with given value.
 func NewEnvironment(input Environment) *Environment {
 	return &input
@@ -24,6 +24,7 @@ package reconcile
 import (
 	"context"
+	"time"
 )

 // Action executes a single Plan item.

@@ -35,4 +36,6 @@ type Action interface {
 	// CheckProgress checks the progress of the action.
 	// Returns true if the action is completely finished, false otherwise.
 	CheckProgress(ctx context.Context) (bool, error)
+	// Timeout returns the amount of time after which this action will timeout.
+	Timeout() time.Duration
 }
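
For orientation, a minimal (hypothetical) action satisfying the extended interface could look like the sketch below; `Start` is assumed from the existing interface, and the type name is illustrative only, not part of this commit:

```go
package reconcile

import (
	"context"
	"time"
)

// actionNoop is an illustrative, do-nothing Action.
type actionNoop struct{}

// Start performs the action's single step; returning true means "already done".
func (a actionNoop) Start(ctx context.Context) (bool, error) { return true, nil }

// CheckProgress reports whether the action has finished.
func (a actionNoop) CheckProgress(ctx context.Context) (bool, error) { return true, nil }

// Timeout bounds how long ExecutePlan waits before it aborts the whole plan.
func (a actionNoop) Timeout() time.Duration { return time.Minute }
```
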
@@ -24,6 +24,7 @@ package reconcile
 import (
 	"context"
+	"time"

 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/rs/zerolog"

@@ -64,3 +65,8 @@ func (a *actionAddMember) CheckProgress(ctx context.Context) (bool, error) {
 	// Nothing todo
 	return true, nil
 }
+
+// Timeout returns the amount of time after which this action will timeout.
+func (a *actionAddMember) Timeout() time.Duration {
+	return addMemberTimeout
+}
@@ -24,6 +24,7 @@ package reconcile
 import (
 	"context"
+	"time"

 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/rs/zerolog"

@@ -114,3 +115,8 @@ func (a *actionCleanoutMember) CheckProgress(ctx context.Context) (bool, error)
 	// Cleanout completed
 	return true, nil
 }
+
+// Timeout returns the amount of time after which this action will timeout.
+func (a *actionCleanoutMember) Timeout() time.Duration {
+	return cleanoutMemberTimeout
+}
@@ -24,6 +24,7 @@ package reconcile
 import (
 	"context"
+	"time"

 	"github.com/pkg/errors"
 	"github.com/rs/zerolog"

@@ -94,3 +95,8 @@ func (a *actionRemoveMember) CheckProgress(ctx context.Context) (bool, error) {
 	// Nothing todo
 	return true, nil
 }
+
+// Timeout returns the amount of time after which this action will timeout.
+func (a *actionRemoveMember) Timeout() time.Duration {
+	return removeMemberTimeout
+}
@@ -24,6 +24,7 @@ package reconcile
 import (
 	"context"
+	"time"

 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/rs/zerolog"

@@ -69,3 +70,8 @@ func (a *renewTLSCertificateAction) Start(ctx context.Context) (bool, error) {
 func (a *renewTLSCertificateAction) CheckProgress(ctx context.Context) (bool, error) {
 	return true, nil
 }
+
+// Timeout returns the amount of time after which this action will timeout.
+func (a *renewTLSCertificateAction) Timeout() time.Duration {
+	return renewTLSCertificateTimeout
+}
@@ -24,6 +24,7 @@ package reconcile
 import (
 	"context"
+	"time"

 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/rs/zerolog"

@@ -116,3 +117,8 @@ func (a *actionRotateMember) CheckProgress(ctx context.Context) (bool, error) {
 	}
 	return true, nil
 }
+
+// Timeout returns the amount of time after which this action will timeout.
+func (a *actionRotateMember) Timeout() time.Duration {
+	return rotateMemberTimeout
+}
@@ -111,3 +111,8 @@ func (a *actionShutdownMember) CheckProgress(ctx context.Context) (bool, error)
 	// Member still not shutdown, retry soon
 	return false, nil
 }
+
+// Timeout returns the amount of time after which this action will timeout.
+func (a *actionShutdownMember) Timeout() time.Duration {
+	return shutdownMemberTimeout
+}
@@ -24,6 +24,7 @@ package reconcile
 import (
 	"context"
+	"time"

 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
 	"github.com/rs/zerolog"

@@ -126,3 +127,8 @@ func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, error) {
 	}
 	return isUpgrading, nil
 }
+
+// Timeout returns the amount of time after which this action will timeout.
+func (a *actionUpgradeMember) Timeout() time.Duration {
+	return upgradeMemberTimeout
+}
@@ -24,6 +24,7 @@ package reconcile
 import (
 	"context"
+	"time"

 	driver "github.com/arangodb/go-driver"
 	"github.com/arangodb/go-driver/agency"

@@ -164,3 +165,8 @@ func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bool, error)
 	}
 	return true, nil
 }
+
+// Timeout returns the amount of time after which this action will timeout.
+func (a *actionWaitForMemberUp) Timeout() time.Duration {
+	return waitForMemberUpTimeout
+}
@@ -54,6 +54,9 @@ type Context interface {
 	GetAgencyClients(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error)
 	// GetSyncServerClient returns a cached client for a specific arangosync server.
 	GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error)
+	// CreateEvent creates a given event.
+	// On error, the error is logged.
+	CreateEvent(evt *v1.Event)
 	// CreateMember adds a new member to the given group.
 	// If ID is non-empty, it will be used, otherwise a new ID is created.
 	CreateMember(group api.ServerGroup, id string) error
@@ -25,11 +25,13 @@ package reconcile
 import (
 	"context"
 	"fmt"
+	"time"

+	"github.com/rs/zerolog"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
 	api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
-	"github.com/rs/zerolog"
 	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )

 // ExecutePlan tries to execute the plan as far as possible.

@@ -106,7 +108,21 @@ func (d *Reconciler) ExecutePlan(ctx context.Context) (bool, error) {
 		}
 		log.Debug().Bool("ready", ready).Msg("Action CheckProgress completed")
 		if !ready {
-			// Not ready check, come back soon
+			// Not ready yet, check timeout
+			deadline := planAction.CreationTime.Add(action.Timeout())
+			if time.Now().After(deadline) {
+				// Timeout has expired
+				log.Warn().Msg("Action not finished in time. Removing the entire plan")
+				d.context.CreateEvent(k8sutil.NewPlanTimeoutEvent(d.context.GetAPIObject(), string(planAction.Type), planAction.MemberID, planAction.Group.AsRole()))
+				// Replace plan with empty one and save it.
+				status.Plan = api.Plan{}
+				if err := d.context.UpdateStatus(status); err != nil {
+					log.Debug().Err(err).Msg("Failed to update CR status")
+					return false, maskAny(err)
+				}
+				return true, nil
+			}
+			// Timeout not yet expired, come back soon
 			return true, nil
 		}
 		// Continue with next action
pkg/deployment/reconcile/timeouts.go (new file, 36 lines)
@@ -0,0 +1,36 @@
+//
+// DISCLAIMER
+//
+// Copyright 2018 ArangoDB GmbH, Cologne, Germany
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+// Copyright holder is ArangoDB GmbH, Cologne, Germany
+//
+// Author Ewout Prangsma
+//
+
+package reconcile
+
+import "time"
+
+const (
+	addMemberTimeout           = time.Minute * 5
+	cleanoutMemberTimeout      = time.Hour * 12
+	removeMemberTimeout        = time.Minute * 15
+	renewTLSCertificateTimeout = time.Minute * 30
+	rotateMemberTimeout        = time.Minute * 30
+	shutdownMemberTimeout      = time.Minute * 30
+	upgradeMemberTimeout       = time.Hour * 6
+	waitForMemberUpTimeout     = time.Minute * 15
+)
@@ -62,7 +62,7 @@ type Context interface {
 	GetLifecycleImage() string
 	// GetNamespace returns the namespace that contains the deployment
 	GetNamespace() string
-	// createEvent creates a given event.
+	// CreateEvent creates a given event.
 	// On error, the error is logged.
 	CreateEvent(evt *v1.Event)
 	// GetOwnedPods returns a list of all pods owned by the deployment.
@@ -43,6 +43,7 @@ func (r *Resources) EnsurePVCs() error {
 	owner := apiObject.AsOwner()
 	iterator := r.context.GetServerGroupIterator()
 	status := r.context.GetStatus()
+	enforceAntiAffinity := r.context.GetSpec().GetEnvironment().IsProduction()

 	if err := iterator.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error {
 		for _, m := range *status {

@@ -51,7 +52,7 @@ func (r *Resources) EnsurePVCs() error {
 			role := group.AsRole()
 			resources := spec.Resources
 			finalizers := r.createPVCFinalizers(group)
-			if err := k8sutil.CreatePersistentVolumeClaim(kubecli, m.PersistentVolumeClaimName, deploymentName, ns, storageClassName, role, resources, finalizers, owner); err != nil {
+			if err := k8sutil.CreatePersistentVolumeClaim(kubecli, m.PersistentVolumeClaimName, deploymentName, ns, storageClassName, role, enforceAntiAffinity, resources, finalizers, owner); err != nil {
 				return maskAny(err)
 			}
 		}
@@ -24,6 +24,7 @@ package storage
 import (
 	"context"
+	"crypto/sha1"
 	"encoding/json"
 	"fmt"
 	"math/rand"

@@ -32,6 +33,7 @@ import (
 	"sort"
 	"strconv"
+	"strings"
 	"time"

 	"k8s.io/apimachinery/pkg/api/resource"

@@ -41,6 +43,8 @@ import (
 	api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
 	"github.com/arangodb/kube-arangodb/pkg/storage/provisioner"
+	"github.com/arangodb/kube-arangodb/pkg/util/constants"
+	"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
 )

 const (
@@ -72,7 +76,24 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLocalStorage,
 		clients[i], clients[j] = clients[j], clients[i]
 	})

+	var nodeClientMap map[string]provisioner.API
 	for i, claim := range unboundClaims {
+		// Find deployment name & role in the claim (if any)
+		deplName, role, enforceAntiAffinity := getDeploymentInfo(claim)
+		allowedClients := clients
+		if enforceAntiAffinity && deplName != "" {
+			// Select nodes to choose from such that no volume in group lands on the same node
+			if nodeClientMap == nil {
+				nodeClientMap = createNodeClientMap(ctx, clients)
+			}
+			var err error
+			allowedClients, err = ls.filterAllowedNodes(nodeClientMap, deplName, role)
+			if err != nil {
+				log.Warn().Err(err).Msg("Failed to filter allowed nodes")
+				continue // We'll try this claim again later
+			}
+		}
+
 		// Find size of PVC
 		volSize := defaultVolumeSize
 		if reqStorage := claim.Spec.Resources.Requests.StorageEphemeral(); reqStorage != nil {
@@ -81,7 +102,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLocalStorage,
 			}
 		}
 		// Create PV
-		if err := ls.createPV(ctx, apiObject, clients, i, volSize); err != nil {
+		if err := ls.createPV(ctx, apiObject, allowedClients, i, volSize, claim, deplName, role); err != nil {
 			log.Error().Err(err).Msg("Failed to create PersistentVolume")
 		}
 	}
@@ -90,7 +111,7 @@ func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLocalStorage,
 }

 // createPV creates a PersistentVolume.
-func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients []provisioner.API, clientsOffset int, volSize int64) error {
+func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients []provisioner.API, clientsOffset int, volSize int64, claim v1.PersistentVolumeClaim, deploymentName, role string) error {
 	log := ls.deps.Log
 	// Try clients
 	for clientIdx := 0; clientIdx < len(clients); clientIdx++ {
@@ -117,7 +138,7 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage,
 			continue
 		}
 		// Create a volume
-		pvName := apiObject.GetName() + "-" + name
+		pvName := strings.ToLower(apiObject.GetName() + "-" + shortHash(info.NodeName) + "-" + name)
 		volumeMode := v1.PersistentVolumeFilesystem
 		nodeAff, err := createNodeAffinity(info.NodeName)
 		if err != nil {
@@ -131,6 +152,10 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage,
 					v1.AlphaStorageNodeAffinityAnnotation: nodeAff,
 					nodeNameAnnotation:                    info.NodeName,
 				},
+				Labels: map[string]string{
+					k8sutil.LabelKeyArangoDeployment: deploymentName,
+					k8sutil.LabelKeyRole:             role,
+				},
 			},
 			Spec: v1.PersistentVolumeSpec{
 				Capacity: v1.ResourceList{
@@ -147,6 +172,13 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage,
 				},
 				StorageClassName: apiObject.Spec.StorageClass.Name,
 				VolumeMode:       &volumeMode,
+				ClaimRef: &v1.ObjectReference{
+					Kind:       "PersistentVolumeClaim",
+					APIVersion: "",
+					Name:       claim.GetName(),
+					Namespace:  claim.GetNamespace(),
+					UID:        claim.GetUID(),
+				},
 			},
 		}
 		// Attach PV to ArangoLocalStorage
@@ -159,6 +191,16 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage,
 			Str("name", pvName).
 			Str("node-name", info.NodeName).
 			Msg("Created PersistentVolume")

+		// Bind claim to volume
+		if err := ls.bindClaimToVolume(claim, pv.GetName()); err != nil {
+			// Try to delete the PV now
+			if err := ls.deps.KubeCli.CoreV1().PersistentVolumes().Delete(pv.GetName(), &metav1.DeleteOptions{}); err != nil {
+				log.Error().Err(err).Msg("Failed to delete PV after binding PVC failed")
+			}
+			return maskAny(err)
+		}
+
 		return nil
 	}
 }
@@ -204,3 +246,96 @@ func createNodeAffinity(nodeName string) (string, error) {
 	}
 	return string(encoded), nil
 }
+
+// createNodeClientMap creates a map from node name to API.
+// Clients that do not respond properly on a GetNodeInfo request are
+// ignored.
+func createNodeClientMap(ctx context.Context, clients []provisioner.API) map[string]provisioner.API {
+	result := make(map[string]provisioner.API)
+	for _, c := range clients {
+		if info, err := c.GetNodeInfo(ctx); err == nil {
+			result[info.NodeName] = c
+		}
+	}
+	return result
+}
+
+// getDeploymentInfo returns the name of the deployment that created the given claim,
+// the role of the server that the claim is used for and the value for `enforceAntiAffinity`.
+// If not found, empty strings are returned.
+// Returns deploymentName, role, enforceAntiAffinity.
+func getDeploymentInfo(pvc v1.PersistentVolumeClaim) (string, string, bool) {
+	deploymentName := pvc.GetLabels()[k8sutil.LabelKeyArangoDeployment]
+	role := pvc.GetLabels()[k8sutil.LabelKeyRole]
+	enforceAntiAffinity, _ := strconv.ParseBool(pvc.GetAnnotations()[constants.AnnotationEnforceAntiAffinity]) // If annotation empty, this will yield false.
+	return deploymentName, role, enforceAntiAffinity
+}
+
+// filterAllowedNodes returns those clients that do not yet have a volume for the given deployment name & role.
+func (ls *LocalStorage) filterAllowedNodes(clients map[string]provisioner.API, deploymentName, role string) ([]provisioner.API, error) {
+	// Find all PVs for given deployment & role
+	list, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{
+		LabelSelector: fmt.Sprintf("%s=%s,%s=%s", k8sutil.LabelKeyArangoDeployment, deploymentName, k8sutil.LabelKeyRole, role),
+	})
+	if err != nil {
+		return nil, maskAny(err)
+	}
+	excludedNodes := make(map[string]struct{})
+	for _, pv := range list.Items {
+		nodeName := pv.GetAnnotations()[nodeNameAnnotation]
+		excludedNodes[nodeName] = struct{}{}
+	}
+	result := make([]provisioner.API, 0, len(clients))
+	for nodeName, c := range clients {
+		if _, found := excludedNodes[nodeName]; !found {
+			result = append(result, c)
+		}
+	}
+	return result, nil
+}
+
+// bindClaimToVolume tries to bind the given claim to the volume with given name.
+// If the claim has been updated, the function retries several times.
+func (ls *LocalStorage) bindClaimToVolume(claim v1.PersistentVolumeClaim, volumeName string) error {
+	log := ls.deps.Log.With().Str("pvc-name", claim.GetName()).Str("volume-name", volumeName).Logger()
+	pvcs := ls.deps.KubeCli.CoreV1().PersistentVolumeClaims(claim.GetNamespace())
+
+	for attempt := 0; attempt < 10; attempt++ {
+		// Backoff if needed
+		time.Sleep(time.Millisecond * time.Duration(10*attempt))
+
+		// Fetch latest version of claim
+		updated, err := pvcs.Get(claim.GetName(), metav1.GetOptions{})
+		if k8sutil.IsNotFound(err) {
+			return maskAny(err)
+		} else if err != nil {
+			log.Warn().Err(err).Msg("Failed to load updated PersistentVolumeClaim")
+			continue
+		}
+
+		// Check claim. If already bound, bail out
+		if !pvcNeedsVolume(*updated) {
+			return maskAny(fmt.Errorf("PersistentVolumeClaim '%s' no longer needs a volume", claim.GetName()))
+		}
+
+		// Try to bind
+		updated.Spec.VolumeName = volumeName
+		if _, err := pvcs.Update(updated); k8sutil.IsConflict(err) {
+			// Claim modified already, retry
+			log.Debug().Err(err).Msg("PersistentVolumeClaim has been modified. Retrying.")
+			continue
+		} else if err != nil {
+			log.Error().Err(err).Msg("Failed to bind PVC to volume")
+			return maskAny(err)
+		}
+		log.Debug().Msg("Bound volume to PersistentVolumeClaim")
+		return nil
+	}
+	log.Error().Msg("All attempts to bind PVC to volume failed")
+	return maskAny(fmt.Errorf("All attempts to bind PVC to volume failed"))
+}
+
+// shortHash creates a 6 letter hash of the given name.
+func shortHash(name string) string {
+	h := sha1.Sum([]byte(name))
+	return fmt.Sprintf("%0x", h)[:6]
+}
@@ -44,9 +44,11 @@ const (
 	SecretAccessPackageYaml = "accessPackage.yaml" // Key in Secret.data used to store a YAML encoded access package

-	FinalizerPodDrainDBServer = "dbserver.database.arangodb.com/drain"       // Finalizer added to DBServers, indicating the need for draining that dbserver
-	FinalizerPodAgencyServing = "agent.database.arangodb.com/agency-serving" // Finalizer added to Agents, indicating the need for keeping enough agents alive
-	FinalizerPVCMemberExists  = "pvc.database.arangodb.com/member-exists"    // Finalizer added to PVCs, indicating the need to keep it as long as its member exists
+	FinalizerDeplRemoveChildFinalizers = "database.arangodb.com/remove-child-finalizers" // Finalizer added to ArangoDeployment, indicating the need to remove finalizers from all children
+	FinalizerDeplReplStopSync          = "replication.database.arangodb.com/stop-sync"   // Finalizer added to ArangoDeploymentReplication, indicating the need to stop synchronization
+	FinalizerPodAgencyServing          = "agent.database.arangodb.com/agency-serving"    // Finalizer added to Agents, indicating the need for keeping enough agents alive
+	FinalizerPodDrainDBServer          = "dbserver.database.arangodb.com/drain"          // Finalizer added to DBServers, indicating the need for draining that dbserver
+	FinalizerPVCMemberExists           = "pvc.database.arangodb.com/member-exists"       // Finalizer added to PVCs, indicating the need to keep it as long as its member exists

+	AnnotationEnforceAntiAffinity = "database.arangodb.com/enforce-anti-affinity" // Key of annotation added to PVC. Value is a boolean "true" or "false"
 )
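
For reference, a PVC carrying the new annotation would look roughly like this hand-written sketch (names and sizes are illustrative; the label keys follow the `arango_deployment` and `role` selectors used elsewhere in this commit):

```yaml
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: production-cluster-agent-pvc
  labels:
    arango_deployment: production-cluster
    role: agent
  annotations:
    # Set by the operator when spec.environment is Production.
    database.arangodb.com/enforce-anti-affinity: "true"
spec:
  accessModes: ["ReadWriteOnce"]
  resources:
    requests:
      storage: 8Gi
```
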
@@ -145,6 +145,16 @@ func NewAccessPackageDeletedEvent(apiObject APIObject, apSecretName string) *v1.Event {
 	return event
 }

+// NewPlanTimeoutEvent creates an event indicating that an item on a reconciliation plan did not
+// finish before its deadline.
+func NewPlanTimeoutEvent(apiObject APIObject, itemType, memberID, role string) *v1.Event {
+	event := newDeploymentEvent(apiObject)
+	event.Type = v1.EventTypeNormal
+	event.Reason = "Reconciliation Plan Timeout"
+	event.Message = fmt.Sprintf("A plan item of type %s for member %s with role %s did not finish in time", itemType, memberID, role)
+	return event
+}
+
 // NewErrorEvent creates an event of type error.
 func NewErrorEvent(reason string, err error, apiObject APIObject) *v1.Event {
 	event := newDeploymentEvent(apiObject)
@@ -23,9 +23,13 @@
 package k8sutil

 import (
+	"strconv"
+
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/kubernetes"
+
+	"github.com/arangodb/kube-arangodb/pkg/util/constants"
 )

 // IsPersistentVolumeClaimMarkedForDeletion returns true if the persistent volume claim has been marked for deletion.

@@ -42,7 +46,7 @@ func CreatePersistentVolumeClaimName(deploymentName, role, id string) string {
 // CreatePersistentVolumeClaim creates a persistent volume claim with given name and configuration.
 // If the pvc already exists, nil is returned.
 // If another error occurs, that error is returned.
-func CreatePersistentVolumeClaim(kubecli kubernetes.Interface, pvcName, deploymentName, ns, storageClassName, role string, resources v1.ResourceRequirements, finalizers []string, owner metav1.OwnerReference) error {
+func CreatePersistentVolumeClaim(kubecli kubernetes.Interface, pvcName, deploymentName, ns, storageClassName, role string, enforceAntiAffinity bool, resources v1.ResourceRequirements, finalizers []string, owner metav1.OwnerReference) error {
 	labels := LabelsForDeployment(deploymentName, role)
 	volumeMode := v1.PersistentVolumeFilesystem
 	pvc := &v1.PersistentVolumeClaim{

@@ -50,6 +54,9 @@ func CreatePersistentVolumeClaim(kubecli kubernetes.Interface, pvcName, deploymentName,
 			Name:       pvcName,
 			Labels:     labels,
 			Finalizers: finalizers,
+			Annotations: map[string]string{
+				constants.AnnotationEnforceAntiAffinity: strconv.FormatBool(enforceAntiAffinity),
+			},
 		},
 		Spec: v1.PersistentVolumeClaimSpec{
 			AccessModes: []v1.PersistentVolumeAccessMode{
@@ -29,6 +29,7 @@ import (
 	"os"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 	"time"

@@ -54,8 +55,9 @@ const (
 )

 var (
-	maskAny         = errors.WithStack
-	syncClientCache client.ClientCache
+	maskAny                 = errors.WithStack
+	syncClientCache         client.ClientCache
+	showEnterpriseImageOnce sync.Once
 )

 // longOrSkip checks the short test flag.

@@ -73,6 +75,10 @@ func getEnterpriseImageOrSkip(t *testing.T) string {
 	image := strings.TrimSpace(os.Getenv("ENTERPRISEIMAGE"))
 	if image == "" {
 		t.Skip("Skipping test because ENTERPRISEIMAGE is not set")
+	} else {
+		showEnterpriseImageOnce.Do(func() {
+			t.Logf("Using enterprise image: %s", image)
+		})
 	}
 	return image
 }