mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
[Feature] Add new field to DeploymentReplicationStatus with details on DC2DC sync status (#1096)
This commit is contained in:
parent
edb5092616
commit
18ad9b6919
16 changed files with 422 additions and 20 deletions
|
@ -1,6 +1,7 @@
|
|||
# Change Log
|
||||
|
||||
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
|
||||
- (Feature) Add new field to DeploymentReplicationStatus with details on DC2DC sync status
|
||||
|
||||
## [1.2.16](https://github.com/arangodb/kube-arangodb/tree/1.2.16) (2022-09-14)
|
||||
- (Feature) Add ArangoDeployment ServerGroupStatus
|
||||
|
|
4
go.mod
4
go.mod
|
@ -24,7 +24,7 @@ replace (
|
|||
|
||||
require (
|
||||
github.com/arangodb-helper/go-certificates v0.0.0-20180821055445-9fca24fc2680
|
||||
github.com/arangodb/arangosync-client v0.7.0
|
||||
github.com/arangodb/arangosync-client v0.8.0
|
||||
github.com/arangodb/go-driver v1.2.1
|
||||
github.com/arangodb/go-driver/v2 v2.0.0-20211021031401-d92dcd5a4c83
|
||||
github.com/arangodb/go-upgrade-rules v0.0.0-20180809110947-031b4774ff21
|
||||
|
@ -107,3 +107,5 @@ require (
|
|||
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
|
||||
sigs.k8s.io/yaml v1.2.0
|
||||
)
|
||||
|
||||
require github.com/cenkalti/backoff/v4 v4.1.3 // indirect
|
||||
|
|
6
go.sum
6
go.sum
|
@ -59,8 +59,8 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF
|
|||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
|
||||
github.com/arangodb-helper/go-certificates v0.0.0-20180821055445-9fca24fc2680 h1:5YCGq0gkf/sCEkDFIsMBPj59GOm5cMibGqDBH2OWWfQ=
|
||||
github.com/arangodb-helper/go-certificates v0.0.0-20180821055445-9fca24fc2680/go.mod h1:xDyzBwyYzcEhsaDXtmxCNM4p5BrtuoVYYsRTuJqmCeg=
|
||||
github.com/arangodb/arangosync-client v0.7.0 h1:3vLOVnMyr5vGlPA0OHxJL9Wyy49JJwN0uBYU1HDk0qk=
|
||||
github.com/arangodb/arangosync-client v0.7.0/go.mod h1:g+JcxH3C63wKaJPnPr9nggYoGbt/bYCWpfcRG0NSodY=
|
||||
github.com/arangodb/arangosync-client v0.8.0 h1:lZzT8ERzkAZWjCZR7HUoTFjAep6KcpNsioWE6t+0ggQ=
|
||||
github.com/arangodb/arangosync-client v0.8.0/go.mod h1:TwzM8ll85P4Iu+MRq3BTUwsMLDOoaPREKDnwvyRS+70=
|
||||
github.com/arangodb/go-driver v1.2.1 h1:HREDHhDmzdIWxHmfkfTESbYUnRjESjPh4WUuXq7FZa8=
|
||||
github.com/arangodb/go-driver v1.2.1/go.mod h1:zdDkJJnCj8DAkfbtIjIXnsTrWIiy6VhP3Vy14p+uQeY=
|
||||
github.com/arangodb/go-driver/v2 v2.0.0-20211021031401-d92dcd5a4c83 h1:PCbi3alUFastUw6InBKGEXqniveJJcQuMYspubJMRS8=
|
||||
|
@ -85,6 +85,8 @@ github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqO
|
|||
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
|
||||
github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4=
|
||||
github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
|
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
|
||||
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
|
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
|
||||
|
|
38
pkg/apis/replication/v1/database_synchronization_status.go
Normal file
38
pkg/apis/replication/v1/database_synchronization_status.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package v1
|
||||
|
||||
// DatabaseSynchronizationStatus contains the synchronization status of replication for database
|
||||
type DatabaseSynchronizationStatus struct {
|
||||
// ShardsTotal shows how many shards are expected to be in-sync
|
||||
ShardsTotal int `json:"shards-total"`
|
||||
// ShardsInSync shows how many shards are already in-sync
|
||||
ShardsInSync int `json:"shards-in-sync"`
|
||||
// Errors contains a list of errors if there were unexpected errors during synchronization
|
||||
Errors []DatabaseSynchronizationError `json:"errors,omitempty"`
|
||||
}
|
||||
|
||||
// DatabaseSynchronizationError contains the error message for specific shard in collection
|
||||
type DatabaseSynchronizationError struct {
|
||||
Collection string `json:"collection,omitempty"`
|
||||
Shard string `json:"shard,omitempty"`
|
||||
Message string `json:"message"`
|
||||
}
|
|
@ -25,7 +25,7 @@ package v1
|
|||
type DeploymentReplicationStatus struct {
|
||||
// Phase holds the current lifetime phase of the deployment replication
|
||||
Phase DeploymentReplicationPhase `json:"phase,omitempty"`
|
||||
// Reason contains a human readable reason for reaching the current phase (can be empty)
|
||||
// Reason contains a human-readable reason for reaching the current phase (can be empty)
|
||||
Reason string `json:"reason,omitempty"` // Reason for current phase
|
||||
|
||||
// Conditions specific to the entire deployment replication
|
||||
|
@ -39,4 +39,7 @@ type DeploymentReplicationStatus struct {
|
|||
// CancelFailures records the number of times that the configuration was canceled
|
||||
// which resulted in an error.
|
||||
CancelFailures int `json:"cancel-failures,omitempty"`
|
||||
|
||||
// IncomingSynchronization contains the incoming synchronization status for all databases
|
||||
IncomingSynchronization SynchronizationStatus `json:"incoming-synchronization,omitempty"`
|
||||
}
|
||||
|
|
30
pkg/apis/replication/v1/synchronization_status.go
Normal file
30
pkg/apis/replication/v1/synchronization_status.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package v1
|
||||
|
||||
// SynchronizationStatus contains the synchronization status of replication for all databases
|
||||
type SynchronizationStatus struct {
|
||||
// Databases holds the synchronization status for each database.
|
||||
Databases map[string]DatabaseSynchronizationStatus `json:"databases,omitempty"`
|
||||
// AllInSync is true if all shards listed in 'databases' are in sync
|
||||
AllInSync bool `json:"allInSync,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
61
pkg/apis/replication/v1/zz_generated.deepcopy.go
generated
61
pkg/apis/replication/v1/zz_generated.deepcopy.go
generated
|
@ -174,6 +174,43 @@ func (in *DatabaseStatus) DeepCopy() *DatabaseStatus {
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DatabaseSynchronizationError) DeepCopyInto(out *DatabaseSynchronizationError) {
|
||||
*out = *in
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatabaseSynchronizationError.
|
||||
func (in *DatabaseSynchronizationError) DeepCopy() *DatabaseSynchronizationError {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(DatabaseSynchronizationError)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DatabaseSynchronizationStatus) DeepCopyInto(out *DatabaseSynchronizationStatus) {
|
||||
*out = *in
|
||||
if in.Errors != nil {
|
||||
in, out := &in.Errors, &out.Errors
|
||||
*out = make([]DatabaseSynchronizationError, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatabaseSynchronizationStatus.
|
||||
func (in *DatabaseSynchronizationStatus) DeepCopy() *DatabaseSynchronizationStatus {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(DatabaseSynchronizationStatus)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DeploymentReplicationSpec) DeepCopyInto(out *DeploymentReplicationSpec) {
|
||||
*out = *in
|
||||
|
@ -204,6 +241,7 @@ func (in *DeploymentReplicationStatus) DeepCopyInto(out *DeploymentReplicationSt
|
|||
}
|
||||
in.Source.DeepCopyInto(&out.Source)
|
||||
in.Destination.DeepCopyInto(&out.Destination)
|
||||
in.IncomingSynchronization.DeepCopyInto(&out.IncomingSynchronization)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -330,3 +368,26 @@ func (in *ShardStatus) DeepCopy() *ShardStatus {
|
|||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *SynchronizationStatus) DeepCopyInto(out *SynchronizationStatus) {
|
||||
*out = *in
|
||||
if in.Databases != nil {
|
||||
in, out := &in.Databases, &out.Databases
|
||||
*out = make(map[string]DatabaseSynchronizationStatus, len(*in))
|
||||
for key, val := range *in {
|
||||
(*out)[key] = *val.DeepCopy()
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SynchronizationStatus.
|
||||
func (in *SynchronizationStatus) DeepCopy() *SynchronizationStatus {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(SynchronizationStatus)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
|
|
@ -0,0 +1,38 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package v2alpha1
|
||||
|
||||
// DatabaseSynchronizationStatus contains the synchronization status of replication for database
|
||||
type DatabaseSynchronizationStatus struct {
|
||||
// ShardsTotal shows how many shards are expected to be in-sync
|
||||
ShardsTotal int `json:"shards-total"`
|
||||
// ShardsInSync shows how many shards are already in-sync
|
||||
ShardsInSync int `json:"shards-in-sync"`
|
||||
// Errors contains a list of errors if there were unexpected errors during synchronization
|
||||
Errors []DatabaseSynchronizationError `json:"errors,omitempty"`
|
||||
}
|
||||
|
||||
// DatabaseSynchronizationError contains the error message for specific shard in collection
|
||||
type DatabaseSynchronizationError struct {
|
||||
Collection string `json:"collection,omitempty"`
|
||||
Shard string `json:"shard,omitempty"`
|
||||
Message string `json:"message"`
|
||||
}
|
|
@ -39,4 +39,7 @@ type DeploymentReplicationStatus struct {
|
|||
// CancelFailures records the number of times that the configuration was canceled
|
||||
// which resulted in an error.
|
||||
CancelFailures int `json:"cancel-failures,omitempty"`
|
||||
|
||||
// IncomingSynchronization contains the incoming synchronization status for all databases
|
||||
IncomingSynchronization SynchronizationStatus `json:"incoming-synchronization,omitempty"`
|
||||
}
|
||||
|
|
30
pkg/apis/replication/v2alpha1/synchronization_status.go
Normal file
30
pkg/apis/replication/v2alpha1/synchronization_status.go
Normal file
|
@ -0,0 +1,30 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package v2alpha1
|
||||
|
||||
// SynchronizationStatus contains the synchronization status of replication for all databases
|
||||
type SynchronizationStatus struct {
|
||||
// Databases holds the synchronization status for each database.
|
||||
Databases map[string]DatabaseSynchronizationStatus `json:"databases,omitempty"`
|
||||
// AllInSync is true if all shards listed in 'databases' are in sync
|
||||
AllInSync bool `json:"allInSync,omitempty"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
|
@ -174,6 +174,43 @@ func (in *DatabaseStatus) DeepCopy() *DatabaseStatus {
|
|||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DatabaseSynchronizationError) DeepCopyInto(out *DatabaseSynchronizationError) {
|
||||
*out = *in
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatabaseSynchronizationError.
|
||||
func (in *DatabaseSynchronizationError) DeepCopy() *DatabaseSynchronizationError {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(DatabaseSynchronizationError)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DatabaseSynchronizationStatus) DeepCopyInto(out *DatabaseSynchronizationStatus) {
|
||||
*out = *in
|
||||
if in.Errors != nil {
|
||||
in, out := &in.Errors, &out.Errors
|
||||
*out = make([]DatabaseSynchronizationError, len(*in))
|
||||
copy(*out, *in)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DatabaseSynchronizationStatus.
|
||||
func (in *DatabaseSynchronizationStatus) DeepCopy() *DatabaseSynchronizationStatus {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(DatabaseSynchronizationStatus)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *DeploymentReplicationSpec) DeepCopyInto(out *DeploymentReplicationSpec) {
|
||||
*out = *in
|
||||
|
@ -204,6 +241,7 @@ func (in *DeploymentReplicationStatus) DeepCopyInto(out *DeploymentReplicationSt
|
|||
}
|
||||
in.Source.DeepCopyInto(&out.Source)
|
||||
in.Destination.DeepCopyInto(&out.Destination)
|
||||
in.IncomingSynchronization.DeepCopyInto(&out.IncomingSynchronization)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -330,3 +368,26 @@ func (in *ShardStatus) DeepCopy() *ShardStatus {
|
|||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
||||
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
|
||||
func (in *SynchronizationStatus) DeepCopyInto(out *SynchronizationStatus) {
|
||||
*out = *in
|
||||
if in.Databases != nil {
|
||||
in, out := &in.Databases, &out.Databases
|
||||
*out = make(map[string]DatabaseSynchronizationStatus, len(*in))
|
||||
for key, val := range *in {
|
||||
(*out)[key] = *val.DeepCopy()
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SynchronizationStatus.
|
||||
func (in *SynchronizationStatus) DeepCopy() *SynchronizationStatus {
|
||||
if in == nil {
|
||||
return nil
|
||||
}
|
||||
out := new(SynchronizationStatus)
|
||||
in.DeepCopyInto(out)
|
||||
return out
|
||||
}
|
||||
|
|
|
@ -28,14 +28,12 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
core "k8s.io/api/core/v1"
|
||||
apiErrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
||||
"github.com/arangodb/arangosync-client/client"
|
||||
"github.com/arangodb/arangosync-client/tasks"
|
||||
driver "github.com/arangodb/go-driver"
|
||||
"github.com/arangodb/go-driver/agency"
|
||||
"github.com/arangodb/go-driver/http"
|
||||
|
@ -52,6 +50,7 @@ import (
|
|||
"github.com/arangodb/kube-arangodb/pkg/deployment/reconciler"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
|
||||
"github.com/arangodb/kube-arangodb/pkg/operator/scope"
|
||||
"github.com/arangodb/kube-arangodb/pkg/replication"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/constants"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
|
@ -324,17 +323,10 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
|
|||
port = shared.ArangoSyncWorkerPort
|
||||
}
|
||||
source := client.Endpoint{"https://" + net.JoinHostPort(dnsName, strconv.Itoa(port))}
|
||||
tlsAuth := tasks.TLSAuthentication{
|
||||
TLSClientAuthentication: tasks.TLSClientAuthentication{
|
||||
ClientToken: monitoringToken,
|
||||
},
|
||||
}
|
||||
auth := client.NewAuthentication(tlsAuth, "")
|
||||
insecureSkipVerify := true
|
||||
// TODO: Change logging system in sync client
|
||||
c, err := d.syncClientCache.GetClient(log.Logger, source, auth, insecureSkipVerify)
|
||||
|
||||
c, err := replication.GetSyncServerClient(&d.syncClientCache, monitoringToken, source)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
return nil, err
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
|
43
pkg/replication/client.go
Normal file
43
pkg/replication/client.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package replication
|
||||
|
||||
import (
|
||||
"github.com/arangodb/arangosync-client/client"
|
||||
"github.com/arangodb/arangosync-client/tasks"
|
||||
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
)
|
||||
|
||||
func GetSyncServerClient(clientCache *client.ClientCache, token string, source client.Endpoint) (client.API, error) {
|
||||
tlsAuth := tasks.TLSAuthentication{
|
||||
TLSClientAuthentication: tasks.TLSClientAuthentication{
|
||||
ClientToken: token,
|
||||
},
|
||||
}
|
||||
auth := client.NewAuthentication(tlsAuth, "")
|
||||
insecureSkipVerify := true
|
||||
c, err := clientCache.GetClient(client.NewExternalEndpoints(source), auth, insecureSkipVerify)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
|
@ -25,7 +25,6 @@ import (
|
|||
"net"
|
||||
"strconv"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
certificates "github.com/arangodb-helper/go-certificates"
|
||||
|
@ -95,8 +94,7 @@ func (dr *DeploymentReplication) createSyncMasterClient(epSpec api.EndpointSpec)
|
|||
auth.Password = password
|
||||
|
||||
// Create client
|
||||
// TODO: Change logger in clientset
|
||||
c, err := dr.clientCache.GetClient(log.Logger, source, auth, insecureSkipVerify)
|
||||
c, err := dr.clientCache.GetClient(client.NewExternalEndpoints(source), auth, insecureSkipVerify)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
|
|
@ -22,12 +22,16 @@ package replication
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/arangodb/arangosync-client/client"
|
||||
"github.com/arangodb/arangosync-client/client/synccheck"
|
||||
"github.com/arangodb/go-driver"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
)
|
||||
|
||||
|
@ -68,6 +72,12 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
|
|||
if err != nil {
|
||||
dr.log.Err(err).Warn("Failed to create destination syncmaster client")
|
||||
} else {
|
||||
destArangosyncVersion, err := destClient.Version(ctx)
|
||||
if err != nil {
|
||||
dr.log.Err(err).Warn("Failed to get destination arangosync version")
|
||||
hasError = true
|
||||
}
|
||||
|
||||
// Fetch status of destination
|
||||
updateStatusNeeded := false
|
||||
configureSyncNeeded := false
|
||||
|
@ -89,8 +99,8 @@ func (dr *DeploymentReplication) inspectDeploymentReplication(lastInterval time.
|
|||
if isIncomingEndpoint {
|
||||
// Destination is correctly configured
|
||||
dr.status.Conditions.Update(api.ConditionTypeConfigured, true, "Active", "Destination syncmaster is configured correctly and active")
|
||||
// Fetch shard status
|
||||
dr.status.Destination = createEndpointStatus(destStatus, "")
|
||||
dr.status.IncomingSynchronization = dr.inspectIncomingSynchronizationStatus(ctx, destClient, driver.Version(destArangosyncVersion.Version), destStatus.Shards)
|
||||
updateStatusNeeded = true
|
||||
} else {
|
||||
// Sync is active, but from different source
|
||||
|
@ -228,6 +238,96 @@ func (dr *DeploymentReplication) hasOutgoingEndpoint(status client.SyncInfo, epS
|
|||
return "", false, nil
|
||||
}
|
||||
|
||||
// inspectIncomingSynchronizationStatus returns the synchronization status for the incoming sync
|
||||
func (dr *DeploymentReplication) inspectIncomingSynchronizationStatus(ctx context.Context, syncClient client.API, arangosyncVersion driver.Version, localShards []client.ShardSyncInfo) api.SynchronizationStatus {
|
||||
dataCentersResp, err := syncClient.Master().GetDataCentersInfo(ctx)
|
||||
if err != nil {
|
||||
errMsg := "Failed to fetch data-centers info"
|
||||
dr.log.Err(err).Warn(errMsg)
|
||||
return api.SynchronizationStatus{
|
||||
Error: fmt.Sprintf("%s: %s", errMsg, err.Error()),
|
||||
}
|
||||
}
|
||||
|
||||
ch := synccheck.NewSynchronizationChecker(syncClient, time.Minute)
|
||||
incomingSyncStatus, err := ch.CheckSync(ctx, &dataCentersResp, localShards)
|
||||
if err != nil {
|
||||
errMsg := "Failed to check synchronization status"
|
||||
dr.log.Err(err).Warn(errMsg)
|
||||
return api.SynchronizationStatus{
|
||||
Error: fmt.Sprintf("%s: %s", errMsg, err.Error()),
|
||||
}
|
||||
}
|
||||
return dr.createSynchronizationStatus(arangosyncVersion, incomingSyncStatus)
|
||||
}
|
||||
|
||||
// createSynchronizationStatus returns aggregated info about DCSyncStatus
|
||||
func (dr *DeploymentReplication) createSynchronizationStatus(arangosyncVersion driver.Version, dcSyncStatus *synccheck.DCSyncStatus) api.SynchronizationStatus {
|
||||
dbs := make(map[string]api.DatabaseSynchronizationStatus, len(dcSyncStatus.Databases))
|
||||
i := 0
|
||||
for dbName, dbSyncStatus := range dcSyncStatus.Databases {
|
||||
i++
|
||||
db := dbName
|
||||
if features.SensitiveInformationProtection().Enabled() {
|
||||
// internal IDs are not available in older versions
|
||||
if arangosyncVersion.CompareTo("2.12.0") >= 0 {
|
||||
db = dbSyncStatus.ID
|
||||
} else {
|
||||
db = fmt.Sprintf("<PROTECTED_INFO_%d>", i)
|
||||
}
|
||||
}
|
||||
dbs[db] = dr.createDatabaseSynchronizationStatus(dbSyncStatus)
|
||||
}
|
||||
return api.SynchronizationStatus{
|
||||
AllInSync: dcSyncStatus.AllInSync(),
|
||||
Databases: dbs,
|
||||
Error: "",
|
||||
}
|
||||
}
|
||||
|
||||
// createDatabaseSynchronizationStatus returns sync status for DB
|
||||
func (dr *DeploymentReplication) createDatabaseSynchronizationStatus(dbSyncStatus synccheck.DatabaseSyncStatus) api.DatabaseSynchronizationStatus {
|
||||
// use limit for errors because the resulting status object should not be too big
|
||||
const maxReportedIncomingSyncErrors = 20
|
||||
|
||||
var errs []api.DatabaseSynchronizationError
|
||||
var shardsTotal, shardsInSync int
|
||||
var errorsReportedToLog = 0
|
||||
for colName, colSyncStatus := range dbSyncStatus.Collections {
|
||||
if colSyncStatus.Error != "" && len(errs) < maxReportedIncomingSyncErrors {
|
||||
col := colName
|
||||
if features.SensitiveInformationProtection().Enabled() {
|
||||
col = colSyncStatus.ID
|
||||
}
|
||||
|
||||
errs = append(errs, api.DatabaseSynchronizationError{
|
||||
Collection: col,
|
||||
Shard: "",
|
||||
Message: colSyncStatus.Error,
|
||||
})
|
||||
}
|
||||
|
||||
shardsTotal += len(colSyncStatus.Shards)
|
||||
for shardIndex, shardSyncStatus := range colSyncStatus.Shards {
|
||||
if shardSyncStatus.InSync {
|
||||
shardsInSync++
|
||||
} else if errorsReportedToLog < maxReportedIncomingSyncErrors {
|
||||
dr.log.Str("db", dbSyncStatus.ID).
|
||||
Str("col", colSyncStatus.ID).
|
||||
Int("shard", shardIndex).
|
||||
Debug("incoming synchronization shard status is not in-sync: %s", shardSyncStatus.Message)
|
||||
errorsReportedToLog++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return api.DatabaseSynchronizationStatus{
|
||||
ShardsTotal: shardsTotal,
|
||||
ShardsInSync: shardsInSync,
|
||||
Errors: errs,
|
||||
}
|
||||
}
|
||||
|
||||
// createEndpointStatus creates an api EndpointStatus from the given sync status.
|
||||
func createEndpointStatus(status client.SyncInfo, outgoingID string) api.EndpointStatus {
|
||||
result := api.EndpointStatus{}
|
||||
|
|
Loading…
Reference in a new issue