mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
GT-405 Automatically fix out of sync shards with broken MerkleTree (#1296)
This commit is contained in:
parent
8e37059ab1
commit
eadd75d74f
29 changed files with 632 additions and 35 deletions
|
@ -2,6 +2,7 @@
|
|||
|
||||
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
|
||||
- (Feature) Add InSync Cache
|
||||
- (Feature) Force Rebuild Out Synced Shards
|
||||
|
||||
## [1.2.26](https://github.com/arangodb/kube-arangodb/tree/1.2.26) (2023-04-18)
|
||||
- (Bugfix) Fix manual overwrite for ReplicasCount in helm
|
||||
|
|
|
@ -69,6 +69,7 @@ Feature-wise production readiness table:
|
|||
| Operator Internal Metrics Exporter | 1.2.0 | >= 3.6.0 | Community, Enterprise | 1.2.0 | Production | True | --deployment.feature.metrics-exporter | N/A |
|
||||
| Operator Ephemeral Volumes | 1.2.2 | >= 3.7.0 | Community, Enterprise | 1.2.2 | Alpha | False | --deployment.feature.ephemeral-volumes | N/A |
|
||||
| Spec Default Restore | 1.2.21 | >= 3.7.0 | Community, Enterprise | 1.2.21 | Beta | True | --deployment.feature.deployment-spec-defaults-restore | If set to False Operator will not change ArangoDeployment Spec |
|
||||
| Force Rebuild Out Synced Shards | 1.2.27 | >= 3.8.0 | Community, Enterprise | 1.2.27 | Beta | False | --deployment.feature.force-rebuild-out-synced-shards | It should be used only if user is aware of the risks. |
|
||||
|
||||
## Operator Community Edition (CE)
|
||||
|
||||
|
|
16
cmd/cmd.go
16
cmd/cmd.go
|
@ -154,11 +154,13 @@ var (
|
|||
concurrentUploads int
|
||||
}
|
||||
operatorTimeouts struct {
|
||||
k8s time.Duration
|
||||
arangoD time.Duration
|
||||
arangoDCheck time.Duration
|
||||
reconciliation time.Duration
|
||||
agency time.Duration
|
||||
k8s time.Duration
|
||||
arangoD time.Duration
|
||||
arangoDCheck time.Duration
|
||||
reconciliation time.Duration
|
||||
agency time.Duration
|
||||
shardRebuild time.Duration
|
||||
shardRebuildRetry time.Duration
|
||||
}
|
||||
chaosOptions struct {
|
||||
allowed bool
|
||||
|
@ -211,6 +213,8 @@ func init() {
|
|||
f.DurationVar(&operatorTimeouts.arangoDCheck, "timeout.arangod-check", globals.DefaultArangoDCheckTimeout, "The version check request timeout to the ArangoDB")
|
||||
f.DurationVar(&operatorTimeouts.agency, "timeout.agency", globals.DefaultArangoDAgencyTimeout, "The Agency read timeout")
|
||||
f.DurationVar(&operatorTimeouts.reconciliation, "timeout.reconciliation", globals.DefaultReconciliationTimeout, "The reconciliation timeout to the ArangoDB CR")
|
||||
f.DurationVar(&operatorTimeouts.shardRebuild, "timeout.shard-rebuild", globals.DefaultOutSyncedShardRebuildTimeout, "Timeout after which particular out-synced shard is considered as failed and rebuild is triggered")
|
||||
f.DurationVar(&operatorTimeouts.shardRebuildRetry, "timeout.shard-rebuild-retry", globals.DefaultOutSyncedShardRebuildRetryTimeout, "Timeout after which rebuild shards retry flow is triggered")
|
||||
f.DurationVar(&shutdownOptions.delay, "shutdown.delay", defaultShutdownDelay, "The delay before running shutdown handlers")
|
||||
f.DurationVar(&shutdownOptions.timeout, "shutdown.timeout", defaultShutdownTimeout, "Timeout for shutdown handlers")
|
||||
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", false, "Enable Scaling Integration")
|
||||
|
@ -257,6 +261,8 @@ func executeMain(cmd *cobra.Command, args []string) {
|
|||
globals.GetGlobalTimeouts().Agency().Set(operatorTimeouts.agency)
|
||||
globals.GetGlobalTimeouts().ArangoDCheck().Set(operatorTimeouts.arangoDCheck)
|
||||
globals.GetGlobalTimeouts().Reconciliation().Set(operatorTimeouts.reconciliation)
|
||||
globals.GetGlobalTimeouts().ShardRebuild().Set(operatorTimeouts.shardRebuild)
|
||||
globals.GetGlobalTimeouts().ShardRebuildRetry().Set(operatorTimeouts.shardRebuildRetry)
|
||||
globals.GetGlobals().Kubernetes().RequestBatchSize().Set(operatorKubernetesOptions.maxBatchSize)
|
||||
globals.GetGlobals().Backup().ConcurrentUploads().Set(operatorBackup.concurrentUploads)
|
||||
|
||||
|
|
|
@ -17,3 +17,4 @@
|
|||
- [Operator API](./api.md)
|
||||
- [Logging](./logging.md)
|
||||
- [Manual Recovery](./recovery.md)
|
||||
- [Force rebuild out-synced Shards with broken Merkle Tree](./rebuild_out_synced_shards.md)
|
||||
|
|
20
docs/design/rebuild_out_synced_shards.md
Normal file
20
docs/design/rebuild_out_synced_shards.md
Normal file
|
@ -0,0 +1,20 @@
|
|||
# Force rebuild out-synced Shards with broken Merkle Tree
|
||||
|
||||
## Overview
|
||||
|
||||
TODO
|
||||
|
||||
## How to use
|
||||
|
||||
This feature is disabled by default.
|
||||
- To enable it use `--deployment.feature.force-rebuild-out-synced-shards` arg, which needs be passed to the operator.
|
||||
- Optionally we can override default timeouts by attaching following args to the operator:
|
||||
- `--timeout.shard-rebuild {duration}` - timeout after which particular out-synced shard is considered as failed and rebuild is triggered (default 60m0s)
|
||||
- `--timeout.shard-rebuild-retry {duration}` - timeout after which rebuild shards retry flow is triggered (default 60m0s)
|
||||
|
||||
Here is the example `helm` command which enables this feature and sets shard-rebuild timeout to 10 minutes:
|
||||
```shell
|
||||
export VER=1.2.27; helm upgrade --install kube-arangodb \
|
||||
https://github.com/arangodb/kube-arangodb/releases/download/$VER/kube-arangodb-$VER.tgz \
|
||||
--set "operator.args={--deployment.feature.force-rebuild-out-synced-shards,--timeout.shard-rebuild=10m}"
|
||||
```
|
|
@ -47,6 +47,7 @@
|
|||
| RebalancerCheck | no | 10m0s | no | Enterprise Only | Check Rebalancer job progress |
|
||||
| RebalancerClean | no | 10m0s | no | Enterprise Only | Cleans Rebalancer jobs |
|
||||
| RebalancerGenerate | yes | 10m0s | no | Enterprise Only | Generates the Rebalancer plan |
|
||||
| RebuildOutSyncedShards | no | 24h0m0s | no | Community & Enterprise | Run Rebuild Out Synced Shards procedure for DBServers |
|
||||
| RecreateMember | no | 15m0s | no | Community & Enterprise | Recreate member with same ID and Data |
|
||||
| RefreshTLSKeyfileCertificate | no | 30m0s | no | Enterprise Only | Recreate Server TLS Certificate secret |
|
||||
| RemoveMember | no | 15m0s | no | Community & Enterprise | Removes member from the Cluster and Status |
|
||||
|
@ -134,6 +135,7 @@ spec:
|
|||
RebalancerCheck: 10m0s
|
||||
RebalancerClean: 10m0s
|
||||
RebalancerGenerate: 10m0s
|
||||
RebuildOutSyncedShards: 24h0m0s
|
||||
RecreateMember: 15m0s
|
||||
RefreshTLSKeyfileCertificate: 30m0s
|
||||
RemoveMember: 15m0s
|
||||
|
|
2
go.mod
2
go.mod
|
@ -78,7 +78,7 @@ require (
|
|||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/google/go-cmp v0.5.6 // indirect
|
||||
github.com/google/go-cmp v0.5.8 // indirect
|
||||
github.com/google/gofuzz v1.1.0 // indirect
|
||||
github.com/google/uuid v1.1.2 // indirect
|
||||
github.com/googleapis/gnostic v0.5.5 // indirect
|
||||
|
|
19
go.sum
19
go.sum
|
@ -71,8 +71,6 @@ github.com/arangodb/go-upgrade-rules v0.0.0-20180809110947-031b4774ff21 h1:+W7D5
|
|||
github.com/arangodb/go-upgrade-rules v0.0.0-20180809110947-031b4774ff21/go.mod h1:RkPIG6JJ2pcJUoymc18NxAJGraZd+iAEVnOTDjZey/w=
|
||||
github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e h1:Xg+hGrY2LcQBbxd0ZFdbGSyRKTYMZCfBbw/pMJFOk1g=
|
||||
github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e/go.mod h1:mq7Shfa/CaixoDxiyAAc5jZ6CVBAyPaNQCGS7mkj4Ho=
|
||||
github.com/arangodb/rebalancer v0.1.1 h1:8MikmxlhywKnw/wiDqctD8FFwBZhAAF1E3mIqh8nzCA=
|
||||
github.com/arangodb/rebalancer v0.1.1/go.mod h1:wLvglmYNuoTUYbLQq/UESIMVkINmSX9eZWC5QB9kNyk=
|
||||
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
|
||||
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
|
||||
|
@ -249,8 +247,9 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
|
||||
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
github.com/google/gofuzz v1.1.0 h1:Hsa8mG0dQ46ij8Sl2AYJDUv1oA9/d6Vk+3LG99Oe02g=
|
||||
github.com/google/gofuzz v1.1.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
|
||||
|
@ -433,7 +432,6 @@ github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXP
|
|||
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
|
||||
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
|
||||
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
|
||||
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
|
||||
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
github.com/prometheus/client_golang v1.11.1 h1:+4eQaD7vAZ6DsfsxB15hbE0odUjGI5ARs9yskGu1v4s=
|
||||
github.com/prometheus/client_golang v1.11.1/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
|
||||
|
@ -561,7 +559,6 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U
|
|||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
|
||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 h1:/UOmuWzQfxxo9UtlXMwuQU8CMgg1eZXqTRwkSQJWKOI=
|
||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
|
||||
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
|
||||
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
|
||||
|
@ -643,7 +640,6 @@ golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v
|
|||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20211209124913-491a49abca63 h1:iocB37TsdFuN6IBRZ+ry36wrkoV51/tl5vOWqkcPGvY=
|
||||
golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.7.0 h1:rJrUqqhjsgNp7KqAIc25s9pZnjU7TUcSY7HcVZjdn1g=
|
||||
golang.org/x/net v0.7.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
|
||||
|
@ -735,17 +731,11 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
|
|||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069 h1:siQdpVirKtzPhKl3lZWozZraCFObP8S1v6PRp0bLrtU=
|
||||
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
|
||||
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.1.0 h1:kunALQeHf1/185U1i0GOB/fy1IPRDDpuoOOqRReG57U=
|
||||
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
|
||||
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
|
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE=
|
||||
golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
|
||||
golang.org/x/term v0.5.0 h1:n2a8QNdAb0sZNpU9R1ALUXBbY+w51fCQDN+7EdxNBsY=
|
||||
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
|
||||
|
@ -756,10 +746,7 @@ golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
|
|||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=
|
||||
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
|
||||
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
|
||||
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
|
||||
golang.org/x/text v0.7.0 h1:4BRB4x83lYWy72KwLD/qYDuTu7q9PjSagHvijDw7cLo=
|
||||
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
|
||||
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
|
||||
|
@ -972,6 +959,8 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
|||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
|
||||
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
|
||||
|
|
|
@ -234,3 +234,8 @@ actions:
|
|||
TimezoneSecretSet:
|
||||
description: Set timezone details in cluster
|
||||
timeout: 30m
|
||||
RebuildOutSyncedShards:
|
||||
description: Run Rebuild Out Synced Shards procedure for DBServers
|
||||
timeout: 24h
|
||||
scopes:
|
||||
- High
|
||||
|
|
|
@ -109,6 +109,8 @@ const (
|
|||
ActionRebalancerCleanDefaultTimeout time.Duration = ActionsDefaultTimeout
|
||||
// ActionRebalancerGenerateDefaultTimeout define default timeout for action ActionRebalancerGenerate
|
||||
ActionRebalancerGenerateDefaultTimeout time.Duration = ActionsDefaultTimeout
|
||||
// ActionRebuildOutSyncedShardsDefaultTimeout define default timeout for action ActionRebuildOutSyncedShards
|
||||
ActionRebuildOutSyncedShardsDefaultTimeout time.Duration = 86400 * time.Second // 24h0m0s
|
||||
// ActionRecreateMemberDefaultTimeout define default timeout for action ActionRecreateMember
|
||||
ActionRecreateMemberDefaultTimeout time.Duration = 900 * time.Second // 15m0s
|
||||
// ActionRefreshTLSKeyfileCertificateDefaultTimeout define default timeout for action ActionRefreshTLSKeyfileCertificate
|
||||
|
@ -266,6 +268,8 @@ const (
|
|||
ActionTypeRebalancerClean ActionType = "RebalancerClean"
|
||||
// ActionTypeRebalancerGenerate in scopes Normal. Generates the Rebalancer plan
|
||||
ActionTypeRebalancerGenerate ActionType = "RebalancerGenerate"
|
||||
// ActionTypeRebuildOutSyncedShards in scopes High. Run Rebuild Out Synced Shards procedure for DBServers
|
||||
ActionTypeRebuildOutSyncedShards ActionType = "RebuildOutSyncedShards"
|
||||
// ActionTypeRecreateMember in scopes Normal. Recreate member with same ID and Data
|
||||
ActionTypeRecreateMember ActionType = "RecreateMember"
|
||||
// ActionTypeRefreshTLSKeyfileCertificate in scopes Normal. Recreate Server TLS Certificate secret
|
||||
|
@ -424,6 +428,8 @@ func (a ActionType) DefaultTimeout() time.Duration {
|
|||
return ActionRebalancerCleanDefaultTimeout
|
||||
case ActionTypeRebalancerGenerate:
|
||||
return ActionRebalancerGenerateDefaultTimeout
|
||||
case ActionTypeRebuildOutSyncedShards:
|
||||
return ActionRebuildOutSyncedShardsDefaultTimeout
|
||||
case ActionTypeRecreateMember:
|
||||
return ActionRecreateMemberDefaultTimeout
|
||||
case ActionTypeRefreshTLSKeyfileCertificate:
|
||||
|
@ -586,6 +592,8 @@ func (a ActionType) Priority() ActionPriority {
|
|||
return ActionPriorityNormal
|
||||
case ActionTypeRebalancerGenerate:
|
||||
return ActionPriorityNormal
|
||||
case ActionTypeRebuildOutSyncedShards:
|
||||
return ActionPriorityHigh
|
||||
case ActionTypeRecreateMember:
|
||||
return ActionPriorityNormal
|
||||
case ActionTypeRefreshTLSKeyfileCertificate:
|
||||
|
@ -758,6 +766,8 @@ func (a ActionType) Optional() bool {
|
|||
return false
|
||||
case ActionTypeRebalancerGenerate:
|
||||
return false
|
||||
case ActionTypeRebuildOutSyncedShards:
|
||||
return false
|
||||
case ActionTypeRecreateMember:
|
||||
return false
|
||||
case ActionTypeRefreshTLSKeyfileCertificate:
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
|
|
@ -158,7 +158,7 @@ type Cache interface {
|
|||
CommitIndex() uint64
|
||||
// Health returns true when healthy object is available.
|
||||
Health() (Health, bool)
|
||||
// ShardsInSyncMap returns last in sync state of particular shard
|
||||
// ShardsInSyncMap returns last in sync state of shards. If no state is available, false is returned.
|
||||
ShardsInSyncMap() (ShardsSyncStatus, bool)
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
package agency
|
||||
|
||||
// StateCurrentCollections is a map of database name to collections
|
||||
type StateCurrentCollections map[string]StateCurrentDBCollections
|
||||
|
||||
func (a StateCurrentCollections) IsDBServerPresent(name Server) bool {
|
||||
|
@ -32,6 +33,7 @@ func (a StateCurrentCollections) IsDBServerPresent(name Server) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// StateCurrentDBCollections is a map of collection name to shards
|
||||
type StateCurrentDBCollections map[string]StateCurrentDBCollection
|
||||
|
||||
func (a StateCurrentDBCollections) IsDBServerPresent(name Server) bool {
|
||||
|
@ -44,6 +46,7 @@ func (a StateCurrentDBCollections) IsDBServerPresent(name Server) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// StateCurrentDBCollection is a map of Shard name to Shard details
|
||||
type StateCurrentDBCollection map[string]StateCurrentDBShard
|
||||
|
||||
func (a StateCurrentDBCollection) IsDBServerPresent(name Server) bool {
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
|
||||
package agency
|
||||
|
||||
// StatePlanCollections is a map of database name to collections
|
||||
type StatePlanCollections map[string]StatePlanDBCollections
|
||||
|
||||
func (a StatePlanCollections) IsDBServerPresent(name Server) bool {
|
||||
|
@ -40,6 +41,7 @@ func (a StatePlanCollections) IsDBServerLeader(name Server) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// StatePlanDBCollections is a map of collection name to collection details
|
||||
type StatePlanDBCollections map[string]StatePlanCollection
|
||||
|
||||
func (a StatePlanDBCollections) IsDBServerInCollections(name Server) bool {
|
||||
|
|
|
@ -24,6 +24,7 @@ import "time"
|
|||
|
||||
type ShardsSyncStatus map[string]time.Time
|
||||
|
||||
// NotInSyncSince returns a list of shards that have not been in sync for at least t.
|
||||
func (s ShardsSyncStatus) NotInSyncSince(t time.Duration) []string {
|
||||
r := make([]string, 0, len(s))
|
||||
|
||||
|
|
|
@ -167,13 +167,61 @@ func (s State) GetDBServerWithLowestShards() Server {
|
|||
return resultServer
|
||||
}
|
||||
|
||||
type ShardDetails struct {
|
||||
ShardID string
|
||||
Database string
|
||||
Collection string
|
||||
Servers Servers
|
||||
}
|
||||
|
||||
// GetShardDetailsByID returns the ShardDetails for a given ShardID. If the ShardID is not found, the second return value is false
|
||||
func (s State) GetShardDetailsByID(id string) (ShardDetails, bool) {
|
||||
// check first in Plan
|
||||
for dbName, db := range s.Plan.Collections {
|
||||
for colName, col := range db {
|
||||
for sName, servers := range col.Shards {
|
||||
if sName == id {
|
||||
return ShardDetails{
|
||||
ShardID: sName,
|
||||
Database: dbName,
|
||||
Collection: colName,
|
||||
Servers: servers,
|
||||
}, true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check in Current
|
||||
for dbName, db := range s.Current.Collections {
|
||||
for colName, col := range db {
|
||||
for sName, shard := range col {
|
||||
if sName == id {
|
||||
return ShardDetails{
|
||||
ShardID: sName,
|
||||
Database: dbName,
|
||||
Collection: colName,
|
||||
Servers: shard.Servers,
|
||||
}, true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ShardDetails{}, false
|
||||
}
|
||||
|
||||
type ShardStatus struct {
|
||||
IsSynced bool
|
||||
}
|
||||
|
||||
func (s State) GetShardsStatus() map[string]bool {
|
||||
q := map[string]bool{}
|
||||
|
||||
for dName, d := range s.Plan.Collections {
|
||||
for cName, c := range d {
|
||||
for sName, shard := range c.Shards {
|
||||
q[sName] = s.IsShardInSync(dName, cName, sName, shard)
|
||||
for sName, servers := range c.Shards {
|
||||
q[sName] = s.IsShardInSync(dName, cName, sName, servers)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -165,16 +165,16 @@ func (d *Deployment) GetDatabaseWithWrap(wrappers ...conn.ConnectionWrap) (drive
|
|||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
conn := c.Connection()
|
||||
dbConn := c.Connection()
|
||||
|
||||
for _, w := range wrappers {
|
||||
if w != nil {
|
||||
conn = w(conn)
|
||||
dbConn = w(dbConn)
|
||||
}
|
||||
}
|
||||
|
||||
return driver.NewClient(driver.ClientConfig{
|
||||
Connection: conn,
|
||||
Connection: dbConn,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -187,6 +187,18 @@ func (d *Deployment) GetDatabaseAsyncClient(ctx context.Context) (driver.Client,
|
|||
return c, nil
|
||||
}
|
||||
|
||||
// GetServerAsyncClient returns an async client for a specific server.
|
||||
func (d *Deployment) GetServerAsyncClient(id string) (driver.Client, error) {
|
||||
c, err := d.GetMembersState().GetMemberClient(id)
|
||||
if err != nil {
|
||||
return nil, errors.WithStack(err)
|
||||
}
|
||||
|
||||
return driver.NewClient(driver.ClientConfig{
|
||||
Connection: conn.NewAsyncConnection(c.Connection()),
|
||||
})
|
||||
}
|
||||
|
||||
// GetServerClient returns a cached client for a specific server.
|
||||
func (d *Deployment) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) {
|
||||
c, err := d.clientCache.Get(ctx, group, id)
|
||||
|
|
|
@ -164,6 +164,11 @@ func (d *Deployment) GetAgencyHealth() (agency.Health, bool) {
|
|||
return d.agencyCache.Health()
|
||||
}
|
||||
|
||||
// ShardsInSyncMap returns last in sync state of shards. If no state is available, false is returned.
|
||||
func (d *Deployment) ShardsInSyncMap() (agency.ShardsSyncStatus, bool) {
|
||||
return d.agencyCache.ShardsInSyncMap()
|
||||
}
|
||||
|
||||
func (d *Deployment) GetAgencyArangoDBCache() (agency.StateDB, bool) {
|
||||
return d.agencyCache.DataDB()
|
||||
}
|
||||
|
|
38
pkg/deployment/features/rebuild_out_synced_shards.go
Normal file
38
pkg/deployment/features/rebuild_out_synced_shards.go
Normal file
|
@ -0,0 +1,38 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package features
|
||||
|
||||
func init() {
|
||||
registerFeature(rebuildOutSyncedShards)
|
||||
}
|
||||
|
||||
var rebuildOutSyncedShards = &feature{
|
||||
name: "force-rebuild-out-synced-shards",
|
||||
description: "Force rebuild permanently out-synced shards due to a bug in ArangoDB 3.8-3.10",
|
||||
version: "3.8.0",
|
||||
enterpriseRequired: false,
|
||||
enabledByDefault: false,
|
||||
hidden: true,
|
||||
}
|
||||
|
||||
func RebuildOutSyncedShards() Feature {
|
||||
return rebuildOutSyncedShards
|
||||
}
|
|
@ -153,6 +153,9 @@ var (
|
|||
_ Action = &actionRebalancerGenerate{}
|
||||
_ actionFactory = newRebalancerGenerateAction
|
||||
|
||||
_ Action = &actionRebuildOutSyncedShards{}
|
||||
_ actionFactory = newRebuildOutSyncedShardsAction
|
||||
|
||||
_ Action = &actionRecreateMember{}
|
||||
_ actionFactory = newRecreateMemberAction
|
||||
|
||||
|
@ -766,6 +769,18 @@ func init() {
|
|||
registerAction(action, function)
|
||||
}
|
||||
|
||||
// RebuildOutSyncedShards
|
||||
{
|
||||
// Get Action defition
|
||||
function := newRebuildOutSyncedShardsAction
|
||||
action := api.ActionTypeRebuildOutSyncedShards
|
||||
|
||||
// Wrap action main function
|
||||
|
||||
// Register action
|
||||
registerAction(action, function)
|
||||
}
|
||||
|
||||
// RecreateMember
|
||||
{
|
||||
// Get Action defition
|
||||
|
|
|
@ -450,6 +450,16 @@ func Test_Actions(t *testing.T) {
|
|||
})
|
||||
})
|
||||
|
||||
t.Run("RebuildOutSyncedShards", func(t *testing.T) {
|
||||
ActionsExistence(t, api.ActionTypeRebuildOutSyncedShards)
|
||||
t.Run("Internal", func(t *testing.T) {
|
||||
require.False(t, api.ActionTypeRebuildOutSyncedShards.Internal())
|
||||
})
|
||||
t.Run("Optional", func(t *testing.T) {
|
||||
require.False(t, api.ActionTypeRebuildOutSyncedShards.Optional())
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("RecreateMember", func(t *testing.T) {
|
||||
ActionsExistence(t, api.ActionTypeRecreateMember)
|
||||
t.Run("Internal", func(t *testing.T) {
|
||||
|
|
|
@ -168,6 +168,10 @@ func (ac *actionContext) GetDatabaseAsyncClient(ctx context.Context) (driver.Cli
|
|||
return ac.context.GetDatabaseAsyncClient(ctx)
|
||||
}
|
||||
|
||||
func (ac *actionContext) GetServerAsyncClient(id string) (driver.Client, error) {
|
||||
return ac.context.GetServerAsyncClient(id)
|
||||
}
|
||||
|
||||
func (ac *actionContext) CurrentLocals() api.PlanLocals {
|
||||
return ac.locals
|
||||
}
|
||||
|
@ -251,6 +255,10 @@ func (ac *actionContext) GetAgencyHealth() (agencyCache.Health, bool) {
|
|||
return ac.context.GetAgencyHealth()
|
||||
}
|
||||
|
||||
func (ac *actionContext) ShardsInSyncMap() (agencyCache.ShardsSyncStatus, bool) {
|
||||
return ac.context.ShardsInSyncMap()
|
||||
}
|
||||
|
||||
func (ac *actionContext) GetAgencyCache() (agencyCache.State, bool) {
|
||||
return ac.context.GetAgencyCache()
|
||||
}
|
||||
|
|
280
pkg/deployment/reconcile/action_rebuild_outsynced_shards.go
Normal file
280
pkg/deployment/reconcile/action_rebuild_outsynced_shards.go
Normal file
|
@ -0,0 +1,280 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package reconcile
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/arangodb/go-driver"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
)
|
||||
|
||||
const (
|
||||
actionRebuildOutSyncedShardsBatchTTL = 600 * time.Second
|
||||
actionRebuildOutSyncedShardsLocalJobID api.PlanLocalKey = "rebuildJobID"
|
||||
actionRebuildOutSyncedShardsLocalDatabase api.PlanLocalKey = "database"
|
||||
actionRebuildOutSyncedShardsLocalShard api.PlanLocalKey = "shard"
|
||||
actionRebuildOutSyncedShardsBatchID api.PlanLocalKey = "batchID"
|
||||
)
|
||||
|
||||
// newRebuildOutSyncedShardsAction creates a new Action that implements the given
|
||||
// planned RebuildOutSyncedShards action.
|
||||
func newRebuildOutSyncedShardsAction(action api.Action, actionCtx ActionContext) Action {
|
||||
a := &actionRebuildOutSyncedShards{}
|
||||
|
||||
a.actionImpl = newActionImplDefRef(action, actionCtx)
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
// actionRebuildOutSyncedShards implements an RebuildOutSyncedShardsAction.
|
||||
type actionRebuildOutSyncedShards struct {
|
||||
// actionImpl implement timeout and member id functions
|
||||
actionImpl
|
||||
}
|
||||
|
||||
// Start performs the start of the action.
|
||||
// Returns true if the action is completely finished, false in case
|
||||
// the start time needs to be recorded and a ready condition needs to be checked.
|
||||
func (a *actionRebuildOutSyncedShards) Start(ctx context.Context) (bool, error) {
|
||||
if !features.RebuildOutSyncedShards().Enabled() {
|
||||
// RebuildOutSyncedShards feature is not enabled
|
||||
return true, nil
|
||||
}
|
||||
|
||||
clientSync, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID)
|
||||
if err != nil {
|
||||
return false, errors.Wrapf(err, "Unable to create client (SyncMode)")
|
||||
}
|
||||
|
||||
clientAsync, err := a.actionCtx.GetServerAsyncClient(a.action.MemberID)
|
||||
if err != nil {
|
||||
return false, errors.Wrapf(err, "Unable to create client (AsyncMode)")
|
||||
}
|
||||
|
||||
shardID, exist := a.action.GetParam("shardID")
|
||||
if !exist {
|
||||
a.log.Error("*shardID* key not found in action params")
|
||||
return true, nil
|
||||
}
|
||||
|
||||
database, exist := a.action.GetParam("database")
|
||||
if !exist {
|
||||
a.log.Error("*database* key not found in action params")
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// trigger async rebuild job
|
||||
err = a.rebuildShard(ctx, clientSync, clientAsync, shardID, database)
|
||||
if err != nil {
|
||||
a.log.Err(err).Error("Rebuild Shard Tree action failed on start", shardID, database, a.action.MemberID)
|
||||
return true, err
|
||||
}
|
||||
|
||||
a.log.Info("Triggering async job Shard Tree rebuild", shardID, database, a.action.MemberID)
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// CheckProgress returns: ready, abort, error.
// It restores the async job coordinates persisted by Start from the
// plan-local store and polls the rebuild job; once the job finishes the
// action is reported as ready.
func (a *actionRebuildOutSyncedShards) CheckProgress(ctx context.Context) (bool, bool, error) {
	if !features.RebuildOutSyncedShards().Enabled() {
		// RebuildOutSyncedShards feature is not enabled
		return true, false, nil
	}

	clientSync, err := a.actionCtx.GetMembersState().GetMemberClient(a.action.MemberID)
	if err != nil {
		return false, false, errors.Wrapf(err, "Unable to create client (SyncMode)")
	}

	clientAsync, err := a.actionCtx.GetServerAsyncClient(a.action.MemberID)
	if err != nil {
		return false, false, errors.Wrapf(err, "Unable to create client (AsyncMode)")
	}

	// A missing plan-local key means the action state is broken (Start never
	// stored it), so the action is aborted rather than retried.
	jobID, ok := a.actionCtx.Get(a.action, actionRebuildOutSyncedShardsLocalJobID)
	if !ok {
		return false, true, errors.Newf("Local Key is missing in action: %s", actionRebuildOutSyncedShardsLocalJobID)
	}

	batchID, ok := a.actionCtx.Get(a.action, actionRebuildOutSyncedShardsBatchID)
	if !ok {
		return false, true, errors.Newf("Local Key is missing in action: %s", actionRebuildOutSyncedShardsBatchID)
	}

	database, ok := a.actionCtx.Get(a.action, actionRebuildOutSyncedShardsLocalDatabase)
	if !ok {
		return false, true, errors.Newf("Local Key is missing in action: %s", actionRebuildOutSyncedShardsLocalDatabase)
	}

	shardID, ok := a.actionCtx.Get(a.action, actionRebuildOutSyncedShardsLocalShard)
	if !ok {
		return false, true, errors.Newf("Local Key is missing in action: %s", actionRebuildOutSyncedShardsLocalShard)
	}

	// check first if there is rebuild job running
	rebuildInProgress, err := a.checkRebuildShardProgress(ctx, clientAsync, clientSync, shardID, database, jobID, batchID)
	if err != nil {
		if rebuildInProgress {
			// Transient failure while the job is still running: keep the
			// action alive and retry on the next reconciliation.
			a.log.Err(err).Error("Rebuild job failed but we will retry", shardID, database, a.action.MemberID)
			return false, false, err
		} else {
			// Terminal failure: abort the action.
			a.log.Err(err).Error("Rebuild job failed", shardID, database, a.action.MemberID)
			return false, true, err
		}

	}
	if rebuildInProgress {
		a.log.Debug("Rebuild job is still in progress", shardID, database, a.action.MemberID)
		return false, false, nil
	}

	// rebuild job is done
	a.log.Info("Rebuild Shard Tree is done", shardID, database, a.action.MemberID)
	return true, false, nil
}
|
||||
|
||||
func (a *actionRebuildOutSyncedShards) rebuildShard(ctx context.Context, clientSync, clientAsync driver.Client, shardID, database string) error {
|
||||
batchID, err := a.createBatch(ctx, clientSync)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Unable to create batch")
|
||||
}
|
||||
|
||||
req, err := a.createShardRebuildRequest(clientAsync, shardID, database, batchID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = clientAsync.Connection().Do(ctx, req)
|
||||
if id, ok := conn.IsAsyncJobInProgress(err); ok {
|
||||
a.actionCtx.Add(actionRebuildOutSyncedShardsLocalJobID, id, true)
|
||||
a.actionCtx.Add(actionRebuildOutSyncedShardsLocalDatabase, database, true)
|
||||
a.actionCtx.Add(actionRebuildOutSyncedShardsLocalShard, shardID, true)
|
||||
a.actionCtx.Add(actionRebuildOutSyncedShardsBatchID, batchID, true)
|
||||
// Async request has been sent
|
||||
return nil
|
||||
} else {
|
||||
return errors.Wrapf(err, "Unknown rebuild request error")
|
||||
}
|
||||
}
|
||||
|
||||
// checkRebuildShardProgress returns: inProgress, error.
|
||||
func (a *actionRebuildOutSyncedShards) checkRebuildShardProgress(ctx context.Context, clientAsync, clientSync driver.Client, shardID, database, jobID, batchID string) (bool, error) {
|
||||
req, err := a.createShardRebuildRequest(clientAsync, shardID, database, batchID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
resp, err := clientAsync.Connection().Do(conn.WithAsyncID(ctx, jobID), req)
|
||||
if err != nil {
|
||||
if _, ok := conn.IsAsyncJobInProgress(err); ok {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Add wait grace period
|
||||
if ok := conn.IsAsyncErrorNotFound(err); ok {
|
||||
if s := a.action.StartTime; s != nil && !s.Time.IsZero() {
|
||||
if time.Since(s.Time) < 10*time.Second {
|
||||
// Retry
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false, errors.Wrapf(err, "check rebuild progress error")
|
||||
}
|
||||
|
||||
// cleanup batch
|
||||
_ = a.deleteBatch(ctx, clientSync, batchID)
|
||||
|
||||
if resp.StatusCode() == http.StatusNoContent {
|
||||
return false, nil
|
||||
} else {
|
||||
return false, errors.Wrapf(err, "rebuild progress failed with status code %d", resp.StatusCode())
|
||||
}
|
||||
}
|
||||
|
||||
//************************** API Calls ************************************
|
||||
|
||||
// createShardRebuildRequest creates request for rebuilding shard. Returns request, error.
|
||||
func (a *actionRebuildOutSyncedShards) createShardRebuildRequest(clientAsync driver.Client, shardID, database, batchID string) (driver.Request, error) {
|
||||
req, err := clientAsync.Connection().NewRequest("POST", path.Join("_db", database, "_api/replication/revisions/tree"))
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "Unable to create rebuild shard request, shard %s, database: %s", shardID, database)
|
||||
}
|
||||
req = req.SetQuery("batchId", batchID)
|
||||
req = req.SetQuery("collection", shardID)
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// createBatch creates batch on the server. Returns batchID, error.
|
||||
func (a *actionRebuildOutSyncedShards) createBatch(ctx context.Context, clientSync driver.Client) (string, error) {
|
||||
req, err := clientSync.Connection().NewRequest("POST", path.Join("_api/replication/batch"))
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "Unable to create request for batch creation")
|
||||
}
|
||||
params := struct {
|
||||
TTL float64 `json:"ttl"`
|
||||
}{TTL: actionRebuildOutSyncedShardsBatchTTL.Seconds()}
|
||||
req, err = req.SetBody(params)
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "Unable to add body to the batch creation request")
|
||||
}
|
||||
|
||||
resp, err := clientSync.Connection().Do(ctx, req)
|
||||
if err != nil {
|
||||
return "", errors.Wrapf(err, "Unable to create batch, request failed")
|
||||
}
|
||||
if err := resp.CheckStatus(200); err != nil {
|
||||
return "", errors.Wrapf(err, "Unable to create batch, wrong status code %d", resp.StatusCode())
|
||||
}
|
||||
var batch struct {
|
||||
ID string `json:"id"`
|
||||
}
|
||||
|
||||
if err := resp.ParseBody("", &batch); err != nil {
|
||||
return "", errors.Wrapf(err, "Unable to parse batch creation response")
|
||||
}
|
||||
return batch.ID, nil
|
||||
}
|
||||
|
||||
// deleteBatch removes batch from the server
|
||||
func (a *actionRebuildOutSyncedShards) deleteBatch(ctx context.Context, clientSync driver.Client, batchID string) error {
|
||||
req, err := clientSync.Connection().NewRequest("DELETE", path.Join("_api/replication/batch", batchID))
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Unable to create request for batch removal")
|
||||
}
|
||||
|
||||
resp, err := clientSync.Connection().Do(ctx, req)
|
||||
if err != nil {
|
||||
return errors.Wrapf(err, "Unable to remove batch, request failed")
|
||||
}
|
||||
if err := resp.CheckStatus(204); err != nil {
|
||||
return errors.Wrapf(err, "Unable to remove batch, wrong status code %d", resp.StatusCode())
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -49,6 +49,7 @@ func (r *Reconciler) createNormalPlan(ctx context.Context, apiObject k8sutil.API
|
|||
ApplyIfEmpty(r.createScaleUPMemberPlan).
|
||||
// Check for failed members
|
||||
ApplyIfEmpty(r.createMemberFailedRestoreNormalPlan).
|
||||
ApplyIfEmpty(r.createRebuildOutSyncedPlan).
|
||||
// Check for scale up/down
|
||||
ApplyIfEmpty(r.createScaleMemberPlan).
|
||||
// Update status
|
||||
|
|
|
@ -0,0 +1,101 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
|
||||
package reconcile
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/actions"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/globals"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
|
||||
)
|
||||
|
||||
var lastTriggeredRebuildOutSyncedShards time.Time
|
||||
|
||||
// createRotateOrUpgradePlan
|
||||
func (r *Reconciler) createRebuildOutSyncedPlan(ctx context.Context, apiObject k8sutil.APIObject,
|
||||
spec api.DeploymentSpec, status api.DeploymentStatus,
|
||||
context PlanBuilderContext) api.Plan {
|
||||
var plan api.Plan
|
||||
|
||||
if !features.RebuildOutSyncedShards().Enabled() {
|
||||
// RebuildOutSyncedShards feature is not enabled
|
||||
return nil
|
||||
}
|
||||
|
||||
// to prevent from rebuilding out-synced shards again and again we will trigger rebuild only once per T minutes
|
||||
if time.Since(lastTriggeredRebuildOutSyncedShards) < globals.GetGlobalTimeouts().ShardRebuildRetry().Get() {
|
||||
// we already triggered rebuild out-synced shards recently
|
||||
return nil
|
||||
}
|
||||
|
||||
shardStatus, ok := context.ShardsInSyncMap()
|
||||
if !ok {
|
||||
r.log.Error("Unable to get shards status")
|
||||
return nil
|
||||
}
|
||||
agencyState, ok := context.GetAgencyCache()
|
||||
if !ok {
|
||||
r.log.Error("Unable to get agency state")
|
||||
return nil
|
||||
}
|
||||
|
||||
members := map[string]api.MemberStatus{}
|
||||
for _, m := range status.Members.AsListInGroup(api.ServerGroupDBServers) {
|
||||
members[m.Member.ID] = m.Member
|
||||
}
|
||||
|
||||
// Get shards which are out-synced for more than defined timeout
|
||||
outSyncedShardsIDs := shardStatus.NotInSyncSince(globals.GetGlobalTimeouts().ShardRebuild().Get())
|
||||
|
||||
if len(outSyncedShardsIDs) > 0 {
|
||||
// Create plan for out-synced shards
|
||||
for _, shardID := range outSyncedShardsIDs {
|
||||
shard, exist := agencyState.GetShardDetailsByID(shardID)
|
||||
if !exist {
|
||||
r.log.Error("Shard servers not found", shardID, shard.Database)
|
||||
continue
|
||||
}
|
||||
|
||||
for _, server := range shard.Servers {
|
||||
member, ok := members[string(server)]
|
||||
if !ok {
|
||||
r.log.Error("Member not found - we can not fix out-synced shard!", server)
|
||||
} else {
|
||||
r.log.Info("Shard is out-synced and its Tree will be rebuild", shardID, shard.Database, shard.Collection, member.ID)
|
||||
|
||||
action := actions.NewAction(api.ActionTypeRebuildOutSyncedShards, api.ServerGroupDBServers, member).
|
||||
AddParam("shardID", shardID).
|
||||
AddParam("database", shard.Database)
|
||||
|
||||
plan = append(plan, action)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// save time when we triggered rebuild out-synced shards last time
|
||||
lastTriggeredRebuildOutSyncedShards = time.Now()
|
||||
}
|
||||
return plan
|
||||
}
|
|
@ -115,6 +115,11 @@ func (c *testContext) GetAgencyHealth() (agencyCache.Health, bool) {
|
|||
panic("implement me")
|
||||
}
|
||||
|
||||
// ShardsInSyncMap is a stub satisfying the context interface for tests.
func (c *testContext) ShardsInSyncMap() (agencyCache.ShardsSyncStatus, bool) {
	//TODO implement me
	panic("implement me")
}
|
||||
|
||||
func (c *testContext) RenderPodForMember(ctx context.Context, acs sutil.ACS, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
|
@ -133,6 +138,11 @@ func (c *testContext) GetDatabaseAsyncClient(ctx context.Context) (driver.Client
|
|||
panic("implement me")
|
||||
}
|
||||
|
||||
func (ac *testContext) GetServerAsyncClient(id string) (driver.Client, error) {
|
||||
//TODO implement me
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (c *testContext) GetMembersState() member.StateInspector {
|
||||
return c.state
|
||||
}
|
||||
|
|
|
@ -98,6 +98,7 @@ type ArangoAgencyGet interface {
|
|||
GetAgencyCache() (agencyCache.State, bool)
|
||||
GetAgencyArangoDBCache() (agencyCache.StateDB, bool)
|
||||
GetAgencyHealth() (agencyCache.Health, bool)
|
||||
ShardsInSyncMap() (agencyCache.ShardsSyncStatus, bool)
|
||||
}
|
||||
|
||||
type ArangoAgency interface {
|
||||
|
@ -132,6 +133,8 @@ type DeploymentDatabaseClient interface {
|
|||
// GetDatabaseAsyncClient returns a cached client for the entire database (cluster coordinators or single server),
|
||||
// creating one if needed. Only in AsyncMode
|
||||
GetDatabaseAsyncClient(ctx context.Context) (driver.Client, error)
|
||||
// GetServerAsyncClient returns an async client for a specific server.
|
||||
GetServerAsyncClient(id string) (driver.Client, error)
|
||||
}
|
||||
|
||||
type DeploymentMemberClient interface {
|
||||
|
|
|
@ -79,6 +79,13 @@ func (a async) Do(ctx context.Context, req driver.Request) (driver.Response, err
|
|||
case http.StatusNotFound:
|
||||
return nil, newAsyncErrorNotFound(id)
|
||||
case http.StatusNoContent:
|
||||
asyncID := resp.Header(constants.ArangoHeaderAsyncIDKey)
|
||||
if asyncID == id {
|
||||
// Job is done
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
// Job is in progress
|
||||
return nil, newAsyncJobInProgress(id)
|
||||
default:
|
||||
return resp, nil
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
|
||||
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -29,6 +29,12 @@ const (
|
|||
DefaultArangoDCheckTimeout = time.Second * 2
|
||||
DefaultReconciliationTimeout = time.Minute
|
||||
|
||||
// DefaultOutSyncedShardRebuildTimeout
|
||||
// timeout after which particular out-synced shard is considered as failed and rebuild is triggered
|
||||
DefaultOutSyncedShardRebuildTimeout = time.Minute * 60
|
||||
// DefaultOutSyncedShardRebuildRetryTimeout timeout after which rebuild shards retry flow is triggered
|
||||
DefaultOutSyncedShardRebuildRetryTimeout = time.Minute * 60
|
||||
|
||||
DefaultKubernetesRequestBatchSize = 256
|
||||
|
||||
DefaultBackupConcurrentUploads = 4
|
||||
|
@ -36,11 +42,13 @@ const (
|
|||
|
||||
var globalObj = &globals{
|
||||
timeouts: &globalTimeouts{
|
||||
requests: NewTimeout(DefaultKubernetesTimeout),
|
||||
arangod: NewTimeout(DefaultArangoDTimeout),
|
||||
arangodCheck: NewTimeout(DefaultArangoDCheckTimeout),
|
||||
reconciliation: NewTimeout(DefaultReconciliationTimeout),
|
||||
agency: NewTimeout(DefaultArangoDAgencyTimeout),
|
||||
requests: NewTimeout(DefaultKubernetesTimeout),
|
||||
arangod: NewTimeout(DefaultArangoDTimeout),
|
||||
arangodCheck: NewTimeout(DefaultArangoDCheckTimeout),
|
||||
reconciliation: NewTimeout(DefaultReconciliationTimeout),
|
||||
agency: NewTimeout(DefaultArangoDAgencyTimeout),
|
||||
shardRebuild: NewTimeout(DefaultOutSyncedShardRebuildTimeout),
|
||||
shardRebuildRetry: NewTimeout(DefaultOutSyncedShardRebuildRetryTimeout),
|
||||
},
|
||||
kubernetes: &globalKubernetes{
|
||||
requestBatchSize: NewInt64(DefaultKubernetesRequestBatchSize),
|
||||
|
@ -108,6 +116,8 @@ func (g *globalBackup) ConcurrentUploads() Int {
|
|||
|
||||
type GlobalTimeouts interface {
|
||||
Reconciliation() Timeout
|
||||
ShardRebuild() Timeout
|
||||
ShardRebuildRetry() Timeout
|
||||
|
||||
Kubernetes() Timeout
|
||||
ArangoD() Timeout
|
||||
|
@ -116,7 +126,7 @@ type GlobalTimeouts interface {
|
|||
}
|
||||
|
||||
type globalTimeouts struct {
|
||||
requests, arangod, reconciliation, arangodCheck, agency Timeout
|
||||
requests, arangod, reconciliation, arangodCheck, agency, shardRebuild, shardRebuildRetry Timeout
|
||||
}
|
||||
|
||||
func (g *globalTimeouts) Agency() Timeout {
|
||||
|
@ -131,6 +141,14 @@ func (g *globalTimeouts) Reconciliation() Timeout {
|
|||
return g.reconciliation
|
||||
}
|
||||
|
||||
// ShardRebuild returns the timeout after which an out-synced shard is
// considered broken and its tree rebuild is triggered.
func (g *globalTimeouts) ShardRebuild() Timeout {
	return g.shardRebuild
}
|
||||
|
||||
// ShardRebuildRetry returns the minimal interval between two consecutive
// out-synced shard rebuild planning rounds.
func (g *globalTimeouts) ShardRebuildRetry() Timeout {
	return g.shardRebuildRetry
}
|
||||
|
||||
func (g *globalTimeouts) ArangoD() Timeout {
|
||||
return g.arangod
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue