diff --git a/CHANGELOG.md b/CHANGELOG.md index 5339ac9b0..9f948e95f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ - (Feature) Add operator shutdown handler for graceful termination - (Feature) Add agency leader discovery - (Feature) Add `ACSDeploymentSynced` condition type and fix comparison of `SecretHashes` method +- (Feature) Add agency leader service ## [1.2.12](https://github.com/arangodb/kube-arangodb/tree/1.2.12) (2022-05-10) - (Feature) Add CoreV1 Endpoints Inspector diff --git a/pkg/deployment/agency/cache.go b/pkg/deployment/agency/cache.go index bf81a1ab9..3ac4ec5ca 100644 --- a/pkg/deployment/agency/cache.go +++ b/pkg/deployment/agency/cache.go @@ -31,7 +31,15 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/errors" ) -type health map[string]uint64 +type health struct { + leaderID string + + commitIndexes map[string]uint64 +} + +func (h health) LeaderID() string { + return h.leaderID +} // IsHealthy returns true if all agencies have the same commit index. // Returns false when: @@ -42,7 +50,7 @@ func (h health) IsHealthy() bool { var globalCommitIndex uint64 first := true - for _, commitIndex := range h { + for _, commitIndex := range h.commitIndexes { if first { globalCommitIndex = commitIndex first = false @@ -58,14 +66,15 @@ func (h health) IsHealthy() bool { type Health interface { // IsHealthy return true when environment is considered as healthy. IsHealthy() bool + + // LeaderID returns a leader ID or empty string if a leader is not known. + LeaderID() string } type Cache interface { Reload(ctx context.Context, clients []agency.Agency) (uint64, error) Data() (State, bool) CommitIndex() uint64 - // GetLeaderID returns a leader ID. - GetLeaderID() string // Health returns true when healthy object is available. Health() (Health, bool) } @@ -93,11 +102,6 @@ func (c cacheSingle) CommitIndex() uint64 { return 0 } -// GetLeaderID returns always empty string for a single cache. -func (c cacheSingle) GetLeaderID() string { - return "" -} - // Health returns always false for single cache. func (c cacheSingle) Health() (Health, bool) { return nil, false @@ -121,8 +125,6 @@ type cache struct { data State health Health - - leaderID string } func (c *cache) CommitIndex() uint64 { @@ -139,14 +141,6 @@ func (c *cache) Data() (State, bool) { return c.data, c.valid } -// GetLeaderID returns a leader ID or empty string if a leader is not known. -func (c *cache) GetLeaderID() string { - c.lock.RLock() - defer c.lock.RUnlock() - - return c.leaderID -} - // Health returns always false for single cache. func (c *cache) Health() (Health, bool) { c.lock.RLock() @@ -167,7 +161,6 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er if err != nil { // Invalidate a leader ID and agency state. // In the next iteration leaderID will be sat because `valid` will be false. - c.leaderID = "" c.valid = false return 0, err @@ -180,7 +173,6 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er } // A leader should be known even if an agency state is invalid. - c.leaderID = leaderConfig.LeaderId if data, err := loadState(ctx, leaderCli); err != nil { c.valid = false return leaderConfig.CommitIndex, err @@ -194,7 +186,7 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er // getLeader returns config and client to a leader agency, and health to check if agencies are on the same page. // If there is no quorum for the leader then error is returned. -func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *agencyConfig, Health, error) { +func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *Config, Health, error) { var mutex sync.Mutex var anyError error var wg sync.WaitGroup @@ -203,10 +195,12 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag if cliLen == 0 { return nil, nil, nil, errors.New("empty list of agencies' clients") } - configs := make([]*agencyConfig, cliLen) - leaders := make(map[string]int) + configs := make([]*Config, cliLen) + leaders := make(map[string]int, cliLen) - h := make(health) + var h health + + h.commitIndexes = make(map[string]uint64, cliLen) // Fetch all configs from agencies. wg.Add(cliLen) for i, cli := range clients { @@ -215,7 +209,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag ctxLocal, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - config, err := getAgencyConfig(ctxLocal, cliLocal) + config, err := GetAgencyConfig(ctxLocal, cliLocal) mutex.Lock() defer mutex.Unlock() @@ -232,7 +226,7 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag configs[iLocal] = config // Count leaders. leaders[config.LeaderId]++ - h[config.Configuration.ID] = config.CommitIndex + h.commitIndexes[config.Configuration.ID] = config.CommitIndex }(i, cli) } wg.Wait() @@ -255,6 +249,8 @@ func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *ag } } + h.leaderID = leaderID + // Check if a leader has quorum from all possible agencies. if maxVotes <= cliLen/2 { message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen) diff --git a/pkg/deployment/agency/config.go b/pkg/deployment/agency/config.go index 7a84442b6..cda4cab73 100644 --- a/pkg/deployment/agency/config.go +++ b/pkg/deployment/agency/config.go @@ -29,10 +29,12 @@ import ( "github.com/arangodb/go-driver/agency" ) -func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig, error) { - conn := client.Connection() +func GetAgencyConfig(ctx context.Context, client agency.Agency) (*Config, error) { + return GetAgencyConfigC(ctx, client.Connection()) +} - req, err := client.Connection().NewRequest(http.MethodGet, "/_api/agency/config") +func GetAgencyConfigC(ctx context.Context, conn driver.Connection) (*Config, error) { + req, err := conn.NewRequest(http.MethodGet, "/_api/agency/config") if err != nil { return nil, err } @@ -48,7 +50,7 @@ func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig, return nil, err } - var c agencyConfig + var c Config if err := json.Unmarshal(data, &c); err != nil { return nil, err @@ -57,7 +59,7 @@ func getAgencyConfig(ctx context.Context, client agency.Agency) (*agencyConfig, return &c, nil } -type agencyConfig struct { +type Config struct { LeaderId string `json:"leaderId"` CommitIndex uint64 `json:"commitIndex"` diff --git a/pkg/deployment/agency/config_test.go b/pkg/deployment/agency/config_test.go index 00c98a596..f8c8be23d 100644 --- a/pkg/deployment/agency/config_test.go +++ b/pkg/deployment/agency/config_test.go @@ -67,7 +67,7 @@ func Test_Config_Unmarshal(t *testing.T) { "version": "3.10.0-devel" }` - var cfg agencyConfig + var cfg Config require.NoError(t, json.Unmarshal([]byte(data), &cfg)) diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index d8dc4bdf2..6c8b8d6b5 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -154,6 +154,10 @@ func (d *Deployment) GetAgencyCache() (agency.State, bool) { return d.agencyCache.Data() } +func (d *Deployment) GetAgencyHealth() (agency.Health, bool) { + return d.agencyCache.Health() +} + func (d *Deployment) RefreshAgencyCache(ctx context.Context) (uint64, error) { if d.apiObject.Spec.Mode.Get() == api.DeploymentModeSingle { return 0, nil diff --git a/pkg/deployment/deployment_inspector.go b/pkg/deployment/deployment_inspector.go index cd517ec56..ea94796df 100644 --- a/pkg/deployment/deployment_inspector.go +++ b/pkg/deployment/deployment_inspector.go @@ -188,6 +188,10 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva nextInterval = nextInterval.ReduceTo(x) } + if err := d.resources.EnsureLeader(ctx, d.GetCachedStatus()); err != nil { + return minInspectionInterval, errors.Wrapf(err, "Creating agency pod leader failed") + } + if err := d.resources.EnsureArangoMembers(ctx, d.GetCachedStatus()); err != nil { return minInspectionInterval, errors.Wrapf(err, "ArangoMember creation failed") } diff --git a/pkg/deployment/reconcile/action_context.go b/pkg/deployment/reconcile/action_context.go index 7f8e49e14..bbfd5d1ae 100644 --- a/pkg/deployment/reconcile/action_context.go +++ b/pkg/deployment/reconcile/action_context.go @@ -187,6 +187,10 @@ func (ac *actionContext) GenerateMemberEndpoint(group api.ServerGroup, member ap return ac.context.GenerateMemberEndpoint(group, member) } +func (ac *actionContext) GetAgencyHealth() (agencyCache.Health, bool) { + return ac.context.GetAgencyHealth() +} + func (ac *actionContext) GetAgencyCache() (agencyCache.State, bool) { return ac.context.GetAgencyCache() } diff --git a/pkg/deployment/reconcile/plan_builder_test.go b/pkg/deployment/reconcile/plan_builder_test.go index b5d36e4ba..a3df6d421 100644 --- a/pkg/deployment/reconcile/plan_builder_test.go +++ b/pkg/deployment/reconcile/plan_builder_test.go @@ -84,6 +84,11 @@ type testContext struct { Inspector inspectorInterface.Inspector } +func (c *testContext) GetAgencyHealth() (agencyCache.Health, bool) { + //TODO implement me + panic("implement me") +} + func (c *testContext) RenderPodForMember(ctx context.Context, acs sutil.ACS, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) { //TODO implement me panic("implement me") diff --git a/pkg/deployment/reconciler/context.go b/pkg/deployment/reconciler/context.go index d558a4277..ba9884220 100644 --- a/pkg/deployment/reconciler/context.go +++ b/pkg/deployment/reconciler/context.go @@ -124,6 +124,7 @@ type DeploymentCachedStatus interface { type ArangoAgencyGet interface { GetAgencyCache() (agencyCache.State, bool) + GetAgencyHealth() (agencyCache.Health, bool) } type ArangoAgency interface { diff --git a/pkg/deployment/resources/pod_leader.go b/pkg/deployment/resources/pod_leader.go new file mode 100644 index 000000000..32124457f --- /dev/null +++ b/pkg/deployment/resources/pod_leader.go @@ -0,0 +1,151 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package resources + +import ( + "context" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" + "github.com/arangodb/kube-arangodb/pkg/apis/shared" + "github.com/arangodb/kube-arangodb/pkg/deployment/patch" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/globals" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" +) + +// EnsureLeader creates leader label on the pod's agency and creates service to it. +// When agency leader is not known then all agencies' pods should not have leader label, and +// consequentially service will not point to any pod. +// It works only in active fail-over mode. +func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInterface.Inspector) error { + if r.context.GetSpec().GetMode() != api.DeploymentModeActiveFailover { + return nil + } + + cache, ok := r.context.GetAgencyHealth() + if !ok { + return nil + } + + leaderID := cache.LeaderID() + status, _ := r.context.GetStatus() + noLeader := len(leaderID) == 0 + changed := false + group := api.ServerGroupAgents + agencyServers := func(group api.ServerGroup, list api.MemberStatusList) error { + for _, m := range list { + pod, exist := cachedStatus.Pod().V1().GetSimple(m.PodName) + if !exist { + continue + } + + labels := pod.GetLabels() + if noLeader || m.ID != leaderID { + // Unset a leader when: + // - leader is unknown. + // - leader does not belong to the current pod. + + if _, ok := labels[k8sutil.LabelKeyArangoLeader]; ok { + delete(labels, k8sutil.LabelKeyArangoLeader) + + err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels)) + if err != nil { + r.log.Error().Err(err).Msgf("Unable to remove leader label") + return err + } + + r.log.Info().Msgf("leader label is removed from \"%s\" member", m.ID) + changed = true + } + + continue + } + + // From here on it is known that there is a leader, and it should be attached to the current pod. + if value, ok := labels[k8sutil.LabelKeyArangoLeader]; !ok { + labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true") + } else if value != "true" { + labels = addLabel(labels, k8sutil.LabelKeyArangoLeader, "true") + } else { + // A pod is already a leader, so nothing to change. + continue + } + + err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), labels)) + if err != nil { + r.log.Error().Err(err).Msgf("Unable to update leader label") + return err + } + r.log.Info().Msgf("leader label is set on \"%s\" member", m.ID) + changed = true + } + + return nil + } + + if err := status.Members.ForeachServerInGroups(agencyServers, group); err != nil { + return err + } + + if changed { + return errors.Reconcile() + } + changed = false + + if noLeader { + // There is no leader agency so service may not exist, or it can exist with empty list of endpoints. + return nil + } + + leaderAgentSvcName := k8sutil.CreateAgentLeaderServiceName(r.context.GetAPIObject().GetName()) + deploymentName := r.context.GetAPIObject().GetName() + + selector := k8sutil.LabelsForLeaderMember(deploymentName, group.AsRole(), leaderID) + if s, ok := cachedStatus.Service().V1().GetSimple(leaderAgentSvcName); ok { + if err, adjusted := r.adjustService(ctx, s, shared.ArangoPort, selector); err == nil { + if !adjusted { + // The service is not changed. + return nil + } + + return errors.Reconcile() + } else { + return err + } + } + + s := r.createService(leaderAgentSvcName, r.context.GetNamespace(), r.context.GetAPIObject().AsOwner(), shared.ArangoPort, selector) + err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error { + _, err := r.context.ServicesModInterface().Create(ctxChild, s, meta.CreateOptions{}) + return err + }) + if err != nil { + if !k8sutil.IsConflict(err) { + return err + } + } + + // The service has been created. + return errors.Reconcile() +} diff --git a/pkg/deployment/resources/services.go b/pkg/deployment/resources/services.go index 29aa3d4b5..013ac9f64 100644 --- a/pkg/deployment/resources/services.go +++ b/pkg/deployment/resources/services.go @@ -36,12 +36,13 @@ import ( core "k8s.io/api/core/v1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/rs/zerolog" + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/apis/shared" "github.com/arangodb/kube-arangodb/pkg/metrics" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" servicev1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service/v1" - "github.com/rs/zerolog" ) var ( @@ -49,8 +50,73 @@ var ( inspectServicesDurationGauges = metrics.MustRegisterGaugeVec(metricsComponent, "inspect_services_duration", "Amount of time taken by a single inspection of all Services for a deployment (in sec)", metrics.DeploymentName) ) +// createService returns service's object. +func (r *Resources) createService(name, namespace string, owner meta.OwnerReference, targetPort int32, + selector map[string]string) *core.Service { + + return &core.Service{ + ObjectMeta: meta.ObjectMeta{ + Name: name, + Namespace: namespace, + OwnerReferences: []meta.OwnerReference{ + owner, + }, + }, + Spec: core.ServiceSpec{ + Type: core.ServiceTypeClusterIP, + Ports: []core.ServicePort{ + { + Name: "server", + Protocol: "TCP", + Port: shared.ArangoPort, + TargetPort: intstr.IntOrString{IntVal: targetPort}, + }, + }, + PublishNotReadyAddresses: true, + Selector: selector, + }, + } +} + +// adjustService checks whether service contains is valid and if not than it reconciles service. +// Returns true if service is adjusted. +func (r *Resources) adjustService(ctx context.Context, s *core.Service, targetPort int32, selector map[string]string) (error, bool) { + services := r.context.ServicesModInterface() + spec := s.Spec.DeepCopy() + + spec.Type = core.ServiceTypeClusterIP + spec.Ports = []core.ServicePort{ + { + Name: "server", + Protocol: "TCP", + Port: shared.ArangoPort, + TargetPort: intstr.IntOrString{IntVal: targetPort}, + }, + } + spec.PublishNotReadyAddresses = true + spec.Selector = selector + if equality.Semantic.DeepDerivative(*spec, s.Spec) { + // The service has not changed, so nothing should be changed. + return nil, false + } + + s.Spec = *spec + err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error { + _, err := services.Update(ctxChild, s, meta.UpdateOptions{}) + return err + }) + if err != nil { + return err, false + } + + // The service has been changed. + return nil, true + +} + // EnsureServices creates all services needed to service the deployment func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorInterface.Inspector) error { + log := r.log start := time.Now() apiObject := r.context.GetAPIObject() @@ -85,29 +151,9 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn return errors.Newf("Member %s not found", memberName) } + selector := k8sutil.LabelsForMember(deploymentName, group.AsRole(), m.ID) if s, ok := cachedStatus.Service().V1().GetSimple(member.GetName()); !ok { - s = &core.Service{ - ObjectMeta: meta.ObjectMeta{ - Name: member.GetName(), - Namespace: member.GetNamespace(), - OwnerReferences: []meta.OwnerReference{ - member.AsOwner(), - }, - }, - Spec: core.ServiceSpec{ - Type: core.ServiceTypeClusterIP, - Ports: []core.ServicePort{ - { - Name: "server", - Protocol: "TCP", - Port: shared.ArangoPort, - TargetPort: intstr.IntOrString{IntVal: targetPort}, - }, - }, - PublishNotReadyAddresses: true, - Selector: k8sutil.LabelsForMember(deploymentName, group.AsRole(), m.ID), - }, - } + s := r.createService(member.GetName(), member.GetNamespace(), member.AsOwner(), targetPort, selector) err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error { _, err := svcs.Create(ctxChild, s, meta.CreateOptions{}) @@ -122,33 +168,13 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn reconcileRequired.Required() continue } else { - spec := s.Spec.DeepCopy() - - spec.Type = core.ServiceTypeClusterIP - spec.Ports = []core.ServicePort{ - { - Name: "server", - Protocol: "TCP", - Port: shared.ArangoPort, - TargetPort: intstr.IntOrString{IntVal: targetPort}, - }, - } - spec.PublishNotReadyAddresses = true - spec.Selector = k8sutil.LabelsForMember(deploymentName, group.AsRole(), m.ID) - - if !equality.Semantic.DeepDerivative(*spec, s.Spec) { - s.Spec = *spec - - err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error { - _, err := svcs.Update(ctxChild, s, meta.UpdateOptions{}) - return err - }) - if err != nil { - return err + if err, adjusted := r.adjustService(ctx, s, targetPort, selector); err == nil { + if adjusted { + reconcileRequired.Required() } - - reconcileRequired.Required() - continue + // Continue the loop. + } else { + return err } } } diff --git a/pkg/util/k8sutil/services.go b/pkg/util/k8sutil/services.go index 36f85b99a..a89fd0537 100644 --- a/pkg/util/k8sutil/services.go +++ b/pkg/util/k8sutil/services.go @@ -27,14 +27,13 @@ import ( "strconv" "strings" - "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service" - - "github.com/arangodb/kube-arangodb/pkg/util/errors" - - "github.com/arangodb/kube-arangodb/pkg/apis/shared" - servicev1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service/v1" core "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/arangodb/kube-arangodb/pkg/apis/shared" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service" + servicev1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service/v1" ) // CreateHeadlessServiceName returns the name of the headless service for the given @@ -67,6 +66,11 @@ func CreateExporterClientServiceName(deploymentName string) string { return deploymentName + "-exporter" } +// CreateAgentLeaderServiceName returns the name of the service used to access a leader agent. +func CreateAgentLeaderServiceName(deploymentName string) string { + return deploymentName + "-agent" +} + // CreateExporterService func CreateExporterService(ctx context.Context, cachedStatus service.Inspector, svcs servicev1.ModInterface, deployment metav1.Object, owner metav1.OwnerReference) (string, bool, error) { diff --git a/pkg/util/k8sutil/util.go b/pkg/util/k8sutil/util.go index 13ce432e3..b3a30dd78 100644 --- a/pkg/util/k8sutil/util.go +++ b/pkg/util/k8sutil/util.go @@ -44,7 +44,8 @@ const ( LabelKeyArangoScheduled = "deployment.arangodb.com/scheduled" // LabelKeyArangoTopology is the key of the label used to store the ArangoDeployment topology ID in LabelKeyArangoTopology = "deployment.arangodb.com/topology" - + // LabelKeyArangoLeader is the key of the label used to store the current leader of a group instances. + LabelKeyArangoLeader = "deployment.arangodb.com/leader" // AppName is the fixed value for the "app" label AppName = "arangodb" ) @@ -98,6 +99,14 @@ func LabelsForMember(deploymentName, role, id string) map[string]string { return l } +// LabelsForLeaderMember returns a map of labels for given deployment name and member id and role and leadership. +func LabelsForLeaderMember(deploymentName, role, id string) map[string]string { + l := LabelsForMember(deploymentName, role, id) + l[LabelKeyArangoLeader] = "true" + + return l +} + // LabelsForDeployment returns a map of labels, given to all resources for given deployment name func LabelsForDeployment(deploymentName, role string) map[string]string { l := map[string]string{