mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
[Feature] Ensure consistency during cancellation of replication (#1134)
This commit is contained in:
parent
1c913aa2b9
commit
7b4c926011
12 changed files with 363 additions and 86 deletions
|
@ -2,6 +2,7 @@
|
|||
|
||||
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
|
||||
- (Feature) Add action progress
|
||||
- (Feature) Ensure consistency during replication cancellation
|
||||
|
||||
## [1.2.19](https://github.com/arangodb/kube-arangodb/tree/1.2.19) (2022-10-05)
|
||||
- (Bugfix) Prevent changes when UID is wrong
|
||||
|
|
|
@ -31,6 +31,17 @@ type ConditionType string
|
|||
const (
|
||||
// ConditionTypeConfigured indicates that the replication has been configured.
|
||||
ConditionTypeConfigured ConditionType = "Configured"
|
||||
// ConditionTypeEnsuredInSync indicates that the replication consistency was checked.
|
||||
ConditionTypeEnsuredInSync ConditionType = "EnsuredInSync"
|
||||
// ConditionTypeAborted indicates that the replication was canceled with abort option.
|
||||
ConditionTypeAborted ConditionType = "Aborted"
|
||||
|
||||
// ConditionConfiguredReasonActive describes synchronization as active.
|
||||
ConditionConfiguredReasonActive = "Active"
|
||||
// ConditionConfiguredReasonInactive describes synchronization as inactive.
|
||||
ConditionConfiguredReasonInactive = "Inactive"
|
||||
// ConditionConfiguredReasonInvalid describes synchronization as active.
|
||||
ConditionConfiguredReasonInvalid = "Invalid"
|
||||
)
|
||||
|
||||
// Condition represents one current condition of a deployment or deployment member.
|
||||
|
|
|
@ -27,6 +27,18 @@ import "github.com/arangodb/kube-arangodb/pkg/util/errors"
|
|||
type DeploymentReplicationSpec struct {
|
||||
Source EndpointSpec `json:"source"`
|
||||
Destination EndpointSpec `json:"destination"`
|
||||
// Cancellation describes what to do during cancellation process.
|
||||
Cancellation DeploymentReplicationCancel `json:"cancellation"`
|
||||
}
|
||||
|
||||
// DeploymentReplicationCancel describes what to do during cancellation process.
|
||||
type DeploymentReplicationCancel struct {
|
||||
// EnsureInSync if it is true then during cancellation process data consistency is required.
|
||||
// Default value is true.
|
||||
EnsureInSync *bool `json:"ensureInSync"`
|
||||
// SourceReadOnly if it true then after cancellation source data center should be in read-only mode.
|
||||
// Default value is false.
|
||||
SourceReadOnly *bool `json:"sourceReadOnly"`
|
||||
}
|
||||
|
||||
// Validate the given spec, returning an error on validation
|
||||
|
|
|
@ -36,6 +36,7 @@ type DeploymentReplicationStatus struct {
|
|||
// Destination contains the detailed status of the destination endpoint
|
||||
Destination EndpointStatus `json:"destination"`
|
||||
|
||||
// Deprecated: this field will not be updated anymore
|
||||
// CancelFailures records the number of times that the configuration was canceled
|
||||
// which resulted in an error.
|
||||
CancelFailures int `json:"cancel-failures,omitempty"`
|
||||
|
|
27
pkg/apis/replication/v1/zz_generated.deepcopy.go
generated
27
pkg/apis/replication/v1/zz_generated.deepcopy.go
generated
|
@ -211,11 +211,38 @@ func (in *DatabaseSynchronizationStatus) DeepCopy() *DatabaseSynchronizationStat
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DeploymentReplicationCancel) DeepCopyInto(out *DeploymentReplicationCancel) {
|
||||
*out = *in
|
||||
if in.EnsureInSync != nil {
|
||||
in, out := &in.EnsureInSync, &out.EnsureInSync
|
||||
*out = new(bool)
|
||||
**out = **in
|
||||
}
|
||||
if in.SourceReadOnly != nil {
|
||||
in, out := &in.SourceReadOnly, &out.SourceReadOnly
|
||||
*out = new(bool)
|
||||
**out = **in
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentReplicationCancel.
|
||||
func (in *DeploymentReplicationCancel) DeepCopy() *DeploymentReplicationCancel {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(DeploymentReplicationCancel)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DeploymentReplicationSpec) DeepCopyInto(out *DeploymentReplicationSpec) {
|
||||
*out = *in
|
||||
in.Source.DeepCopyInto(&out.Source)
|
||||
in.Destination.DeepCopyInto(&out.Destination)
|
||||
in.Cancellation.DeepCopyInto(&out.Cancellation)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,17 @@ type ConditionType string
|
|||
const (
|
||||
// ConditionTypeConfigured indicates that the replication has been configured.
|
||||
ConditionTypeConfigured ConditionType = "Configured"
|
||||
// ConditionTypeEnsuredInSync indicates that the replication consistency was checked.
|
||||
ConditionTypeEnsuredInSync ConditionType = "EnsuredInSync"
|
||||
// ConditionTypeAborted indicates that the replication is cancelling with abort option.
|
||||
ConditionTypeAborted ConditionType = "Aborted"
|
||||
|
||||
// ConditionConfiguredReasonActive describes synchronization as active.
|
||||
ConditionConfiguredReasonActive = "Active"
|
||||
// ConditionConfiguredReasonInactive describes synchronization as inactive.
|
||||
ConditionConfiguredReasonInactive = "Inactive"
|
||||
// ConditionConfiguredReasonInvalid describes synchronization as active.
|
||||
ConditionConfiguredReasonInvalid = "Invalid"
|
||||
)
|
||||
|
||||
// Condition represents one current condition of a deployment or deployment member.
|
||||
|
|
|
@ -27,6 +27,18 @@ import "github.com/arangodb/kube-arangodb/pkg/util/errors"
|
|||
type DeploymentReplicationSpec struct {
|
||||
Source EndpointSpec `json:"source"`
|
||||
Destination EndpointSpec `json:"destination"`
|
||||
// Cancellation describes what to do during cancellation process.
|
||||
Cancellation DeploymentReplicationCancel `json:"cancellation"`
|
||||
}
|
||||
|
||||
// DeploymentReplicationCancel describes what to do during cancellation process.
|
||||
type DeploymentReplicationCancel struct {
|
||||
// EnsureInSync if it is true then during cancellation process data consistency is required.
|
||||
// Default value is true.
|
||||
EnsureInSync *bool `json:"ensureInSync"`
|
||||
// SourceReadOnly if it true then after cancellation source data center should be in read-only mode.
|
||||
// Default value is false.
|
||||
SourceReadOnly *bool `json:"sourceReadOnly"`
|
||||
}
|
||||
|
||||
// Validate the given spec, returning an error on validation
|
||||
|
|
|
@ -36,6 +36,7 @@ type DeploymentReplicationStatus struct {
|
|||
// Destination contains the detailed status of the destination endpoint
|
||||
Destination EndpointStatus `json:"destination"`
|
||||
|
||||
// Deprecated: this field will not be updated anymore
|
||||
// CancelFailures records the number of times that the configuration was canceled
|
||||
// which resulted in an error.
|
||||
CancelFailures int `json:"cancel-failures,omitempty"`
|
||||
|
|
|
@ -211,11 +211,38 @@ func (in *DatabaseSynchronizationStatus) DeepCopy() *DatabaseSynchronizationStat
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DeploymentReplicationCancel) DeepCopyInto(out *DeploymentReplicationCancel) {
|
||||
*out = *in
|
||||
if in.EnsureInSync != nil {
|
||||
in, out := &in.EnsureInSync, &out.EnsureInSync
|
||||
*out = new(bool)
|
||||
**out = **in
|
||||
}
|
||||
if in.SourceReadOnly != nil {
|
||||
in, out := &in.SourceReadOnly, &out.SourceReadOnly
|
||||
*out = new(bool)
|
||||
**out = **in
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DeploymentReplicationCancel.
|
||||
func (in *DeploymentReplicationCancel) DeepCopy() *DeploymentReplicationCancel {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(DeploymentReplicationCancel)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DeploymentReplicationSpec) DeepCopyInto(out *DeploymentReplicationSpec) {
|
||||
*out = *in
|
||||
in.Source.DeepCopyInto(&out.Source)
|
||||
in.Destination.DeepCopyInto(&out.Destination)
|
||||
in.Cancellation.DeepCopyInto(&out.Cancellation)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -22,6 +22,7 @@ package replication
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
@ -72,11 +73,13 @@ const (
|
|||
deploymentReplicationEventQueueSize = 100
|
||||
minInspectionInterval = time.Second // Ensure we inspect the generated resources no less than with this interval
|
||||
maxInspectionInterval = time.Minute // Ensure we inspect the generated resources no less than with this interval
|
||||
cancellationInterval = time.Second * 5
|
||||
)
|
||||
|
||||
// DeploymentReplication is the in process state of an ArangoDeploymentReplication.
|
||||
type DeploymentReplication struct {
|
||||
log logging.Logger
|
||||
lastLog time.Time
|
||||
apiObject *api.ArangoDeploymentReplication // API object
|
||||
status api.DeploymentReplicationStatus // Internal status of the CR
|
||||
config Config
|
||||
|
@ -246,8 +249,14 @@ func (dr *DeploymentReplication) createEvent(evt *k8sutil.Event) {
|
|||
dr.deps.EventRecorder.Event(evt.InvolvedObject, evt.Type, evt.Reason, evt.Message)
|
||||
}
|
||||
|
||||
// Update the status of the API object from the internal status
|
||||
// Update the status of the API object from the internal status.
|
||||
// Has no effect if object is being deleted.
|
||||
func (dr *DeploymentReplication) updateCRStatus() error {
|
||||
if dr.apiObject.DeletionTimestamp != nil {
|
||||
// Object is being removed so nothing can be changed in the resource.
|
||||
// The field DeploymentReplication.status is updated automatically here.
|
||||
return nil
|
||||
}
|
||||
if reflect.DeepEqual(dr.apiObject.Status, dr.status) {
|
||||
// Nothing has changed
|
||||
return nil
|
||||
|
@ -323,8 +332,13 @@ func (dr *DeploymentReplication) updateCRSpec(newSpec api.DeploymentReplicationS
|
|||
|
||||
// failOnError reports the given error and sets the deployment replication status to failed.
|
||||
func (dr *DeploymentReplication) failOnError(err error, msg string) {
|
||||
dr.log.Err(err).Error(msg)
|
||||
dr.status.Reason = err.Error()
|
||||
if err != nil {
|
||||
dr.log.Err(err).Error(msg)
|
||||
dr.status.Reason = fmt.Sprintf("%s: %s", msg, err.Error())
|
||||
} else {
|
||||
dr.log.Error(msg)
|
||||
dr.status.Reason = msg
|
||||
}
|
||||
dr.reportFailedStatus()
|
||||
}
|
||||
|
||||
|
|
|
@ -22,80 +22,83 @@ package replication
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
"github.com/arangodb/arangosync-client/client"
|
||||
"github.com/arangodb/go-driver"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/constants"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
|
||||
)
|
||||
|
||||
const (
|
||||
maxCancelFailures = 5 // After this amount of failed cancel-synchronization attempts, the operator switch to abort-sychronization.
|
||||
CancellationTimeout = time.Minute * 15
|
||||
AbortTimeout = time.Minute * 2
|
||||
)
|
||||
|
||||
// addFinalizers adds a stop-sync finalizer to the api object when needed.
|
||||
func (dr *DeploymentReplication) addFinalizers() error {
|
||||
apiObject := dr.apiObject
|
||||
if apiObject.GetDeletionTimestamp() != nil {
|
||||
// addFinalizer adds new finalizer if it does not exist.
|
||||
func (dr *DeploymentReplication) addFinalizer(finalizer string) error {
|
||||
if dr.apiObject.GetDeletionTimestamp() != nil {
|
||||
// Delete already triggered, cannot add.
|
||||
return nil
|
||||
}
|
||||
for _, f := range apiObject.GetFinalizers() {
|
||||
if f == constants.FinalizerDeplReplStopSync {
|
||||
// Finalizer already added
|
||||
return nil
|
||||
apiObject := dr.apiObject
|
||||
|
||||
if !finalizerExists(apiObject, finalizer) {
|
||||
apiObject.SetFinalizers(append(apiObject.GetFinalizers(), finalizer))
|
||||
if err := dr.updateCRSpec(apiObject.Spec); err != nil {
|
||||
return errors.WithMessage(err, "Failed to update CR Spec")
|
||||
}
|
||||
}
|
||||
apiObject.SetFinalizers(append(apiObject.GetFinalizers(), constants.FinalizerDeplReplStopSync))
|
||||
if err := dr.updateCRSpec(apiObject.Spec); err != nil {
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// runFinalizers goes through the list of ArangoDeploymentReplication finalizers to see if they can be removed.
|
||||
func (dr *DeploymentReplication) runFinalizers(ctx context.Context, p *api.ArangoDeploymentReplication) error {
|
||||
log := dr.log.Str("replication-name", p.GetName())
|
||||
var removalList []string
|
||||
for _, f := range p.ObjectMeta.GetFinalizers() {
|
||||
switch f {
|
||||
case constants.FinalizerDeplReplStopSync:
|
||||
log.Debug("Inspecting stop-sync finalizer")
|
||||
if err := dr.inspectFinalizerDeplReplStopSync(ctx, p); err == nil {
|
||||
removalList = append(removalList, f)
|
||||
} else {
|
||||
log.Err(err).Str("finalizer", f).Debug("Cannot remove finalizer yet")
|
||||
}
|
||||
}
|
||||
}
|
||||
// Remove finalizers (if needed)
|
||||
if len(removalList) > 0 {
|
||||
ignoreNotFound := false
|
||||
if err := removeDeploymentReplicationFinalizers(dr.deps.Client.Arango(), p, removalList, ignoreNotFound); err != nil {
|
||||
log.Err(err).Debug("Failed to update deployment replication (to remove finalizers)")
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
// addFinalizers adds a required finalizers to the api object when needed.
|
||||
func (dr *DeploymentReplication) addFinalizers() error {
|
||||
// Add stop sync replication finalizer automatically.
|
||||
return dr.addFinalizer(constants.FinalizerDeplReplStopSync)
|
||||
}
|
||||
|
||||
// inspectFinalizerDeplReplStopSync checks the finalizer condition for stop-sync.
|
||||
// It returns nil if the finalizer can be removed.
|
||||
func (dr *DeploymentReplication) inspectFinalizerDeplReplStopSync(ctx context.Context, p *api.ArangoDeploymentReplication) error {
|
||||
// Inspect phase
|
||||
if p.Status.Phase.IsFailed() {
|
||||
dr.log.Debug("Deployment replication is already failed, safe to remove stop-sync finalizer")
|
||||
return nil
|
||||
// runFinalizers removes stop sync finalizer if it is possible.
|
||||
func (dr *DeploymentReplication) runFinalizers(ctx context.Context, p *api.ArangoDeploymentReplication) (bool, error) {
|
||||
if !finalizerExists(p, constants.FinalizerDeplReplStopSync) {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Inspect deployment deletion state in source
|
||||
abort := dr.status.CancelFailures > maxCancelFailures
|
||||
dr.log.Str("replication-name", p.GetName()).Debug("Inspecting stop-sync finalizer")
|
||||
if retrySoon, err := dr.inspectFinalizerDeplReplStopSync(ctx, p); err != nil {
|
||||
return true, errors.WithMessagef(err, "Cannot remove finalizer \"%s\" yet", constants.FinalizerDeplReplStopSync)
|
||||
} else if retrySoon {
|
||||
// No error, but not finished. Try to reconcile soon.
|
||||
dr.log.Debug("Synchronization is still cancelling")
|
||||
return true, nil
|
||||
}
|
||||
|
||||
removalList := []string{constants.FinalizerDeplReplStopSync}
|
||||
if err := removeDeploymentReplicationFinalizers(dr.deps.Client.Arango(), p, removalList, false); err != nil {
|
||||
return true, errors.WithMessage(err, "Failed to update deployment replication (to remove finalizers)")
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// inspectFinalizerDeplReplStopSync checks cancellation progress.
|
||||
// When true is returned then function can be called after a few seconds to check progress.
|
||||
// When it returns false and nil error then cancellation process is done.
|
||||
func (dr *DeploymentReplication) inspectFinalizerDeplReplStopSync(ctx context.Context,
|
||||
p *api.ArangoDeploymentReplication) (bool, error) {
|
||||
|
||||
abort := isTimeExceeded(p.GetDeletionTimestamp(), CancellationTimeout)
|
||||
// Inspect deployment deletion state in source.
|
||||
depls := dr.deps.Client.Arango().DatabaseV1().ArangoDeployments(p.GetNamespace())
|
||||
if name := p.Spec.Source.GetDeploymentName(); name != "" {
|
||||
depl, err := depls.Get(context.Background(), name, meta.GetOptions{})
|
||||
|
@ -104,7 +107,7 @@ func (dr *DeploymentReplication) inspectFinalizerDeplReplStopSync(ctx context.Co
|
|||
abort = true
|
||||
} else if err != nil {
|
||||
dr.log.Err(err).Warn("Failed to get source deployment")
|
||||
return errors.WithStack(err)
|
||||
return false, errors.WithStack(err)
|
||||
} else if depl.GetDeletionTimestamp() != nil {
|
||||
dr.log.Debug("Source deployment is being deleted. Abort enabled")
|
||||
abort = true
|
||||
|
@ -119,8 +122,8 @@ func (dr *DeploymentReplication) inspectFinalizerDeplReplStopSync(ctx context.Co
|
|||
dr.log.Debug("Destination deployment is gone. Source cleanup enabled")
|
||||
cleanupSource = true
|
||||
} else if err != nil {
|
||||
dr.log.Err(err).Warn("Failed to get destinaton deployment")
|
||||
return errors.WithStack(err)
|
||||
dr.log.Err(err).Warn("Failed to get destination deployment")
|
||||
return false, errors.WithStack(err)
|
||||
} else if depl.GetDeletionTimestamp() != nil {
|
||||
dr.log.Debug("Destination deployment is being deleted. Source cleanup enabled")
|
||||
cleanupSource = true
|
||||
|
@ -136,31 +139,96 @@ func (dr *DeploymentReplication) inspectFinalizerDeplReplStopSync(ctx context.Co
|
|||
return errors.WithStack(err)
|
||||
}*/
|
||||
//sourceClient.Master().C
|
||||
return errors.WithStack(errors.Newf("TODO"))
|
||||
} else {
|
||||
// Destination still exists, stop/abort sync
|
||||
destClient, err := dr.createSyncMasterClient(p.Spec.Destination)
|
||||
if err != nil {
|
||||
dr.log.Err(err).Warn("Failed to create destination client")
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
req := client.CancelSynchronizationRequest{
|
||||
WaitTimeout: time.Minute * 3,
|
||||
Force: abort,
|
||||
ForceTimeout: time.Minute * 2,
|
||||
}
|
||||
dr.log.Bool("abort", abort).Debug("Stopping synchronization...")
|
||||
_, err = destClient.Master().CancelSynchronization(ctx, req)
|
||||
if err != nil && !client.IsPreconditionFailed(err) {
|
||||
dr.log.Err(err).Bool("abort", abort).Warn("Failed to stop synchronization")
|
||||
dr.status.CancelFailures++
|
||||
if err := dr.updateCRStatus(); err != nil {
|
||||
dr.log.Err(err).Warn("Failed to update status to reflect cancel-failures increment")
|
||||
}
|
||||
return errors.WithStack(err)
|
||||
}
|
||||
return nil
|
||||
return false, errors.WithStack(errors.Newf("TODO"))
|
||||
}
|
||||
|
||||
// Destination still exists, stop/abort sync.
|
||||
// Create a client to the destination sync master.
|
||||
destClient, err := dr.createSyncMasterClient(p.Spec.Destination)
|
||||
if err != nil {
|
||||
return false, errors.WithMessage(err, "Failed to create destination synchronization master client")
|
||||
}
|
||||
|
||||
// Get status from sync master.
|
||||
syncInfo, err := destClient.Master().Status(ctx)
|
||||
if err != nil {
|
||||
return false, errors.WithMessage(err, "Failed to get status from target master")
|
||||
}
|
||||
|
||||
// Check progress of a cancellation.
|
||||
if syncStatus, err := dr.getCancellationProgress(syncInfo); err != nil {
|
||||
return false, err
|
||||
} else if syncStatus == client.SyncStatusInactive {
|
||||
return false, nil
|
||||
} else if syncStatus == client.SyncStatusFailed {
|
||||
return false, errors.WithMessagef(err, "unexpected synchronization status \"%s\"", syncStatus)
|
||||
} else if syncStatus == client.SyncStatusCancelling {
|
||||
// Synchronization is cancelling, so request was already sent.
|
||||
if !abort {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
changed := dr.status.Conditions.Update(api.ConditionTypeAborted, abort, "Cancellation type",
|
||||
"Cancellation will wait for source data center to be canceled with a timeout")
|
||||
if !changed {
|
||||
return true, nil
|
||||
}
|
||||
// A Request must be sent once again because abort option has changed.
|
||||
}
|
||||
|
||||
// Check whether data consistency must be ensured.
|
||||
if syncInfo.Status.IsActive() && util.BoolOrDefault(p.Spec.Cancellation.EnsureInSync, true) {
|
||||
if inSync, inSyncShards, totalShards, err := dr.ensureInSync(ctx, destClient); err != nil {
|
||||
return false, err
|
||||
} else if !inSync {
|
||||
if time.Since(dr.lastLog) > time.Second*5 {
|
||||
dr.lastLog = time.Now()
|
||||
dr.log.Info("Consistency is being checked, %d of %d shards are in-sync", inSyncShards, totalShards)
|
||||
}
|
||||
|
||||
// Retry soon.
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// From here on this code should be launched only once unless abort option is changed
|
||||
// or replication is not in cancelling state.
|
||||
sourceServerMode := driver.ServerModeDefault
|
||||
if util.BoolOrDefault(p.Spec.Cancellation.SourceReadOnly) {
|
||||
sourceServerMode = driver.ServerModeReadOnly
|
||||
}
|
||||
req := client.CancelSynchronizationRequest{
|
||||
Force: abort,
|
||||
ForceTimeout: AbortTimeout,
|
||||
SourceServerMode: sourceServerMode,
|
||||
}
|
||||
dr.log.Interface("request", req).Info("Stopping synchronization...")
|
||||
_, errCancel := destClient.Master().CancelSynchronization(ctx, req)
|
||||
if errCancel != nil {
|
||||
dr.status.Reason = fmt.Sprintf("Failed to stop synchronization: %s. Abort: %s", err.Error(), strconv.FormatBool(abort))
|
||||
} else {
|
||||
dr.status.Reason = "Stopping synchronization started"
|
||||
}
|
||||
|
||||
// Update CR status.
|
||||
if err := dr.updateCRStatus(); err != nil {
|
||||
dr.log.Err(err).Warn("Failed to update replication status")
|
||||
// Don't return with this error because original error must be returned.
|
||||
// Not a big deal, because only reason was not saved.
|
||||
// It will be saved on next updateCRStatus call, because reason is kept in status memory
|
||||
}
|
||||
|
||||
// If err is nil then nil will be returned.
|
||||
if errCancel != nil {
|
||||
if abort {
|
||||
return false, errors.WithMessage(errCancel, "Failed to abort synchronization")
|
||||
}
|
||||
|
||||
return false, errors.WithMessage(errCancel, "Failed to stop synchronization")
|
||||
}
|
||||
|
||||
return true, nil
|
||||
|
||||
}
|
||||
|
||||
// removeDeploymentReplicationFinalizers removes the given finalizers from the given DeploymentReplication.
|
||||
|
@ -187,3 +255,81 @@ func removeDeploymentReplicationFinalizers(crcli versioned.Interface, p *api.Ara
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// finalizerExists returns true if a given finalizer exists.
|
||||
func finalizerExists(p *api.ArangoDeploymentReplication, finalizer string) bool {
|
||||
for _, f := range p.ObjectMeta.GetFinalizers() {
|
||||
if f == finalizer {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (dr *DeploymentReplication) getCancellationProgress(syncInfo client.SyncInfo) (client.SyncStatus, error) {
|
||||
if syncInfo.IsInactive() {
|
||||
if len(syncInfo.Source) > 0 {
|
||||
return "", errors.New("Inactive target data center is still configured with the endpoint set to a source DC")
|
||||
}
|
||||
return client.SyncStatusInactive, nil
|
||||
}
|
||||
|
||||
if syncInfo.Status == client.SyncStatusInactive {
|
||||
// There are some not finished shards but status is inactive, so it was na canceled.
|
||||
return "", errors.New("Target data center is inactive but some shards are not closed")
|
||||
}
|
||||
|
||||
return syncInfo.Status, nil
|
||||
}
|
||||
|
||||
// ensureInSync checks whether data is consistent on both data centers.
|
||||
// During this check both data centers will be in read-only mode.
|
||||
// Return nil when data is consistent or when consistency was already checked.
|
||||
func (dr *DeploymentReplication) ensureInSync(ctx context.Context, c client.API) (bool, int, int, error) {
|
||||
if dr.status.Conditions.IsTrue(api.ConditionTypeEnsuredInSync) {
|
||||
return true, 0, 0, nil
|
||||
}
|
||||
|
||||
cancelStatus, err := c.Master().GetSynchronizationBarrierStatus(ctx)
|
||||
if err != nil {
|
||||
return false, 0, 0, errors.WithMessage(err, "Can not get synchronization barrier status")
|
||||
}
|
||||
|
||||
if !cancelStatus.SourceServerReadonly ||
|
||||
dr.status.Conditions.Update(api.ConditionTypeEnsuredInSync, false, "Consistent", "Data on both data centers is not the same") {
|
||||
// If `GetSynchronizationBarrierStatus` could return active barrier then it would not create the above condition.
|
||||
if err := c.Master().CreateSynchronizationBarrier(ctx); err != nil {
|
||||
if driver.IsPreconditionFailed(err) {
|
||||
dr.log.Info("Can not create synchronization barrier because synchronization is not running")
|
||||
return false, 0, 0, nil
|
||||
}
|
||||
|
||||
return false, 0, 0, errors.WithMessage(err, "Can not create synchronization barrier")
|
||||
}
|
||||
|
||||
if err := dr.updateCRStatus(); err != nil {
|
||||
return false, 0, 0, errors.WithMessage(err, "Failed to update ArangoDeploymentReplication status")
|
||||
}
|
||||
|
||||
dr.log.Info("Synchronization barrier created, both data centers are in read-only mode")
|
||||
}
|
||||
|
||||
totalShards := cancelStatus.InSyncShards + cancelStatus.NotInSyncShards
|
||||
if cancelStatus.InSyncShards > 0 && cancelStatus.NotInSyncShards == 0 {
|
||||
if dr.status.Conditions.Update(api.ConditionTypeEnsuredInSync, true, "Consistent", "Data on both data centers is the same") {
|
||||
if err := dr.updateCRStatus(); err != nil {
|
||||
return false, 0, 0, errors.WithMessage(err, "Failed to update ArangoDeploymentReplication status")
|
||||
}
|
||||
}
|
||||
|
||||
return true, cancelStatus.InSyncShards, totalShards, nil
|
||||
}
|
||||
|
||||
return false, cancelStatus.InSyncShards, totalShards, nil
|
||||
}
|
||||
|
||||
// isTimeExceeded returns true when a time exceeds a given timeout.
|
||||
func isTimeExceeded(t *meta.Time, timeout time.Duration) bool {
|
||||
return t != nil && time.Since(t.Time) > timeout
|
||||
}
|
||||
|
|
|
@ -54,18 +54,27 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
|
|||
}
|
||||
|
||||
// Is the deployment in failed state, if so, give up.
|
||||
if dr.status.Phase == api.DeploymentReplicationPhaseFailed {
|
||||
if dr.status.Phase.IsFailed() {
|
||||
dr.log.Debug("Deployment replication is in Failed state.")
|
||||
return nextInterval
|
||||
}
|
||||
|
||||
// Is delete triggered?
|
||||
if dr.apiObject.GetDeletionTimestamp() != nil {
|
||||
// Deployment replication is triggered for deletion.
|
||||
if err := dr.runFinalizers(ctx, dr.apiObject); err != nil {
|
||||
dr.log.Err(err).Warn("Failed to run finalizers")
|
||||
hasError = true
|
||||
if timestamp := dr.apiObject.GetDeletionTimestamp(); timestamp != nil {
|
||||
// Resource is being deleted.
|
||||
retrySoon, err := dr.runFinalizers(ctx, dr.apiObject)
|
||||
if err != nil || retrySoon {
|
||||
if err != nil {
|
||||
dr.log.Err(err).Warn("Failed to run finalizers")
|
||||
}
|
||||
timeout := CancellationTimeout + AbortTimeout
|
||||
if isTimeExceeded(timestamp, timeout) {
|
||||
// Cancellation and abort timeout exceeded, so it must go into failed state.
|
||||
dr.failOnError(err, fmt.Sprintf("Failed to cancel synchronization in %s", timeout.String()))
|
||||
}
|
||||
}
|
||||
|
||||
return cancellationInterval
|
||||
} else {
|
||||
// Inspect configuration status
|
||||
destClient, err := dr.createSyncMasterClient(spec.Destination)
|
||||
|
@ -98,15 +107,19 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
|
|||
} else {
|
||||
if isIncomingEndpoint {
|
||||
// Destination is correctly configured
|
||||
dr.status.Conditions.Update(api.ConditionTypeConfigured, true, "Active", "Destination syncmaster is configured correctly and active")
|
||||
|
||||
dr.status.Conditions.Update(api.ConditionTypeConfigured, true, api.ConditionConfiguredReasonActive,
|
||||
"Destination syncmaster is configured correctly and active")
|
||||
dr.status.Destination = createEndpointStatus(destStatus, "")
|
||||
dr.status.IncomingSynchronization = dr.inspectIncomingSynchronizationStatus(ctx, destClient, driver.Version(destArangosyncVersion.Version), destStatus.Shards)
|
||||
dr.status.IncomingSynchronization = dr.inspectIncomingSynchronizationStatus(ctx, destClient,
|
||||
driver.Version(destArangosyncVersion.Version), destStatus.Shards)
|
||||
updateStatusNeeded = true
|
||||
} else {
|
||||
// Sync is active, but from different source
|
||||
dr.log.Warn("Destination syncmaster is configured for different source")
|
||||
cancelSyncNeeded = true
|
||||
if dr.status.Conditions.Update(api.ConditionTypeConfigured, false, "Invalid", "Destination syncmaster is configured for different source") {
|
||||
if dr.status.Conditions.Update(api.ConditionTypeConfigured, false, api.ConditionConfiguredReasonInvalid,
|
||||
"Destination syncmaster is configured for different source") {
|
||||
updateStatusNeeded = true
|
||||
}
|
||||
}
|
||||
|
@ -114,7 +127,8 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
|
|||
} else {
|
||||
// Destination has correct source, but is inactive
|
||||
configureSyncNeeded = true
|
||||
if dr.status.Conditions.Update(api.ConditionTypeConfigured, false, "Inactive", "Destination syncmaster is configured correctly but in-active") {
|
||||
if dr.status.Conditions.Update(api.ConditionTypeConfigured, false, api.ConditionConfiguredReasonInactive,
|
||||
"Destination syncmaster is configured correctly but in-active") {
|
||||
updateStatusNeeded = true
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue