1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Failover Leader service (#1002)

This commit is contained in:
Tomasz Mielech 2022-06-10 15:53:50 +02:00 committed by GitHub
parent 4ff879f4f8
commit 4405567deb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 303 additions and 29 deletions

View file

@ -9,6 +9,7 @@
- (Feature) Add `ArangoBackupPolicy` CRD auto-installer
- (Feature) Add `ArangoJob` CRD auto-installer
- (Feature) Add RestartPolicyAlways to ArangoDeployment in order to restart ArangoDB on failure
- (Feature) Set a leader in active fail-over mode
## [1.2.13](https://github.com/arangodb/kube-arangodb/tree/1.2.13) (2022-06-07)
- (Bugfix) Fix arangosync members state inspection

View file

@ -98,6 +98,7 @@ Feature-wise production readiness table:
| Operator Internal Metrics Exporter | 1.2.3 | >= 3.7.0 | Community, Enterprise | Production | True | --deployment.feature.metrics-exporter | It is always enabled |
| Operator Ephemeral Volumes | 1.2.2 | >= 3.7.0 | Community, Enterprise | Alpha | False | --deployment.feature.ephemeral-volumes | N/A |
| Pod RestartPolicyAlways | 1.2.13 | >= 3.7.0 | Community, Enterprise | Alpha | False | --deployment.feature.restart-policy-always | N/A |
| Active fail-over leadership | 1.2.13 | >= 3.7.0 | Community, Enterprise | Production | False | --deployment.feature.failover-leadership | |
## Release notes for 0.3.16

View file

@ -129,7 +129,7 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval
nextInterval = inspectNextInterval
hasError = true
d.CreateEvent(k8sutil.NewErrorEvent("Reconcilation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Reconciliation failed", err, d.apiObject))
} else {
nextInterval = minInspectionInterval
}
@ -189,7 +189,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
}
if err := d.resources.EnsureLeader(ctx, d.GetCachedStatus()); err != nil {
return minInspectionInterval, errors.Wrapf(err, "Creating agency pod leader failed")
return minInspectionInterval, errors.Wrapf(err, "Creating leaders failed")
}
if err := d.resources.EnsureArangoMembers(ctx, d.GetCachedStatus()); err != nil {

View file

@ -0,0 +1,37 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package features
func init() {
registerFeature(failoverLeadership)
}
var failoverLeadership = &feature{
name: "failover-leadership",
description: "Support for leadership in fail-over mode",
version: "3.7.0",
enterpriseRequired: false,
enabledByDefault: false,
}
func FailoverLeadership() Feature {
return failoverLeadership
}

View file

@ -25,19 +25,19 @@ import (
"os"
"path/filepath"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
core "k8s.io/api/core/v1"
"github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/jwt"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/deployment/pod"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/probes"
core "k8s.io/api/core/v1"
)
type Probe interface {
@ -400,9 +400,13 @@ func (r *Resources) probeBuilderReadinessCoreSelect() probeBuilder {
return r.probeBuilderReadinessCore
}
func (r *Resources) probeBuilderReadinessCoreOperator(spec api.DeploymentSpec, group api.ServerGroup, version driver.Version) (Probe, error) {
func (r *Resources) probeBuilderReadinessCoreOperator(spec api.DeploymentSpec, _ api.ServerGroup, _ driver.Version) (Probe, error) {
// /_admin/server/availability is the way to go, it is available since 3.3.9
args, err := r.probeCommand(spec, "/_admin/server/availability")
path := "/_admin/server/availability"
if features.FailoverLeadership().Enabled() && r.context.GetMode() == api.DeploymentModeActiveFailover {
path = "/_api/version"
}
args, err := r.probeCommand(spec, path)
if err != nil {
return nil, err
}
@ -414,9 +418,12 @@ func (r *Resources) probeBuilderReadinessCoreOperator(spec api.DeploymentSpec, g
}, nil
}
func (r *Resources) probeBuilderReadinessCore(spec api.DeploymentSpec, group api.ServerGroup, version driver.Version) (Probe, error) {
func (r *Resources) probeBuilderReadinessCore(spec api.DeploymentSpec, _ api.ServerGroup, _ driver.Version) (Probe, error) {
// /_admin/server/availability is the way to go, it is available since 3.3.9
localPath := "/_admin/server/availability"
if features.FailoverLeadership().Enabled() && r.context.GetMode() == api.DeploymentModeActiveFailover {
localPath = "/_api/version"
}
authorization := ""
if spec.IsAuthenticated() {

View file

@ -22,11 +22,17 @@ package resources
import (
"context"
"net/http"
"sync"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
@ -125,8 +131,8 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
if s, ok := cachedStatus.Service().V1().GetSimple(leaderAgentSvcName); ok {
if err, adjusted := r.adjustService(ctx, s, shared.ArangoPort, selector); err == nil {
if !adjusted {
// The service is not changed.
return nil
// The service is not changed, so single server leader can be set.
return r.ensureSingleServerLeader(ctx, cachedStatus)
}
return errors.Reconcile()
@ -149,3 +155,208 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
// The service has been created.
return errors.Reconcile()
}
// getSingleServerLeaderID returns id of a single server leader.
func (r *Resources) getSingleServerLeaderID(ctx context.Context) (string, error) {
status, _ := r.context.GetStatus()
var mutex sync.Mutex
var leaderID string
var anyError error
dbServers := func(group api.ServerGroup, list api.MemberStatusList) error {
if len(list) == 0 {
return nil
}
ctxCancel, cancel := context.WithCancel(ctx)
defer func() {
cancel()
}()
// Fetch availability of each single server.
var wg sync.WaitGroup
wg.Add(len(list))
for _, m := range list {
go func(id string) {
defer wg.Done()
err := globals.GetGlobalTimeouts().ArangoD().RunWithTimeout(ctxCancel, func(ctxChild context.Context) error {
c, err := r.context.GetServerClient(ctxChild, api.ServerGroupSingle, id)
if err != nil {
return err
}
if available, err := isServerAvailable(ctxChild, c); err != nil {
return err
} else if !available {
return errors.New("not available")
}
// Other requests can be interrupted, because a leader is known already.
cancel()
mutex.Lock()
leaderID = id
mutex.Unlock()
return nil
})
if err != nil {
mutex.Lock()
anyError = err
mutex.Unlock()
}
}(m.ID)
}
wg.Wait()
return nil
}
if err := status.Members.ForeachServerInGroups(dbServers, api.ServerGroupSingle); err != nil {
return "", err
}
if len(leaderID) > 0 {
return leaderID, nil
}
if anyError != nil {
return "", errors.WithMessagef(anyError, "unable to get a leader")
}
return "", errors.New("unable to get a leader")
}
// setSingleServerLeadership adds or removes leadership label on a single server pod.
func (r *Resources) ensureSingleServerLeader(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
changed := false
enabled := features.FailoverLeadership().Enabled()
var leaderID string
if enabled {
var err error
if leaderID, err = r.getSingleServerLeaderID(ctx); err != nil {
return err
}
}
singleServers := func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
pod, exist := cachedStatus.Pod().V1().GetSimple(m.PodName)
if !exist {
continue
}
labels := pod.GetLabels()
if enabled && m.ID == leaderID {
if value, ok := labels[k8sutil.LabelKeyArangoLeader]; ok && value == "true" {
// Single server is available, and it has a leader label.
continue
}
labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true")
} else {
if _, ok := labels[k8sutil.LabelKeyArangoLeader]; !ok {
// Single server is not available, and it does not have a leader label.
continue
}
delete(labels, k8sutil.LabelKeyArangoLeader)
}
err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels))
if err != nil {
return errors.WithMessagef(err, "unable to change leader label for pod %s", m.PodName)
}
changed = true
}
return nil
}
status, _ := r.context.GetStatus()
if err := status.Members.ForeachServerInGroups(singleServers, api.ServerGroupSingle); err != nil {
return err
}
if changed {
return errors.Reconcile()
}
return r.ensureSingleServerLeaderServices(ctx, cachedStatus)
}
// ensureSingleServerLeaderServices adds a leadership label to deployment service and external deployment service.
func (r *Resources) ensureSingleServerLeaderServices(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
// Add a leadership label to deployment service and external deployment service.
deploymentName := r.context.GetAPIObject().GetName()
changed := false
services := []string{
k8sutil.CreateDatabaseClientServiceName(deploymentName),
k8sutil.CreateDatabaseExternalAccessServiceName(deploymentName),
}
enabled := features.FailoverLeadership().Enabled()
for _, svcName := range services {
svc, exists := cachedStatus.Service().V1().GetSimple(svcName)
if !exists {
// It will be created later with a leadership label.
continue
}
selector := svc.Spec.Selector
if enabled {
if v, ok := selector[k8sutil.LabelKeyArangoLeader]; ok && v == "true" {
// It is already OK.
continue
}
selector = addLabel(selector, k8sutil.LabelKeyArangoLeader, "true")
} else {
if _, ok := selector[k8sutil.LabelKeyArangoLeader]; !ok {
// Service does not have a leader label, and it should not have.
continue
}
delete(selector, k8sutil.LabelKeyArangoLeader)
}
parser := patch.Patch([]patch.Item{patch.ItemReplace(patch.NewPath("spec", "selector"), selector)})
data, err := parser.Marshal()
if err != nil {
return errors.WithMessagef(err, "unable to marshal labels for service %s", svcName)
}
err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := cachedStatus.ServicesModInterface().V1().Patch(ctxChild, svcName, types.JSONPatchType, data, meta.PatchOptions{})
return err
})
if err != nil {
return errors.WithMessagef(err, "unable to patch labels for service %s", svcName)
}
changed = true
}
if changed {
return errors.Reconcile()
}
return nil
}
// isServerAvailable returns true when server is available.
// In active fail-over mode one of the server should be available.
func isServerAvailable(ctx context.Context, c driver.Client) (bool, error) {
req, err := c.Connection().NewRequest("GET", "_admin/server/availability")
if err != nil {
return false, errors.WithStack(err)
}
resp, err := c.Connection().Do(ctx, req)
if err != nil {
return false, errors.WithStack(err)
}
if err := resp.CheckStatus(http.StatusOK, http.StatusServiceUnavailable); err != nil {
return false, errors.WithStack(err)
}
return resp.StatusCode() == http.StatusOK, nil
}

View file

@ -25,6 +25,7 @@ import (
"strings"
"time"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"k8s.io/apimachinery/pkg/api/equality"
@ -200,12 +201,17 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
}
// Internal database client service
single := spec.GetMode().HasSingleServers()
var single, withLeader bool
if single = spec.GetMode().HasSingleServers(); single {
if spec.GetMode() == api.DeploymentModeActiveFailover && features.FailoverLeadership().Enabled() {
withLeader = true
}
}
counterMetric.Inc()
if _, exists := cachedStatus.Service().V1().GetSimple(k8sutil.CreateDatabaseClientServiceName(deploymentName)); !exists {
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
svcName, newlyCreated, err := k8sutil.CreateDatabaseClientService(ctxChild, svcs, apiObject, single, owner)
svcName, newlyCreated, err := k8sutil.CreateDatabaseClientService(ctxChild, svcs, apiObject, single, withLeader, owner)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client service")
return errors.WithStack(err)
@ -230,7 +236,8 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
if single {
role = "single"
}
if err := r.ensureExternalAccessServices(ctx, cachedStatus, svcs, eaServiceName, role, "database", shared.ArangoPort, false, spec.ExternalAccess, apiObject, log); err != nil {
if err := r.ensureExternalAccessServices(ctx, cachedStatus, svcs, eaServiceName, role, "database",
shared.ArangoPort, false, withLeader, spec.ExternalAccess, apiObject, log); err != nil {
return errors.WithStack(err)
}
@ -239,7 +246,8 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
counterMetric.Inc()
eaServiceName := k8sutil.CreateSyncMasterClientServiceName(deploymentName)
role := "syncmaster"
if err := r.ensureExternalAccessServices(ctx, cachedStatus, svcs, eaServiceName, role, "sync", shared.ArangoSyncMasterPort, true, spec.Sync.ExternalAccess.ExternalAccessSpec, apiObject, log); err != nil {
if err := r.ensureExternalAccessServices(ctx, cachedStatus, svcs, eaServiceName, role, "sync",
shared.ArangoSyncMasterPort, true, false, spec.Sync.ExternalAccess.ExternalAccessSpec, apiObject, log); err != nil {
return errors.WithStack(err)
}
status, lastVersion := r.context.GetStatus()
@ -273,7 +281,7 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
// EnsureServices creates all services needed to service the deployment
func (r *Resources) ensureExternalAccessServices(ctx context.Context, cachedStatus inspectorInterface.Inspector,
svcs servicev1.ModInterface, eaServiceName, svcRole, title string, port int, noneIsClusterIP bool,
svcs servicev1.ModInterface, eaServiceName, svcRole, title string, port int, noneIsClusterIP bool, withLeader bool,
spec api.ExternalAccessSpec, apiObject k8sutil.APIObject, log zerolog.Logger) error {
// Database external access service
createExternalAccessService := false
@ -363,7 +371,8 @@ func (r *Resources) ensureExternalAccessServices(ctx context.Context, cachedStat
loadBalancerSourceRanges := spec.LoadBalancerSourceRanges
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
_, newlyCreated, err := k8sutil.CreateExternalAccessService(ctxChild, svcs, eaServiceName, svcRole, apiObject, eaServiceType, port, nodePort, loadBalancerIP, loadBalancerSourceRanges, apiObject.AsOwner())
_, newlyCreated, err := k8sutil.CreateExternalAccessService(ctxChild, svcs, eaServiceName, svcRole, apiObject,
eaServiceType, port, nodePort, loadBalancerIP, loadBalancerSourceRanges, apiObject.AsOwner(), withLeader)
if err != nil {
log.Debug().Err(err).Msgf("Failed to create %s external access service", title)
return errors.WithStack(err)

View file

@ -37,11 +37,12 @@ import (
)
var (
Cause = errs.Cause
New = errs.New
WithStack = errs.WithStack
Wrap = errs.Wrap
Wrapf = errs.Wrapf
Cause = errs.Cause
New = errs.New
WithStack = errs.WithStack
Wrap = errs.Wrap
Wrapf = errs.Wrapf
WithMessagef = errs.WithMessagef
)
func Newf(format string, args ...interface{}) error {

View file

@ -127,7 +127,8 @@ func CreateHeadlessService(ctx context.Context, svcs servicev1.ModInterface, dep
}
publishNotReadyAddresses := true
serviceType := core.ServiceTypeClusterIP
newlyCreated, err := createService(ctx, svcs, svcName, deploymentName, shared.ClusterIPNone, "", serviceType, ports, "", nil, publishNotReadyAddresses, owner)
newlyCreated, err := createService(ctx, svcs, svcName, deploymentName, shared.ClusterIPNone, "", serviceType, ports,
"", nil, publishNotReadyAddresses, false, owner)
if err != nil {
return "", false, errors.WithStack(err)
}
@ -138,8 +139,8 @@ func CreateHeadlessService(ctx context.Context, svcs servicev1.ModInterface, dep
// If the service already exists, nil is returned.
// If another error occurs, that error is returned.
// The returned bool is true if the service is created, or false when the service already existed.
func CreateDatabaseClientService(ctx context.Context, svcs servicev1.ModInterface, deployment metav1.Object, single bool,
owner metav1.OwnerReference) (string, bool, error) {
func CreateDatabaseClientService(ctx context.Context, svcs servicev1.ModInterface, deployment metav1.Object,
single, withLeader bool, owner metav1.OwnerReference) (string, bool, error) {
deploymentName := deployment.GetName()
svcName := CreateDatabaseClientServiceName(deploymentName)
ports := []core.ServicePort{
@ -157,7 +158,8 @@ func CreateDatabaseClientService(ctx context.Context, svcs servicev1.ModInterfac
}
serviceType := core.ServiceTypeClusterIP
publishNotReadyAddresses := false
newlyCreated, err := createService(ctx, svcs, svcName, deploymentName, "", role, serviceType, ports, "", nil, publishNotReadyAddresses, owner)
newlyCreated, err := createService(ctx, svcs, svcName, deploymentName, "", role, serviceType, ports, "", nil,
publishNotReadyAddresses, withLeader, owner)
if err != nil {
return "", false, errors.WithStack(err)
}
@ -170,7 +172,7 @@ func CreateDatabaseClientService(ctx context.Context, svcs servicev1.ModInterfac
// The returned bool is true if the service is created, or false when the service already existed.
func CreateExternalAccessService(ctx context.Context, svcs servicev1.ModInterface, svcName, role string,
deployment metav1.Object, serviceType core.ServiceType, port, nodePort int, loadBalancerIP string,
loadBalancerSourceRanges []string, owner metav1.OwnerReference) (string, bool, error) {
loadBalancerSourceRanges []string, owner metav1.OwnerReference, withLeader bool) (string, bool, error) {
deploymentName := deployment.GetName()
ports := []core.ServicePort{
{
@ -181,7 +183,8 @@ func CreateExternalAccessService(ctx context.Context, svcs servicev1.ModInterfac
},
}
publishNotReadyAddresses := false
newlyCreated, err := createService(ctx, svcs, svcName, deploymentName, "", role, serviceType, ports, loadBalancerIP, loadBalancerSourceRanges, publishNotReadyAddresses, owner)
newlyCreated, err := createService(ctx, svcs, svcName, deploymentName, "", role, serviceType, ports, loadBalancerIP,
loadBalancerSourceRanges, publishNotReadyAddresses, withLeader, owner)
if err != nil {
return "", false, errors.WithStack(err)
}
@ -194,8 +197,12 @@ func CreateExternalAccessService(ctx context.Context, svcs servicev1.ModInterfac
// The returned bool is true if the service is created, or false when the service already existed.
func createService(ctx context.Context, svcs servicev1.ModInterface, svcName, deploymentName, clusterIP, role string,
serviceType core.ServiceType, ports []core.ServicePort, loadBalancerIP string, loadBalancerSourceRanges []string,
publishNotReadyAddresses bool, owner metav1.OwnerReference) (bool, error) {
publishNotReadyAddresses, withLeader bool, owner metav1.OwnerReference) (bool, error) {
labels := LabelsForDeployment(deploymentName, role)
if withLeader {
labels[LabelKeyArangoLeader] = "true"
}
svc := &core.Service{
ObjectMeta: metav1.ObjectMeta{
Name: svcName,