mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
[Bugfix] Fix ArangoMember management (#760)
This commit is contained in:
parent
ddcd758bdc
commit
2879d36a79
7 changed files with 158 additions and 46 deletions
|
@ -1,6 +1,7 @@
|
|||
# Change Log
|
||||
|
||||
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
|
||||
- Fix ArangoMember race with multiple ArangoDeployments within single namespace
|
||||
|
||||
## [1.2.0](https://github.com/arangodb/kube-arangodb/tree/1.2.0) (2021-07-16)
|
||||
- Enable "Operator Internal Metrics Exporter" by default
|
||||
|
|
|
@ -22,11 +22,15 @@
|
|||
|
||||
package v1
|
||||
|
||||
import core "k8s.io/api/core/v1"
|
||||
import (
|
||||
core "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
type ArangoMemberSpec struct {
|
||||
Group ServerGroup `json:"group,omitempty"`
|
||||
ID string `json:"id,omitempty"`
|
||||
Group ServerGroup `json:"group,omitempty"`
|
||||
ID string `json:"id,omitempty"`
|
||||
DeploymentUID types.UID `json:"deploymentUID,omitempty"`
|
||||
|
||||
Template *core.PodTemplate `json:"template,omitempty"`
|
||||
TemplateChecksum string `json:"templateChecksum,omitempty"`
|
||||
|
|
|
@ -27,6 +27,8 @@ import (
|
|||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember"
|
||||
|
||||
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
|
@ -171,20 +173,26 @@ func (r *Resources) EnsureArangoMembers(ctx context.Context, cachedStatus inspec
|
|||
s, _ := r.context.GetStatus()
|
||||
obj := r.context.GetAPIObject()
|
||||
|
||||
reconcileRequired := k8sutil.NewReconcile()
|
||||
|
||||
if err := s.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
|
||||
for _, member := range list {
|
||||
name := member.ArangoMemberName(r.context.GetAPIObject().GetName(), group)
|
||||
|
||||
if _, ok := cachedStatus.ArangoMember(name); !ok {
|
||||
if m, ok := cachedStatus.ArangoMember(name); !ok {
|
||||
// Create ArangoMember
|
||||
a := api.ArangoMember{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: r.context.GetNamespace(),
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
obj.AsOwner(),
|
||||
},
|
||||
},
|
||||
Spec: api.ArangoMemberSpec{
|
||||
Group: group,
|
||||
ID: member.ID,
|
||||
Group: group,
|
||||
ID: member.ID,
|
||||
DeploymentUID: obj.GetUID(),
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -196,7 +204,34 @@ func (r *Resources) EnsureArangoMembers(ctx context.Context, cachedStatus inspec
|
|||
return err
|
||||
}
|
||||
|
||||
return errors.Reconcile()
|
||||
reconcileRequired.Required()
|
||||
continue
|
||||
} else {
|
||||
changed := false
|
||||
if len(m.OwnerReferences) == 0 {
|
||||
m.OwnerReferences = []metav1.OwnerReference{
|
||||
obj.AsOwner(),
|
||||
}
|
||||
changed = true
|
||||
}
|
||||
|
||||
if m.Spec.DeploymentUID == "" {
|
||||
m.Spec.DeploymentUID = obj.GetUID()
|
||||
changed = true
|
||||
}
|
||||
if changed {
|
||||
|
||||
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
|
||||
_, err := r.context.GetArangoCli().DatabaseV1().ArangoMembers(obj.GetNamespace()).Update(ctxChild, m, metav1.UpdateOptions{})
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reconcileRequired.Required()
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -205,6 +240,10 @@ func (r *Resources) EnsureArangoMembers(ctx context.Context, cachedStatus inspec
|
|||
return err
|
||||
}
|
||||
|
||||
if err := reconcileRequired.Reconcile(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cachedStatus.IterateArangoMembers(func(member *api.ArangoMember) error {
|
||||
_, g, ok := s.Members.ElementByID(member.Spec.ID)
|
||||
|
||||
|
@ -220,11 +259,15 @@ func (r *Resources) EnsureArangoMembers(ctx context.Context, cachedStatus inspec
|
|||
}
|
||||
}
|
||||
|
||||
return errors.Reconcile()
|
||||
reconcileRequired.Required()
|
||||
}
|
||||
|
||||
return nil
|
||||
}); err != nil {
|
||||
}, arangomember.FilterByDeploymentUID(obj.GetUID())); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := reconcileRequired.Reconcile(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -87,9 +87,11 @@ func (r *Resources) EnsureSecrets(ctx context.Context, log zerolog.Logger, cache
|
|||
defer metrics.SetDuration(inspectSecretsDurationGauges.WithLabelValues(deploymentName), start)
|
||||
counterMetric := inspectedSecretsCounters.WithLabelValues(deploymentName)
|
||||
|
||||
reconcileRequired := k8sutil.NewReconcile()
|
||||
|
||||
if spec.IsAuthenticated() {
|
||||
counterMetric.Inc()
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureTokenSecret(ctx, cachedStatus, secrets, spec.Authentication.GetJWTSecretName())); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureTokenSecret(ctx, cachedStatus, secrets, spec.Authentication.GetJWTSecretName())); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -103,11 +105,11 @@ func (r *Resources) EnsureSecrets(ctx context.Context, log zerolog.Logger, cache
|
|||
|
||||
if spec.Metrics.IsEnabled() {
|
||||
if imageFound && pod.VersionHasJWTSecretKeyfolder(image.ArangoDBVersion, image.Enterprise) {
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureExporterTokenSecret(ctx, cachedStatus, secrets, spec.Metrics.GetJWTTokenSecretName(), pod.JWTSecretFolder(deploymentName))); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureExporterTokenSecret(ctx, cachedStatus, secrets, spec.Metrics.GetJWTTokenSecretName(), pod.JWTSecretFolder(deploymentName))); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
} else {
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureExporterTokenSecret(ctx, cachedStatus, secrets, spec.Metrics.GetJWTTokenSecretName(), spec.Authentication.GetJWTSecretName())); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureExporterTokenSecret(ctx, cachedStatus, secrets, spec.Metrics.GetJWTTokenSecretName(), spec.Authentication.GetJWTSecretName())); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
|
@ -115,11 +117,11 @@ func (r *Resources) EnsureSecrets(ctx context.Context, log zerolog.Logger, cache
|
|||
}
|
||||
if spec.IsSecure() {
|
||||
counterMetric.Inc()
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureTLSCACertificateSecret(ctx, cachedStatus, secrets, spec.TLS)); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureTLSCACertificateSecret(ctx, cachedStatus, secrets, spec.TLS)); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureSecretWithEmptyKey(ctx, cachedStatus, secrets, GetCASecretName(r.context.GetAPIObject()), "empty")); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureSecretWithEmptyKey(ctx, cachedStatus, secrets, GetCASecretName(r.context.GetAPIObject()), "empty")); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
|
@ -165,13 +167,9 @@ func (r *Resources) EnsureSecrets(ctx context.Context, log zerolog.Logger, cache
|
|||
}
|
||||
owner := member.AsOwner()
|
||||
errCert := createTLSServerCertificate(ctx, log, secrets, serverNames, spec.TLS, tlsKeyfileSecretName, &owner)
|
||||
if err := r.refreshCache(ctx, cachedStatus, errCert); err != nil && !k8sutil.IsAlreadyExists(err) {
|
||||
if err := reconcileRequired.WithError(errCert); err != nil && !k8sutil.IsAlreadyExists(err) {
|
||||
return errors.WithStack(errors.Wrapf(err, "Failed to create TLS keyfile secret"))
|
||||
}
|
||||
|
||||
if err := r.refreshCache(ctx, cachedStatus, operatorErrors.Reconcile()); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
@ -181,47 +179,30 @@ func (r *Resources) EnsureSecrets(ctx context.Context, log zerolog.Logger, cache
|
|||
}
|
||||
if spec.RocksDB.IsEncrypted() {
|
||||
if i := status.CurrentImage; i != nil && features.EncryptionRotation().Supported(i.ArangoDBVersion, i.Enterprise) {
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureEncryptionKeyfolderSecret(ctx, cachedStatus, secrets, spec.RocksDB.Encryption.GetKeySecretName(), pod.GetEncryptionFolderSecretName(deploymentName))); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureEncryptionKeyfolderSecret(ctx, cachedStatus, secrets, spec.RocksDB.Encryption.GetKeySecretName(), pod.GetEncryptionFolderSecretName(deploymentName))); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
if spec.Sync.IsEnabled() {
|
||||
counterMetric.Inc()
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureTokenSecret(ctx, cachedStatus, secrets, spec.Sync.Authentication.GetJWTSecretName())); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureTokenSecret(ctx, cachedStatus, secrets, spec.Sync.Authentication.GetJWTSecretName())); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
counterMetric.Inc()
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureTokenSecret(ctx, cachedStatus, secrets, spec.Sync.Monitoring.GetTokenSecretName())); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureTokenSecret(ctx, cachedStatus, secrets, spec.Sync.Monitoring.GetTokenSecretName())); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
counterMetric.Inc()
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureTLSCACertificateSecret(ctx, cachedStatus, secrets, spec.Sync.TLS)); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureTLSCACertificateSecret(ctx, cachedStatus, secrets, spec.Sync.TLS)); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
counterMetric.Inc()
|
||||
if err := r.refreshCache(ctx, cachedStatus, r.ensureClientAuthCACertificateSecret(ctx, cachedStatus, secrets, spec.Sync.Authentication)); err != nil {
|
||||
if err := reconcileRequired.WithError(r.ensureClientAuthCACertificateSecret(ctx, cachedStatus, secrets, spec.Sync.Authentication)); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Resources) refreshCache(ctx context.Context, cachedStatus inspectorInterface.Inspector, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if operatorErrors.IsReconcile(err) {
|
||||
err := cachedStatus.Refresh(ctx, r.context.GetKubeCli(), r.context.GetMonitoringV1Cli(), r.context.GetArangoCli(), r.context.GetNamespace())
|
||||
if err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
} else {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
return reconcileRequired.Reconcile()
|
||||
}
|
||||
|
||||
func (r *Resources) ensureTokenSecretFolder(ctx context.Context, cachedStatus inspectorInterface.Inspector, secrets k8sutil.SecretInterface, secretName, folderSecretName string) error {
|
||||
|
|
|
@ -65,6 +65,8 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
|
|||
// Fetch existing services
|
||||
svcs := kubecli.CoreV1().Services(ns)
|
||||
|
||||
reconcileRequired := k8sutil.NewReconcile()
|
||||
|
||||
// Ensure member services
|
||||
if err := status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
|
||||
for _, m := range list {
|
||||
|
@ -109,7 +111,8 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
|
|||
}
|
||||
}
|
||||
|
||||
return errors.Reconcile()
|
||||
reconcileRequired.Required()
|
||||
continue
|
||||
} else {
|
||||
spec := s.Spec.DeepCopy()
|
||||
|
||||
|
@ -136,7 +139,8 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
|
|||
return err
|
||||
}
|
||||
|
||||
return errors.Reconcile()
|
||||
reconcileRequired.Required()
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -229,7 +233,8 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
|
|||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
|
||||
return reconcileRequired.Reconcile()
|
||||
}
|
||||
|
||||
// EnsureServices creates all services needed to service the deployment
|
||||
|
|
|
@ -22,7 +22,10 @@
|
|||
|
||||
package arangomember
|
||||
|
||||
import api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
import (
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
)
|
||||
|
||||
type Inspector interface {
|
||||
ArangoMember(name string) (*api.ArangoMember, bool)
|
||||
|
@ -31,3 +34,9 @@ type Inspector interface {
|
|||
|
||||
type ArangoMemberFilter func(pod *api.ArangoMember) bool
|
||||
type ArangoMemberAction func(pod *api.ArangoMember) error
|
||||
|
||||
func FilterByDeploymentUID(uid types.UID) ArangoMemberFilter {
|
||||
return func(pod *api.ArangoMember) bool {
|
||||
return pod.Spec.DeploymentUID == "" || pod.Spec.DeploymentUID == uid
|
||||
}
|
||||
}
|
||||
|
|
69
pkg/util/k8sutil/reconcile.go
Normal file
69
pkg/util/k8sutil/reconcile.go
Normal file
|
@ -0,0 +1,69 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2021 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Author Adam Janikowski
|
||||
//
|
||||
|
||||
package k8sutil
|
||||
|
||||
import "github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
|
||||
func NewReconcile() Reconcile {
|
||||
return &reconcile{}
|
||||
}
|
||||
|
||||
type Reconcile interface {
|
||||
Reconcile() error
|
||||
Required()
|
||||
IsRequired() bool
|
||||
WithError(err error) error
|
||||
}
|
||||
|
||||
type reconcile struct {
|
||||
required bool
|
||||
}
|
||||
|
||||
func (r *reconcile) Reconcile() error {
|
||||
if r.required {
|
||||
return errors.Reconcile()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *reconcile) Required() {
|
||||
r.required = true
|
||||
}
|
||||
|
||||
func (r *reconcile) IsRequired() bool {
|
||||
return r.required
|
||||
}
|
||||
|
||||
func (r *reconcile) WithError(err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if errors.IsReconcile(err) {
|
||||
r.Required()
|
||||
return nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
Loading…
Reference in a new issue