diff --git a/CHANGELOG.md b/CHANGELOG.md index 92f1e01eb..ee6ee6d06 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Change Log ## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A) +- (Feature) Add new field to DeploymentReplicationStatus with details on DC2DC sync status ## [1.2.16](https://github.com/arangodb/kube-arangodb/tree/1.2.16) (2022-09-14) - (Feature) Add ArangoDeployment ServerGroupStatus diff --git a/go.mod b/go.mod index 795fee57d..a7d1285dc 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ replace ( require ( github.com/arangodb-helper/go-certificates v0.0.0-20180821055445-9fca24fc2680 - github.com/arangodb/arangosync-client v0.7.0 + github.com/arangodb/arangosync-client v0.8.0 github.com/arangodb/go-driver v1.2.1 github.com/arangodb/go-driver/v2 v2.0.0-20211021031401-d92dcd5a4c83 github.com/arangodb/go-upgrade-rules v0.0.0-20180809110947-031b4774ff21 @@ -107,3 +107,5 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect sigs.k8s.io/yaml v1.2.0 ) + +require github.com/cenkalti/backoff/v4 v4.1.3 // indirect diff --git a/go.sum b/go.sum index 51c0c861e..c3d35847b 100644 --- a/go.sum +++ b/go.sum @@ -59,8 +59,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/arangodb-helper/go-certificates v0.0.0-20180821055445-9fca24fc2680 h1:5YCGq0gkf/sCEkDFIsMBPj59GOm5cMibGqDBH2OWWfQ= github.com/arangodb-helper/go-certificates v0.0.0-20180821055445-9fca24fc2680/go.mod h1:xDyzBwyYzcEhsaDXtmxCNM4p5BrtuoVYYsRTuJqmCeg= -github.com/arangodb/arangosync-client v0.7.0 h1:3vLOVnMyr5vGlPA0OHxJL9Wyy49JJwN0uBYU1HDk0qk= -github.com/arangodb/arangosync-client v0.7.0/go.mod h1:g+JcxH3C63wKaJPnPr9nggYoGbt/bYCWpfcRG0NSodY= +github.com/arangodb/arangosync-client v0.8.0 h1:lZzT8ERzkAZWjCZR7HUoTFjAep6KcpNsioWE6t+0ggQ= +github.com/arangodb/arangosync-client v0.8.0/go.mod h1:TwzM8ll85P4Iu+MRq3BTUwsMLDOoaPREKDnwvyRS+70= github.com/arangodb/go-driver v1.2.1 h1:HREDHhDmzdIWxHmfkfTESbYUnRjESjPh4WUuXq7FZa8= github.com/arangodb/go-driver v1.2.1/go.mod h1:zdDkJJnCj8DAkfbtIjIXnsTrWIiy6VhP3Vy14p+uQeY= github.com/arangodb/go-driver/v2 v2.0.0-20211021031401-d92dcd5a4c83 h1:PCbi3alUFastUw6InBKGEXqniveJJcQuMYspubJMRS8= @@ -85,6 +85,8 @@ github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqO github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= +github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= diff --git a/pkg/apis/replication/v1/database_synchronization_status.go b/pkg/apis/replication/v1/database_synchronization_status.go new file mode 100644 index 000000000..fbb193c24 --- /dev/null +++ b/pkg/apis/replication/v1/database_synchronization_status.go @@ -0,0 +1,38 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +// DatabaseSynchronizationStatus contains the synchronization status of replication for database +type DatabaseSynchronizationStatus struct { + // ShardsTotal shows how many shards are expected to be in-sync + ShardsTotal int `json:"shards-total"` + // ShardsInSync shows how many shards are already in-sync + ShardsInSync int `json:"shards-in-sync"` + // Errors contains a list of errors if there were unexpected errors during synchronization + Errors []DatabaseSynchronizationError `json:"errors,omitempty"` +} + +// DatabaseSynchronizationError contains the error message for specific shard in collection +type DatabaseSynchronizationError struct { + Collection string `json:"collection,omitempty"` + Shard string `json:"shard,omitempty"` + Message string `json:"message"` +} diff --git a/pkg/apis/replication/v1/replication_status.go b/pkg/apis/replication/v1/replication_status.go index 9311ea162..2bfdcec72 100644 --- a/pkg/apis/replication/v1/replication_status.go +++ b/pkg/apis/replication/v1/replication_status.go @@ -25,7 +25,7 @@ package v1 type DeploymentReplicationStatus struct { // Phase holds the current lifetime phase of the deployment replication Phase DeploymentReplicationPhase `json:"phase,omitempty"` - // Reason contains a human readable reason for reaching the current phase (can be empty) + // Reason contains a human-readable reason for reaching the current phase (can be empty) Reason string `json:"reason,omitempty"` // Reason for current phase // Conditions specific to the entire deployment replication @@ -39,4 +39,7 @@ type DeploymentReplicationStatus struct { // CancelFailures records the number of times that the configuration was canceled // which resulted in an error. CancelFailures int `json:"cancel-failures,omitempty"` + + // IncomingSynchronization contains the incoming synchronization status for all databases + IncomingSynchronization SynchronizationStatus `json:"incoming-synchronization,omitempty"` } diff --git a/pkg/apis/replication/v1/synchronization_status.go b/pkg/apis/replication/v1/synchronization_status.go new file mode 100644 index 000000000..36368d13f --- /dev/null +++ b/pkg/apis/replication/v1/synchronization_status.go @@ -0,0 +1,30 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +// SynchronizationStatus contains the synchronization status of replication for all databases +type SynchronizationStatus struct { + // Databases holds the synchronization status for each database. + Databases map[string]DatabaseSynchronizationStatus `json:"databases,omitempty"` + // AllInSync is true if all shards listed in 'databases' are in sync + AllInSync bool `json:"allInSync,omitempty"` + Error string `json:"error,omitempty"` +} diff --git a/pkg/apis/replication/v1/zz_generated.deepcopy.go b/pkg/apis/replication/v1/zz_generated.deepcopy.go index 3bbd658e6..34a464564 100644 --- a/pkg/apis/replication/v1/zz_generated.deepcopy.go +++ b/pkg/apis/replication/v1/zz_generated.deepcopy.go @@ -174,6 +174,43 @@ func (in *DatabaseStatus) DeepCopy() *DatabaseStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DatabaseSynchronizationError) DeepCopyInto(out *DatabaseSynchronizationError) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatabaseSynchronizationError. +func (in *DatabaseSynchronizationError) DeepCopy() *DatabaseSynchronizationError { + if in == nil { + return nil + } + out := new(DatabaseSynchronizationError) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DatabaseSynchronizationStatus) DeepCopyInto(out *DatabaseSynchronizationStatus) { + *out = *in + if in.Errors != nil { + in, out := &in.Errors, &out.Errors + *out = make([]DatabaseSynchronizationError, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatabaseSynchronizationStatus. +func (in *DatabaseSynchronizationStatus) DeepCopy() *DatabaseSynchronizationStatus { + if in == nil { + return nil + } + out := new(DatabaseSynchronizationStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeploymentReplicationSpec) DeepCopyInto(out *DeploymentReplicationSpec) { *out = *in @@ -204,6 +241,7 @@ func (in *DeploymentReplicationStatus) DeepCopyInto(out *DeploymentReplicationSt } in.Source.DeepCopyInto(&out.Source) in.Destination.DeepCopyInto(&out.Destination) + in.IncomingSynchronization.DeepCopyInto(&out.IncomingSynchronization) return } @@ -330,3 +368,26 @@ func (in *ShardStatus) DeepCopy() *ShardStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SynchronizationStatus) DeepCopyInto(out *SynchronizationStatus) { + *out = *in + if in.Databases != nil { + in, out := &in.Databases, &out.Databases + *out = make(map[string]DatabaseSynchronizationStatus, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SynchronizationStatus. +func (in *SynchronizationStatus) DeepCopy() *SynchronizationStatus { + if in == nil { + return nil + } + out := new(SynchronizationStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/replication/v2alpha1/database_synchronization_status.go b/pkg/apis/replication/v2alpha1/database_synchronization_status.go new file mode 100644 index 000000000..c06363cf0 --- /dev/null +++ b/pkg/apis/replication/v2alpha1/database_synchronization_status.go @@ -0,0 +1,38 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v2alpha1 + +// DatabaseSynchronizationStatus contains the synchronization status of replication for database +type DatabaseSynchronizationStatus struct { + // ShardsTotal shows how many shards are expected to be in-sync + ShardsTotal int `json:"shards-total"` + // ShardsInSync shows how many shards are already in-sync + ShardsInSync int `json:"shards-in-sync"` + // Errors contains a list of errors if there were unexpected errors during synchronization + Errors []DatabaseSynchronizationError `json:"errors,omitempty"` +} + +// DatabaseSynchronizationError contains the error message for specific shard in collection +type DatabaseSynchronizationError struct { + Collection string `json:"collection,omitempty"` + Shard string `json:"shard,omitempty"` + Message string `json:"message"` +} diff --git a/pkg/apis/replication/v2alpha1/replication_status.go b/pkg/apis/replication/v2alpha1/replication_status.go index 8377722d9..f7ef77282 100644 --- a/pkg/apis/replication/v2alpha1/replication_status.go +++ b/pkg/apis/replication/v2alpha1/replication_status.go @@ -39,4 +39,7 @@ type DeploymentReplicationStatus struct { // CancelFailures records the number of times that the configuration was canceled // which resulted in an error. CancelFailures int `json:"cancel-failures,omitempty"` + + // IncomingSynchronization contains the incoming synchronization status for all databases + IncomingSynchronization SynchronizationStatus `json:"incoming-synchronization,omitempty"` } diff --git a/pkg/apis/replication/v2alpha1/synchronization_status.go b/pkg/apis/replication/v2alpha1/synchronization_status.go new file mode 100644 index 000000000..45266a075 --- /dev/null +++ b/pkg/apis/replication/v2alpha1/synchronization_status.go @@ -0,0 +1,30 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v2alpha1 + +// SynchronizationStatus contains the synchronization status of replication for all databases +type SynchronizationStatus struct { + // Databases holds the synchronization status for each database. + Databases map[string]DatabaseSynchronizationStatus `json:"databases,omitempty"` + // AllInSync is true if all shards listed in 'databases' are in sync + AllInSync bool `json:"allInSync,omitempty"` + Error string `json:"error,omitempty"` +} diff --git a/pkg/apis/replication/v2alpha1/zz_generated.deepcopy.go b/pkg/apis/replication/v2alpha1/zz_generated.deepcopy.go index ec47b7d2e..f78f51093 100644 --- a/pkg/apis/replication/v2alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/replication/v2alpha1/zz_generated.deepcopy.go @@ -174,6 +174,43 @@ func (in *DatabaseStatus) DeepCopy() *DatabaseStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DatabaseSynchronizationError) DeepCopyInto(out *DatabaseSynchronizationError) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatabaseSynchronizationError. +func (in *DatabaseSynchronizationError) DeepCopy() *DatabaseSynchronizationError { + if in == nil { + return nil + } + out := new(DatabaseSynchronizationError) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *DatabaseSynchronizationStatus) DeepCopyInto(out *DatabaseSynchronizationStatus) { + *out = *in + if in.Errors != nil { + in, out := &in.Errors, &out.Errors + *out = make([]DatabaseSynchronizationError, len(*in)) + copy(*out, *in) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatabaseSynchronizationStatus. +func (in *DatabaseSynchronizationStatus) DeepCopy() *DatabaseSynchronizationStatus { + if in == nil { + return nil + } + out := new(DatabaseSynchronizationStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *DeploymentReplicationSpec) DeepCopyInto(out *DeploymentReplicationSpec) { *out = *in @@ -204,6 +241,7 @@ func (in *DeploymentReplicationStatus) DeepCopyInto(out *DeploymentReplicationSt } in.Source.DeepCopyInto(&out.Source) in.Destination.DeepCopyInto(&out.Destination) + in.IncomingSynchronization.DeepCopyInto(&out.IncomingSynchronization) return } @@ -330,3 +368,26 @@ func (in *ShardStatus) DeepCopy() *ShardStatus { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SynchronizationStatus) DeepCopyInto(out *SynchronizationStatus) { + *out = *in + if in.Databases != nil { + in, out := &in.Databases, &out.Databases + *out = make(map[string]DatabaseSynchronizationStatus, len(*in)) + for key, val := range *in { + (*out)[key] = *val.DeepCopy() + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SynchronizationStatus. +func (in *SynchronizationStatus) DeepCopy() *SynchronizationStatus { + if in == nil { + return nil + } + out := new(SynchronizationStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index ba0db75c9..7b1a0daea 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -28,14 +28,12 @@ import ( "strconv" "time" - "github.com/rs/zerolog/log" core "k8s.io/api/core/v1" apiErrors "k8s.io/apimachinery/pkg/api/errors" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "github.com/arangodb/arangosync-client/client" - "github.com/arangodb/arangosync-client/tasks" driver "github.com/arangodb/go-driver" "github.com/arangodb/go-driver/agency" "github.com/arangodb/go-driver/http" @@ -52,6 +50,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" "github.com/arangodb/kube-arangodb/pkg/deployment/resources" "github.com/arangodb/kube-arangodb/pkg/operator/scope" + "github.com/arangodb/kube-arangodb/pkg/replication" "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" "github.com/arangodb/kube-arangodb/pkg/util/constants" "github.com/arangodb/kube-arangodb/pkg/util/errors" @@ -324,17 +323,10 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr port = shared.ArangoSyncWorkerPort } source := client.Endpoint{"https://" + net.JoinHostPort(dnsName, strconv.Itoa(port))} - tlsAuth := tasks.TLSAuthentication{ - TLSClientAuthentication: tasks.TLSClientAuthentication{ - ClientToken: monitoringToken, - }, - } - auth := client.NewAuthentication(tlsAuth, "") - insecureSkipVerify := true - // TODO: Change logging system in sync client - c, err := d.syncClientCache.GetClient(log.Logger, source, auth, insecureSkipVerify) + + c, err := replication.GetSyncServerClient(&d.syncClientCache, monitoringToken, source) if err != nil { - return nil, errors.WithStack(err) + return nil, err } return c, nil } diff --git a/pkg/operator/operator_deployment_relication.go b/pkg/operator/operator_deployment_replication.go similarity index 100% rename from pkg/operator/operator_deployment_relication.go rename to pkg/operator/operator_deployment_replication.go diff --git a/pkg/replication/client.go b/pkg/replication/client.go new file mode 100644 index 000000000..00fe89b4c --- /dev/null +++ b/pkg/replication/client.go @@ -0,0 +1,43 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package replication + +import ( + "github.com/arangodb/arangosync-client/client" + "github.com/arangodb/arangosync-client/tasks" + + "github.com/arangodb/kube-arangodb/pkg/util/errors" +) + +func GetSyncServerClient(clientCache *client.ClientCache, token string, source client.Endpoint) (client.API, error) { + tlsAuth := tasks.TLSAuthentication{ + TLSClientAuthentication: tasks.TLSClientAuthentication{ + ClientToken: token, + }, + } + auth := client.NewAuthentication(tlsAuth, "") + insecureSkipVerify := true + c, err := clientCache.GetClient(client.NewExternalEndpoints(source), auth, insecureSkipVerify) + if err != nil { + return nil, errors.WithStack(err) + } + return c, nil +} diff --git a/pkg/replication/sync_client.go b/pkg/replication/sync_client.go index 211536cb7..37de9b7e9 100644 --- a/pkg/replication/sync_client.go +++ b/pkg/replication/sync_client.go @@ -25,7 +25,6 @@ import ( "net" "strconv" - "github.com/rs/zerolog/log" meta "k8s.io/apimachinery/pkg/apis/meta/v1" certificates "github.com/arangodb-helper/go-certificates" @@ -95,8 +94,7 @@ func (dr *DeploymentReplication) createSyncMasterClient(epSpec api.EndpointSpec) auth.Password = password // Create client - // TODO: Change logger in clientset - c, err := dr.clientCache.GetClient(log.Logger, source, auth, insecureSkipVerify) + c, err := dr.clientCache.GetClient(client.NewExternalEndpoints(source), auth, insecureSkipVerify) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/replication/sync_inspector.go b/pkg/replication/sync_inspector.go index a140ef202..4c636d16e 100644 --- a/pkg/replication/sync_inspector.go +++ b/pkg/replication/sync_inspector.go @@ -22,12 +22,16 @@ package replication import ( "context" + "fmt" "sort" "time" "github.com/arangodb/arangosync-client/client" + "github.com/arangodb/arangosync-client/client/synccheck" + "github.com/arangodb/go-driver" api "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1" + "github.com/arangodb/kube-arangodb/pkg/deployment/features" "github.com/arangodb/kube-arangodb/pkg/util/errors" ) @@ -68,6 +72,12 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time. if err != nil { dr.log.Err(err).Warn("Failed to create destination syncmaster client") } else { + destArangosyncVersion, err := destClient.Version(ctx) + if err != nil { + dr.log.Err(err).Warn("Failed to get destination arangosync version") + hasError = true + } + // Fetch status of destination updateStatusNeeded := false configureSyncNeeded := false @@ -89,8 +99,8 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time. if isIncomingEndpoint { // Destination is correctly configured dr.status.Conditions.Update(api.ConditionTypeConfigured, true, "Active", "Destination syncmaster is configured correctly and active") - // Fetch shard status dr.status.Destination = createEndpointStatus(destStatus, "") + dr.status.IncomingSynchronization = dr.inspectIncomingSynchronizationStatus(ctx, destClient, driver.Version(destArangosyncVersion.Version), destStatus.Shards) updateStatusNeeded = true } else { // Sync is active, but from different source @@ -228,6 +238,96 @@ func (dr *DeploymentReplication) hasOutgoingEndpoint(status client.SyncInfo, epS return "", false, nil } +// inspectIncomingSynchronizationStatus returns the synchronization status for the incoming sync +func (dr *DeploymentReplication) inspectIncomingSynchronizationStatus(ctx context.Context, syncClient client.API, arangosyncVersion driver.Version, localShards []client.ShardSyncInfo) api.SynchronizationStatus { + dataCentersResp, err := syncClient.Master().GetDataCentersInfo(ctx) + if err != nil { + errMsg := "Failed to fetch data-centers info" + dr.log.Err(err).Warn(errMsg) + return api.SynchronizationStatus{ + Error: fmt.Sprintf("%s: %s", errMsg, err.Error()), + } + } + + ch := synccheck.NewSynchronizationChecker(syncClient, time.Minute) + incomingSyncStatus, err := ch.CheckSync(ctx, &dataCentersResp, localShards) + if err != nil { + errMsg := "Failed to check synchronization status" + dr.log.Err(err).Warn(errMsg) + return api.SynchronizationStatus{ + Error: fmt.Sprintf("%s: %s", errMsg, err.Error()), + } + } + return dr.createSynchronizationStatus(arangosyncVersion, incomingSyncStatus) +} + +// createSynchronizationStatus returns aggregated info about DCSyncStatus +func (dr *DeploymentReplication) createSynchronizationStatus(arangosyncVersion driver.Version, dcSyncStatus *synccheck.DCSyncStatus) api.SynchronizationStatus { + dbs := make(map[string]api.DatabaseSynchronizationStatus, len(dcSyncStatus.Databases)) + i := 0 + for dbName, dbSyncStatus := range dcSyncStatus.Databases { + i++ + db := dbName + if features.SensitiveInformationProtection().Enabled() { + // internal IDs are not available in older versions + if arangosyncVersion.CompareTo("2.12.0") >= 0 { + db = dbSyncStatus.ID + } else { + db = fmt.Sprintf("", i) + } + } + dbs[db] = dr.createDatabaseSynchronizationStatus(dbSyncStatus) + } + return api.SynchronizationStatus{ + AllInSync: dcSyncStatus.AllInSync(), + Databases: dbs, + Error: "", + } +} + +// createDatabaseSynchronizationStatus returns sync status for DB +func (dr *DeploymentReplication) createDatabaseSynchronizationStatus(dbSyncStatus synccheck.DatabaseSyncStatus) api.DatabaseSynchronizationStatus { + // use limit for errors because the resulting status object should not be too big + const maxReportedIncomingSyncErrors = 20 + + var errs []api.DatabaseSynchronizationError + var shardsTotal, shardsInSync int + var errorsReportedToLog = 0 + for colName, colSyncStatus := range dbSyncStatus.Collections { + if colSyncStatus.Error != "" && len(errs) < maxReportedIncomingSyncErrors { + col := colName + if features.SensitiveInformationProtection().Enabled() { + col = colSyncStatus.ID + } + + errs = append(errs, api.DatabaseSynchronizationError{ + Collection: col, + Shard: "", + Message: colSyncStatus.Error, + }) + } + + shardsTotal += len(colSyncStatus.Shards) + for shardIndex, shardSyncStatus := range colSyncStatus.Shards { + if shardSyncStatus.InSync { + shardsInSync++ + } else if errorsReportedToLog < maxReportedIncomingSyncErrors { + dr.log.Str("db", dbSyncStatus.ID). + Str("col", colSyncStatus.ID). + Int("shard", shardIndex). + Debug("incoming synchronization shard status is not in-sync: %s", shardSyncStatus.Message) + errorsReportedToLog++ + } + } + } + + return api.DatabaseSynchronizationStatus{ + ShardsTotal: shardsTotal, + ShardsInSync: shardsInSync, + Errors: errs, + } +} + // createEndpointStatus creates an api EndpointStatus from the given sync status. func createEndpointStatus(status client.SyncInfo, outgoingID string) api.EndpointStatus { result := api.EndpointStatus{}