1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Add agency leader service (#991)

This commit is contained in:
Tomasz Mielech 2022-05-30 09:32:33 +02:00 committed by GitHub
parent b4d44a9f47
commit 81102932a4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 296 additions and 89 deletions

View file

@ -10,6 +10,7 @@
- (Feature) Add operator shutdown handler for graceful termination
- (Feature) Add agency leader discovery
- (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method
- (Feature) Add agency leader service
## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10)
- (Feature) Add CoreV1 Endpoints Inspector

View file

@ -31,7 +31,15 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)
type health map[string]uint64
// health is a snapshot of what the agents reported about the agency.
type health struct {
	// leaderID is the ID of the leader agent; it is empty when a leader is not known.
	leaderID string
	// commitIndexes keeps the commit index reported by each agent, keyed by the agent ID.
	commitIndexes map[string]uint64
}

// LeaderID returns a leader ID or empty string if a leader is not known.
func (h health) LeaderID() string {
	return h.leaderID
}
// IsHealthy returns true if all agencies have the same commit index.
// Returns false when:
@ -42,7 +50,7 @@ func (h health) IsHealthy() bool {
var globalCommitIndex uint64
first := true
for _, commitIndex := range h {
for _, commitIndex := range h.commitIndexes {
if first {
globalCommitIndex = commitIndex
first = false
@ -58,14 +66,15 @@ func (h health) IsHealthy() bool {
// Health exposes the health information gathered about the agency.
type Health interface {
	// IsHealthy returns true when the environment is considered healthy.
	IsHealthy() bool

	// LeaderID returns the leader ID, or an empty string when a leader is not known.
	LeaderID() string
}
// Cache gives access to the agency information kept by the operator.
type Cache interface {
	// Reload refreshes the cache using the given agency clients.
	// NOTE(review): the returned number looks like the leader's commit index - confirm against the implementations.
	Reload(ctx context.Context, clients []agency.Agency) (uint64, error)
	// Data returns the cached agency state together with a flag.
	// NOTE(review): the flag appears to tell whether the state is valid - confirm.
	Data() (State, bool)
	// CommitIndex returns the commit index known to the cache.
	CommitIndex() uint64
	// GetLeaderID returns a leader ID.
	GetLeaderID() string
	// Health returns the health object and true when such an object is available.
	Health() (Health, bool)
}
@ -93,11 +102,6 @@ func (c cacheSingle) CommitIndex() uint64 {
return 0
}
// GetLeaderID always returns an empty string for a single cache.
func (c cacheSingle) GetLeaderID() string {
	const noLeader = ""

	return noLeader
}
// Health returns always false for single cache.
func (c cacheSingle) Health() (Health, bool) {
return nil, false
@ -121,8 +125,6 @@ type cache struct {
data State
health Health
leaderID string
}
func (c *cache) CommitIndex() uint64 {
@ -139,14 +141,6 @@ func (c *cache) Data() (State, bool) {
return c.data, c.valid
}
// GetLeaderID returns the leader ID stored in the cache, or an empty string when a leader is not known.
// The value is read under the read lock.
func (c *cache) GetLeaderID() string {
	c.lock.RLock()
	id := c.leaderID
	c.lock.RUnlock()

	return id
}
// Health returns always false for single cache.
func (c *cache) Health() (Health, bool) {
c.lock.RLock()
@ -167,7 +161,6 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
if err != nil {
// Invalidate a leader ID and agency state.
// In the next iteration leaderID will be set because `valid` will be false.
c.leaderID = ""
c.valid = false
return 0, err
@ -180,7 +173,6 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
}
// A leader should be known even if an agency state is invalid.
c.leaderID = leaderConfig.LeaderId
if data, err := loadState(ctx, leaderCli); err != nil {
c.valid = false
return leaderConfig.CommitIndex, err
@ -194,7 +186,7 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
// getLeader returns config and client to a leader agency, and health to check if agencies are on the same page.
// If there is no quorum for the leader then error is returned.
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, Health, error) {
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
var mutex sync.Mutex
var anyError error
var wg sync.WaitGroup
@ -203,10 +195,12 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
if cliLen == 0 {
return nil, nil, nil, errors.New("empty list of agencies' clients")
}
configs := make([]*agencyConfig, cliLen)
leaders := make(map[string]int)
configs := make([]*Config, cliLen)
leaders := make(map[string]int, cliLen)
h := make(health)
var h health
h.commitIndexes = make(map[string]uint64, cliLen)
// Fetch all configs from agencies.
wg.Add(cliLen)
for i, cli := range clients {
@ -215,7 +209,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
ctxLocal, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
config, err := getAgencyConfig(ctxLocal, cliLocal)
config, err := GetAgencyConfig(ctxLocal, cliLocal)
mutex.Lock()
defer mutex.Unlock()
@ -232,7 +226,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
configs[iLocal] = config
// Count leaders.
leaders[config.LeaderId]++
h[config.Configuration.ID] = config.CommitIndex
h.commitIndexes[config.Configuration.ID] = config.CommitIndex
}(i, cli)
}
wg.Wait()
@ -255,6 +249,8 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag
}
}
h.leaderID = leaderID
// Check if a leader has quorum from all possible agencies.
if maxVotes <= cliLen/2 {
message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen)

View file

@ -29,10 +29,12 @@ import (
"github.com/arangodb/go-driver/agency"
)
func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig, error) {
conn := client.Connection()
// GetAgencyConfig fetches the agency config using the connection of the given agency client.
// It delegates to GetAgencyConfigC.
func GetAgencyConfig(ctx context.Context, client agency.Agency) (*Config, error) {
	conn := client.Connection()

	return GetAgencyConfigC(ctx, conn)
}
req, err := client.Connection().NewRequest(http.MethodGet, "/_api/agency/config")
func GetAgencyConfigC(ctx context.Context, conn driver.Connection) (*Config, error) {
req, err := conn.NewRequest(http.MethodGet, "/_api/agency/config")
if err != nil {
return nil, err
}
@ -48,7 +50,7 @@ func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig,
return nil, err
}
var c agencyConfig
var c Config
if err := json.Unmarshal(data, &c); err != nil {
return nil, err
@ -57,7 +59,7 @@ func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig,
return &c, nil
}
type agencyConfig struct {
type Config struct {
LeaderId string `json:"leaderId"`
CommitIndex uint64 `json:"commitIndex"`

View file

@ -67,7 +67,7 @@ func Test_Config_Unmarshal(t *testing.T) {
"version": "3.10.0-devel"
}`
var cfg agencyConfig
var cfg Config
require.NoError(t, json.Unmarshal([]byte(data), &cfg))

View file

@ -154,6 +154,10 @@ func (d *Deployment) GetAgencyCache() (agency.State, bool) {
return d.agencyCache.Data()
}
// GetAgencyHealth returns the health object kept by the agency cache of the deployment.
// The boolean is forwarded unchanged from the cache.
func (d *Deployment) GetAgencyHealth() (agency.Health, bool) {
	health, ok := d.agencyCache.Health()

	return health, ok
}
func (d *Deployment) RefreshAgencyCache(ctx context.Context) (uint64, error) {
if d.apiObject.Spec.Mode.Get() == api.DeploymentModeSingle {
return 0, nil

View file

@ -188,6 +188,10 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
nextInterval = nextInterval.ReduceTo(x)
}
if err := d.resources.EnsureLeader(ctx, d.GetCachedStatus()); err != nil {
return minInspectionInterval, errors.Wrapf(err, "Creating agency pod leader failed")
}
if err := d.resources.EnsureArangoMembers(ctx, d.GetCachedStatus()); err != nil {
return minInspectionInterval, errors.Wrapf(err, "ArangoMember creation failed")
}

View file

@ -187,6 +187,10 @@ func (ac *actionContext) GenerateMemberEndpoint(group api.ServerGroup, member ap
return ac.context.GenerateMemberEndpoint(group, member)
}
// GetAgencyHealth forwards the call to the wrapped context and returns its result unchanged.
func (ac *actionContext) GetAgencyHealth() (agencyCache.Health, bool) {
	health, ok := ac.context.GetAgencyHealth()

	return health, ok
}
// GetAgencyCache forwards the call to the wrapped context and returns its result unchanged.
func (ac *actionContext) GetAgencyCache() (agencyCache.State, bool) {
	state, ok := ac.context.GetAgencyCache()

	return state, ok
}

View file

@ -84,6 +84,11 @@ type testContext struct {
Inspector inspectorInterface.Inspector
}
// GetAgencyHealth is not implemented for the test context yet; calling it panics.
func (c *testContext) GetAgencyHealth() (agencyCache.Health, bool) {
	// TODO: implement me
	panic("implement me")
}
func (c *testContext) RenderPodForMember(ctx context.Context, acs sutil.ACS, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) {
//TODO implement me
panic("implement me")

View file

@ -124,6 +124,7 @@ type DeploymentCachedStatus interface {
// ArangoAgencyGet provides read access to the agency information of a deployment.
type ArangoAgencyGet interface {
	// GetAgencyCache returns the agency state together with a boolean flag.
	GetAgencyCache() (agencyCache.State, bool)

	// GetAgencyHealth returns the agency health together with a boolean flag.
	GetAgencyHealth() (agencyCache.Health, bool)
}
type ArangoAgency interface {

View file

@ -0,0 +1,151 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package resources
import (
"context"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
)
// EnsureLeader puts the leader label on the pod of the agency leader and makes sure that a service
// selecting that pod exists.
// When the agency leader is not known, none of the agents' pods should carry the leader label, and
// consequently the service will not point to any pod.
// It works only in active fail-over mode; in any other mode, or when the agency health is not
// available, nothing is done and nil is returned.
//
// A reconcile error (errors.Reconcile) is returned whenever a label or the service has been changed,
// so the caller can run the next inspection against fresh state.
func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
	if r.context.GetSpec().GetMode() != api.DeploymentModeActiveFailover {
		return nil
	}

	health, ok := r.context.GetAgencyHealth()
	if !ok {
		return nil
	}

	leaderID := health.LeaderID()
	status, _ := r.context.GetStatus()
	noLeader := leaderID == ""
	changed := false
	group := api.ServerGroupAgents

	// cloneLabels copies the labels, so the pod object kept in the inspector cache is never modified.
	// Without the copy a failed patch would leave the cache claiming a label state that was never applied.
	cloneLabels := func(src map[string]string) map[string]string {
		dst := make(map[string]string, len(src)+1)
		for k, v := range src {
			dst[k] = v
		}
		return dst
	}

	agencyServers := func(group api.ServerGroup, list api.MemberStatusList) error {
		for _, m := range list {
			pod, exist := cachedStatus.Pod().V1().GetSimple(m.PodName)
			if !exist {
				continue
			}

			current := pod.GetLabels()
			value, labelled := current[k8sutil.LabelKeyArangoLeader]

			if noLeader || m.ID != leaderID {
				// Unset a leader when:
				// - leader is unknown.
				// - leader does not belong to the current pod.
				if !labelled {
					continue
				}

				desired := cloneLabels(current)
				delete(desired, k8sutil.LabelKeyArangoLeader)

				err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), desired))
				if err != nil {
					r.log.Error().Err(err).Msg("Unable to remove leader label")
					return err
				}

				r.log.Info().Msgf("leader label is removed from \"%s\" member", m.ID)
				changed = true
				continue
			}

			// From here on it is known that there is a leader, and it should be attached to the current pod.
			if labelled && value == "true" {
				// A pod is already a leader, so nothing to change.
				continue
			}

			desired := cloneLabels(current)
			desired[k8sutil.LabelKeyArangoLeader] = "true"

			err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), desired))
			if err != nil {
				r.log.Error().Err(err).Msg("Unable to update leader label")
				return err
			}

			r.log.Info().Msgf("leader label is set on \"%s\" member", m.ID)
			changed = true
		}

		return nil
	}

	if err := status.Members.ForeachServerInGroups(agencyServers, group); err != nil {
		return err
	}

	if changed {
		return errors.Reconcile()
	}

	if noLeader {
		// There is no leader agency so service may not exist, or it can exist with empty list of endpoints.
		return nil
	}

	deploymentName := r.context.GetAPIObject().GetName()
	leaderAgentSvcName := k8sutil.CreateAgentLeaderServiceName(deploymentName)
	selector := k8sutil.LabelsForLeaderMember(deploymentName, group.AsRole(), leaderID)

	if s, ok := cachedStatus.Service().V1().GetSimple(leaderAgentSvcName); ok {
		err, adjusted := r.adjustService(ctx, s, shared.ArangoPort, selector)
		if err != nil {
			return err
		}
		if !adjusted {
			// The service is not changed.
			return nil
		}

		return errors.Reconcile()
	}

	s := r.createService(leaderAgentSvcName, r.context.GetNamespace(), r.context.GetAPIObject().AsOwner(), shared.ArangoPort, selector)
	err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
		_, err := r.context.ServicesModInterface().Create(ctxChild, s, meta.CreateOptions{})
		return err
	})
	// A conflict is tolerated here; any other error aborts.
	if err != nil && !k8sutil.IsConflict(err) {
		return err
	}

	// The service has been created.
	return errors.Reconcile()
}

View file

@ -36,12 +36,13 @@ import (
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/rs/zerolog"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
servicev1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service/v1"
"github.com/rs/zerolog"
)
var (
@ -49,8 +50,73 @@ var (
inspectServicesDurationGauges = metrics.MustRegisterGaugeVec(metricsComponent, "inspect_services_duration", "Amount of time taken by a single inspection of all Services for a deployment (in sec)", metrics.DeploymentName)
)
// createService builds a ClusterIP service object with the given name, namespace, owner and selector.
// The service exposes a single TCP port named "server": shared.ArangoPort forwarded to targetPort.
// The object is only constructed here; nothing is sent to the Kubernetes API.
func (r *Resources) createService(name, namespace string, owner meta.OwnerReference, targetPort int32,
	selector map[string]string) *core.Service {
	objectMeta := meta.ObjectMeta{
		Name:            name,
		Namespace:       namespace,
		OwnerReferences: []meta.OwnerReference{owner},
	}

	serverPort := core.ServicePort{
		Name:       "server",
		Protocol:   "TCP",
		Port:       shared.ArangoPort,
		TargetPort: intstr.IntOrString{IntVal: targetPort},
	}

	spec := core.ServiceSpec{
		Type:                     core.ServiceTypeClusterIP,
		Ports:                    []core.ServicePort{serverPort},
		PublishNotReadyAddresses: true,
		Selector:                 selector,
	}

	return &core.Service{
		ObjectMeta: objectMeta,
		Spec:       spec,
	}
}
// adjustService checks whether the spec of the given service is valid and, if it is not, updates the service.
// The expected spec is a copy of the current one with the type, the single "server" port, the
// PublishNotReadyAddresses flag and the selector overwritten.
// It returns true when the service has been updated, and an error when the update fails.
func (r *Resources) adjustService(ctx context.Context, s *core.Service, targetPort int32, selector map[string]string) (error, bool) {
	services := r.context.ServicesModInterface()

	desired := s.Spec.DeepCopy()
	desired.Type = core.ServiceTypeClusterIP
	desired.Ports = []core.ServicePort{
		{
			Name:       "server",
			Protocol:   "TCP",
			Port:       shared.ArangoPort,
			TargetPort: intstr.IntOrString{IntVal: targetPort},
		},
	}
	desired.PublishNotReadyAddresses = true
	desired.Selector = selector

	if upToDate := equality.Semantic.DeepDerivative(*desired, s.Spec); upToDate {
		// The service has not changed, so nothing should be changed.
		return nil, false
	}

	s.Spec = *desired

	update := func(ctxChild context.Context) error {
		_, err := services.Update(ctxChild, s, meta.UpdateOptions{})
		return err
	}
	if err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, update); err != nil {
		return err, false
	}

	// The service has been changed.
	return nil, true
}
// EnsureServices creates all services needed to service the deployment
func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorInterface.Inspector) error {
log := r.log
start := time.Now()
apiObject := r.context.GetAPIObject()
@ -85,29 +151,9 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
return errors.Newf("Member %s not found", memberName)
}
selector := k8sutil.LabelsForMember(deploymentName, group.AsRole(), m.ID)
if s, ok := cachedStatus.Service().V1().GetSimple(member.GetName()); !ok {
s = &core.Service{
ObjectMeta: meta.ObjectMeta{
Name: member.GetName(),
Namespace: member.GetNamespace(),
OwnerReferences: []meta.OwnerReference{
member.AsOwner(),
},
},
Spec: core.ServiceSpec{
Type: core.ServiceTypeClusterIP,
Ports: []core.ServicePort{
{
Name: "server",
Protocol: "TCP",
Port: shared.ArangoPort,
TargetPort: intstr.IntOrString{IntVal: targetPort},
},
},
PublishNotReadyAddresses: true,
Selector: k8sutil.LabelsForMember(deploymentName, group.AsRole(), m.ID),
},
}
s := r.createService(member.GetName(), member.GetNamespace(), member.AsOwner(), targetPort, selector)
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := svcs.Create(ctxChild, s, meta.CreateOptions{})
@ -122,33 +168,13 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
reconcileRequired.Required()
continue
} else {
spec := s.Spec.DeepCopy()
spec.Type = core.ServiceTypeClusterIP
spec.Ports = []core.ServicePort{
{
Name: "server",
Protocol: "TCP",
Port: shared.ArangoPort,
TargetPort: intstr.IntOrString{IntVal: targetPort},
},
}
spec.PublishNotReadyAddresses = true
spec.Selector = k8sutil.LabelsForMember(deploymentName, group.AsRole(), m.ID)
if !equality.Semantic.DeepDerivative(*spec, s.Spec) {
s.Spec = *spec
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := svcs.Update(ctxChild, s, meta.UpdateOptions{})
return err
})
if err != nil {
return err
}
if err, adjusted := r.adjustService(ctx, s, targetPort, selector); err == nil {
if adjusted {
reconcileRequired.Required()
continue
}
// Continue the loop.
} else {
return err
}
}
}

View file

@ -27,14 +27,13 @@ import (
"strconv"
"strings"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
servicev1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service/v1"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service"
servicev1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service/v1"
)
// CreateHeadlessServiceName returns the name of the headless service for the given
@ -67,6 +66,11 @@ func CreateExporterClientServiceName(deploymentName string) string {
return deploymentName + "-exporter"
}
// CreateAgentLeaderServiceName returns the name of the service used to access a leader agent:
// the deployment name followed by the "-agent" suffix.
func CreateAgentLeaderServiceName(deploymentName string) string {
	const suffix = "-agent"

	return deploymentName + suffix
}
// CreateExporterService
func CreateExporterService(ctx context.Context, cachedStatus service.Inspector, svcs servicev1.ModInterface,
deployment metav1.Object, owner metav1.OwnerReference) (string, bool, error) {

View file

@ -44,7 +44,8 @@ const (
LabelKeyArangoScheduled = "deployment.arangodb.com/scheduled"
// LabelKeyArangoTopology is the key of the label used to store the ArangoDeployment topology ID in
LabelKeyArangoTopology = "deployment.arangodb.com/topology"
// LabelKeyArangoLeader is the key of the label used to mark the current leader of a group of instances.
LabelKeyArangoLeader = "deployment.arangodb.com/leader"
// AppName is the fixed value for the "app" label
AppName = "arangodb"
)
@ -98,6 +99,14 @@ func LabelsForMember(deploymentName, role, id string) map[string]string {
return l
}
// LabelsForLeaderMember returns the labels of LabelsForMember for the given deployment name, role and
// member id, extended with the leader label set to "true".
func LabelsForLeaderMember(deploymentName, role, id string) map[string]string {
	labels := LabelsForMember(deploymentName, role, id)
	labels[LabelKeyArangoLeader] = "true"

	return labels
}
// LabelsForDeployment returns a map of labels, given to all resources for given deployment name
func LabelsForDeployment(deploymentName, role string) map[string]string {
l := map[string]string{