1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

GT-457 [Feature] Rebalancer V2 (#1345)

This commit is contained in:
Adam Janikowski 2023-07-04 11:51:56 +02:00 committed by GitHub
parent a7b5e47759
commit 0cd5a2a65f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 938 additions and 13 deletions

View file

@ -6,6 +6,7 @@
- (Feature) Agency Improvements
- (Bugfix) Fix agency timeout
- (Improvement) Extract Agency Timeout
- (Feature) Rebalancer V2
## [1.2.30](https://github.com/arangodb/kube-arangodb/tree/1.2.30) (2023-06-16)
- (Feature) AgencyCache Interface

View file

@ -45,8 +45,11 @@
| PVCResized | no | 15m0s | no | Community & Enterprise | Waits for PVC resize to be completed |
| PlaceHolder | no | 10m0s | no | Community & Enterprise | Empty placeholder action |
| RebalancerCheck | no | 10m0s | no | Enterprise Only | Check Rebalancer job progress |
| RebalancerCheckV2 | no | 10m0s | no | Community & Enterprise | Check Rebalancer job progress |
| RebalancerClean | no | 10m0s | no | Enterprise Only | Cleans Rebalancer jobs |
| RebalancerCleanV2 | no | 10m0s | no | Community & Enterprise | Cleans Rebalancer jobs |
| RebalancerGenerate | yes | 10m0s | no | Enterprise Only | Generates the Rebalancer plan |
| RebalancerGenerateV2 | yes | 10m0s | no | Community & Enterprise | Generates the Rebalancer plan |
| RebuildOutSyncedShards | no | 24h0m0s | no | Community & Enterprise | Run Rebuild Out Synced Shards procedure for DBServers |
| RecreateMember | no | 15m0s | no | Community & Enterprise | Recreate member with same ID and Data |
| RefreshTLSKeyfileCertificate | no | 30m0s | no | Enterprise Only | Recreate Server TLS Certificate secret |
@ -134,8 +137,11 @@ spec:
PVCResized: 15m0s
PlaceHolder: 10m0s
RebalancerCheck: 10m0s
RebalancerCheckV2: 10m0s
RebalancerClean: 10m0s
RebalancerCleanV2: 10m0s
RebalancerGenerate: 10m0s
RebalancerGenerateV2: 10m0s
RebuildOutSyncedShards: 24h0m0s
RecreateMember: 15m0s
RefreshTLSKeyfileCertificate: 30m0s

View file

@ -232,6 +232,13 @@ actions:
RebalancerClean:
enterprise: true
description: Cleans Rebalancer jobs
RebalancerGenerateV2:
description: Generates the Rebalancer plan
isInternal: true
RebalancerCheckV2:
description: Check Rebalancer job progress
RebalancerCleanV2:
description: Cleans Rebalancer jobs
ResourceSync:
description: Runs the Resource sync
TimezoneSecretSet:

View file

@ -105,10 +105,16 @@ const (
ActionPlaceHolderDefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerCheckDefaultTimeout define default timeout for action ActionRebalancerCheck
ActionRebalancerCheckDefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerCheckV2DefaultTimeout define default timeout for action ActionRebalancerCheckV2
ActionRebalancerCheckV2DefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerCleanDefaultTimeout define default timeout for action ActionRebalancerClean
ActionRebalancerCleanDefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerCleanV2DefaultTimeout define default timeout for action ActionRebalancerCleanV2
ActionRebalancerCleanV2DefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerGenerateDefaultTimeout define default timeout for action ActionRebalancerGenerate
ActionRebalancerGenerateDefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerGenerateV2DefaultTimeout define default timeout for action ActionRebalancerGenerateV2
ActionRebalancerGenerateV2DefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebuildOutSyncedShardsDefaultTimeout define default timeout for action ActionRebuildOutSyncedShards
ActionRebuildOutSyncedShardsDefaultTimeout time.Duration = 86400 * time.Second // 24h0m0s
// ActionRecreateMemberDefaultTimeout define default timeout for action ActionRecreateMember
@ -266,10 +272,16 @@ const (
ActionTypePlaceHolder ActionType = "PlaceHolder"
// ActionTypeRebalancerCheck in scopes Normal. Check Rebalancer job progress
ActionTypeRebalancerCheck ActionType = "RebalancerCheck"
// ActionTypeRebalancerCheckV2 in scopes Normal. Check Rebalancer job progress
ActionTypeRebalancerCheckV2 ActionType = "RebalancerCheckV2"
// ActionTypeRebalancerClean in scopes Normal. Cleans Rebalancer jobs
ActionTypeRebalancerClean ActionType = "RebalancerClean"
// ActionTypeRebalancerCleanV2 in scopes Normal. Cleans Rebalancer jobs
ActionTypeRebalancerCleanV2 ActionType = "RebalancerCleanV2"
// ActionTypeRebalancerGenerate in scopes Normal. Generates the Rebalancer plan
ActionTypeRebalancerGenerate ActionType = "RebalancerGenerate"
// ActionTypeRebalancerGenerateV2 in scopes Normal. Generates the Rebalancer plan
ActionTypeRebalancerGenerateV2 ActionType = "RebalancerGenerateV2"
// ActionTypeRebuildOutSyncedShards in scopes High. Run Rebuild Out Synced Shards procedure for DBServers
ActionTypeRebuildOutSyncedShards ActionType = "RebuildOutSyncedShards"
// ActionTypeRecreateMember in scopes Normal. Recreate member with same ID and Data
@ -428,10 +440,16 @@ func (a ActionType) DefaultTimeout() time.Duration {
return ActionPlaceHolderDefaultTimeout
case ActionTypeRebalancerCheck:
return ActionRebalancerCheckDefaultTimeout
case ActionTypeRebalancerCheckV2:
return ActionRebalancerCheckV2DefaultTimeout
case ActionTypeRebalancerClean:
return ActionRebalancerCleanDefaultTimeout
case ActionTypeRebalancerCleanV2:
return ActionRebalancerCleanV2DefaultTimeout
case ActionTypeRebalancerGenerate:
return ActionRebalancerGenerateDefaultTimeout
case ActionTypeRebalancerGenerateV2:
return ActionRebalancerGenerateV2DefaultTimeout
case ActionTypeRebuildOutSyncedShards:
return ActionRebuildOutSyncedShardsDefaultTimeout
case ActionTypeRecreateMember:
@ -594,10 +612,16 @@ func (a ActionType) Priority() ActionPriority {
return ActionPriorityNormal
case ActionTypeRebalancerCheck:
return ActionPriorityNormal
case ActionTypeRebalancerCheckV2:
return ActionPriorityNormal
case ActionTypeRebalancerClean:
return ActionPriorityNormal
case ActionTypeRebalancerCleanV2:
return ActionPriorityNormal
case ActionTypeRebalancerGenerate:
return ActionPriorityNormal
case ActionTypeRebalancerGenerateV2:
return ActionPriorityNormal
case ActionTypeRebuildOutSyncedShards:
return ActionPriorityHigh
case ActionTypeRecreateMember:
@ -682,6 +706,8 @@ func (a ActionType) Internal() bool {
switch a {
case ActionTypeRebalancerGenerate:
return true
case ActionTypeRebalancerGenerateV2:
return true
default:
return false
}
@ -770,10 +796,16 @@ func (a ActionType) Optional() bool {
return false
case ActionTypeRebalancerCheck:
return false
case ActionTypeRebalancerCheckV2:
return false
case ActionTypeRebalancerClean:
return false
case ActionTypeRebalancerCleanV2:
return false
case ActionTypeRebalancerGenerate:
return false
case ActionTypeRebalancerGenerateV2:
return false
case ActionTypeRebuildOutSyncedShards:
return false
case ActionTypeRecreateMember:

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -20,6 +20,8 @@
package v1
const ArangoDeploymentRebalancerDefaultParallelMoves = 32
type ArangoDeploymentRebalancerSpec struct {
Enabled *bool `json:"enabled"`
@ -42,19 +44,21 @@ func (a *ArangoDeploymentRebalancerSpec) IsEnabled() bool {
return *a.Enabled
}
func (a *ArangoDeploymentRebalancerSpec) GetParallelMoves(d int) int {
func (a *ArangoDeploymentRebalancerSpec) GetParallelMoves() int {
if !a.IsEnabled() {
return d
return ArangoDeploymentRebalancerDefaultParallelMoves
}
if a == nil || a.ParallelMoves == nil {
return d
return ArangoDeploymentRebalancerDefaultParallelMoves
}
return *a.ParallelMoves
}
type ArangoDeploymentRebalancerReadersSpec struct {
// deprecated does not work in Rebalancer V2
// Count Enable Shard Count machanism
Count *bool `json:"count,omitempty"`
}

View file

@ -105,10 +105,16 @@ const (
ActionPlaceHolderDefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerCheckDefaultTimeout define default timeout for action ActionRebalancerCheck
ActionRebalancerCheckDefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerCheckV2DefaultTimeout define default timeout for action ActionRebalancerCheckV2
ActionRebalancerCheckV2DefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerCleanDefaultTimeout define default timeout for action ActionRebalancerClean
ActionRebalancerCleanDefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerCleanV2DefaultTimeout define default timeout for action ActionRebalancerCleanV2
ActionRebalancerCleanV2DefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerGenerateDefaultTimeout define default timeout for action ActionRebalancerGenerate
ActionRebalancerGenerateDefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebalancerGenerateV2DefaultTimeout define default timeout for action ActionRebalancerGenerateV2
ActionRebalancerGenerateV2DefaultTimeout time.Duration = ActionsDefaultTimeout
// ActionRebuildOutSyncedShardsDefaultTimeout define default timeout for action ActionRebuildOutSyncedShards
ActionRebuildOutSyncedShardsDefaultTimeout time.Duration = 86400 * time.Second // 24h0m0s
// ActionRecreateMemberDefaultTimeout define default timeout for action ActionRecreateMember
@ -266,10 +272,16 @@ const (
ActionTypePlaceHolder ActionType = "PlaceHolder"
// ActionTypeRebalancerCheck in scopes Normal. Check Rebalancer job progress
ActionTypeRebalancerCheck ActionType = "RebalancerCheck"
// ActionTypeRebalancerCheckV2 in scopes Normal. Check Rebalancer job progress
ActionTypeRebalancerCheckV2 ActionType = "RebalancerCheckV2"
// ActionTypeRebalancerClean in scopes Normal. Cleans Rebalancer jobs
ActionTypeRebalancerClean ActionType = "RebalancerClean"
// ActionTypeRebalancerCleanV2 in scopes Normal. Cleans Rebalancer jobs
ActionTypeRebalancerCleanV2 ActionType = "RebalancerCleanV2"
// ActionTypeRebalancerGenerate in scopes Normal. Generates the Rebalancer plan
ActionTypeRebalancerGenerate ActionType = "RebalancerGenerate"
// ActionTypeRebalancerGenerateV2 in scopes Normal. Generates the Rebalancer plan
ActionTypeRebalancerGenerateV2 ActionType = "RebalancerGenerateV2"
// ActionTypeRebuildOutSyncedShards in scopes High. Run Rebuild Out Synced Shards procedure for DBServers
ActionTypeRebuildOutSyncedShards ActionType = "RebuildOutSyncedShards"
// ActionTypeRecreateMember in scopes Normal. Recreate member with same ID and Data
@ -428,10 +440,16 @@ func (a ActionType) DefaultTimeout() time.Duration {
return ActionPlaceHolderDefaultTimeout
case ActionTypeRebalancerCheck:
return ActionRebalancerCheckDefaultTimeout
case ActionTypeRebalancerCheckV2:
return ActionRebalancerCheckV2DefaultTimeout
case ActionTypeRebalancerClean:
return ActionRebalancerCleanDefaultTimeout
case ActionTypeRebalancerCleanV2:
return ActionRebalancerCleanV2DefaultTimeout
case ActionTypeRebalancerGenerate:
return ActionRebalancerGenerateDefaultTimeout
case ActionTypeRebalancerGenerateV2:
return ActionRebalancerGenerateV2DefaultTimeout
case ActionTypeRebuildOutSyncedShards:
return ActionRebuildOutSyncedShardsDefaultTimeout
case ActionTypeRecreateMember:
@ -594,10 +612,16 @@ func (a ActionType) Priority() ActionPriority {
return ActionPriorityNormal
case ActionTypeRebalancerCheck:
return ActionPriorityNormal
case ActionTypeRebalancerCheckV2:
return ActionPriorityNormal
case ActionTypeRebalancerClean:
return ActionPriorityNormal
case ActionTypeRebalancerCleanV2:
return ActionPriorityNormal
case ActionTypeRebalancerGenerate:
return ActionPriorityNormal
case ActionTypeRebalancerGenerateV2:
return ActionPriorityNormal
case ActionTypeRebuildOutSyncedShards:
return ActionPriorityHigh
case ActionTypeRecreateMember:
@ -682,6 +706,8 @@ func (a ActionType) Internal() bool {
switch a {
case ActionTypeRebalancerGenerate:
return true
case ActionTypeRebalancerGenerateV2:
return true
default:
return false
}
@ -770,10 +796,16 @@ func (a ActionType) Optional() bool {
return false
case ActionTypeRebalancerCheck:
return false
case ActionTypeRebalancerCheckV2:
return false
case ActionTypeRebalancerClean:
return false
case ActionTypeRebalancerCleanV2:
return false
case ActionTypeRebalancerGenerate:
return false
case ActionTypeRebalancerGenerateV2:
return false
case ActionTypeRebuildOutSyncedShards:
return false
case ActionTypeRecreateMember:

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -20,6 +20,8 @@
package v2alpha1
const ArangoDeploymentRebalancerDefaultParallelMoves = 32
type ArangoDeploymentRebalancerSpec struct {
Enabled *bool `json:"enabled"`
@ -42,19 +44,21 @@ func (a *ArangoDeploymentRebalancerSpec) IsEnabled() bool {
return *a.Enabled
}
func (a *ArangoDeploymentRebalancerSpec) GetParallelMoves(d int) int {
func (a *ArangoDeploymentRebalancerSpec) GetParallelMoves() int {
if !a.IsEnabled() {
return d
return ArangoDeploymentRebalancerDefaultParallelMoves
}
if a == nil || a.ParallelMoves == nil {
return d
return ArangoDeploymentRebalancerDefaultParallelMoves
}
return *a.ParallelMoves
}
type ArangoDeploymentRebalancerReadersSpec struct {
// deprecated does not work in Rebalancer V2
// Count Enable Shard Count machanism
Count *bool `json:"count,omitempty"`
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -63,8 +63,15 @@ func (s SyncExternalAccessSpec) Validate() error {
return errors.WithStack(err)
}
for _, ep := range s.MasterEndpoint {
if _, err := url.Parse(ep); err != nil {
if u, err := url.Parse(ep); err != nil {
return errors.WithStack(errors.Newf("Failed to parse master endpoint '%s': %s", ep, err))
} else {
if u.Scheme != "http" && u.Scheme != "https" {
return errors.WithStack(errors.Newf("Invalid scheme '%s' in master endpoint '%s'", u.Scheme, ep))
}
if u.Host == "" {
return errors.WithStack(errors.Newf("Missing host in master endpoint '%s'", ep))
}
}
}
for _, name := range s.AccessPackageSecretNames {

View file

@ -102,3 +102,37 @@ func TestSyncSpecResetImmutableFields(t *testing.T) {
assert.Equal(t, test.Expected, test.Target)
}
}
func TestSyncSpecMasterEndpointValidate(t *testing.T) {
auth := SyncAuthenticationSpec{
JWTSecretName: util.NewType[string]("foo"),
ClientCASecretName: util.NewType[string]("foo-client"),
}
tls := TLSSpec{
CASecretName: util.NewType[string]("None"),
}
t.Run("Valid MasterEndpoint", func(t *testing.T) {
err := SyncSpec{
Authentication: auth,
TLS: tls,
ExternalAccess: SyncExternalAccessSpec{
MasterEndpoint: []string{"https://arangodb.xyz:8629"},
},
Enabled: util.NewType[bool](true),
}.Validate(DeploymentModeCluster)
assert.Nil(t, err)
})
t.Run("Invalid MasterEndpoint without protocol", func(t *testing.T) {
err := SyncSpec{
Authentication: auth,
TLS: tls,
ExternalAccess: SyncExternalAccessSpec{
MasterEndpoint: []string{"example.com:8629"},
},
Enabled: util.NewType[bool](true),
}.Validate(DeploymentModeCluster)
assert.Error(t, err)
assert.Equal(t, "Invalid scheme 'example.com' in master endpoint 'example.com:8629'", err.Error())
})
}

View file

@ -355,3 +355,14 @@ func FilterDBServerShardsNotInSync(serverID Server) ShardFilter {
return true
})
}
// GetCollectionDatabaseByID find Database name by Collection ID
func (s State) GetCollectionDatabaseByID(id string) (string, bool) {
for db, cols := range s.Current.Collections {
if _, ok := cols[id]; ok {
return db, true
}
}
return "", false
}

View file

@ -303,3 +303,16 @@ func Test_IsDBServerReadyToRestart(t *testing.T) {
})
}
}
func Test_GetCollectionDatabaseByID(t *testing.T) {
var s DumpState
require.NoError(t, json.Unmarshal(agencyDump39, &s))
v, ok := s.Agency.Arango.GetCollectionDatabaseByID("10013")
require.True(t, ok)
require.Equal(t, "_system", v)
v, ok = s.Agency.Arango.GetCollectionDatabaseByID("UNKNOWN")
require.False(t, ok)
require.Equal(t, "", v)
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -43,6 +43,7 @@ func NewClient(c driver.Connection, log logging.Logger) Client {
type Client interface {
LicenseClient
MaintenanceClient
RebalanceClient
GetTLS(ctx context.Context) (TLSDetails, error)
RefreshTLS(ctx context.Context) (TLSDetails, error)

View file

@ -0,0 +1,95 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package client
import (
"context"
"net/http"
"k8s.io/apimachinery/pkg/util/intstr"
"github.com/arangodb/kube-arangodb/pkg/util"
)
type RebalanceClient interface {
GenerateRebalanceMoves(ctx context.Context, request *RebalancePlanRequest) (RebalancePlanResponse, error)
}
type RebalancePlanRequest struct {
Version int `json:"version"`
MaximumNumberOfMoves *int `json:"maximumNumberOfMoves,omitempty"`
LeaderChanges *bool `json:"leaderChanges,omitempty"`
MoveLeaders *bool `json:"moveLeaders,omitempty"`
MoveFollowers *bool `json:"moveFollowers,omitempty"`
}
type RebalancePlanResponse struct {
Result RebalancePlanResponseResult `json:"result"`
}
type RebalancePlanResponseResult struct {
Moves RebalancePlanMoves `json:"moves"`
}
type RebalancePlanMoves []RebalancePlanMove
type RebalancePlanMove struct {
From string `json:"from"`
To string `json:"to"`
Shard string `json:"shard"`
Collection intstr.IntOrString `json:"collection"`
}
func (c *client) GenerateRebalanceMoves(ctx context.Context, request *RebalancePlanRequest) (RebalancePlanResponse, error) {
req, err := c.c.NewRequest(http.MethodPost, "/_admin/cluster/rebalance")
if err != nil {
return RebalancePlanResponse{}, err
}
request = util.InitType(request)
// Always set to 1
request.Version = 1
if r, err := req.SetBody(request); err != nil {
return RebalancePlanResponse{}, err
} else {
req = r
}
resp, err := c.c.Do(ctx, req)
if err != nil {
return RebalancePlanResponse{}, err
}
if err := resp.CheckStatus(http.StatusOK); err != nil {
return RebalancePlanResponse{}, err
}
var d RebalancePlanResponse
if err := resp.ParseBody("", &d); err != nil {
return RebalancePlanResponse{}, err
}
return d, nil
}

View file

@ -0,0 +1,37 @@
//
// DISCLAIMER
//
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package features
func init() {
registerFeature(rebalancerV2)
}
var rebalancerV2 = &feature{
name: "rebalancer-v2",
description: "Rebalancer V2 feature",
version: "3.10.0",
enterpriseRequired: false,
enabledByDefault: false,
}
func RebalancerV2() Feature {
return rebalancerV2
}

View file

@ -147,12 +147,21 @@ var (
_ Action = &actionRebalancerCheck{}
_ actionFactory = newRebalancerCheckAction
_ Action = &actionRebalancerCheckV2{}
_ actionFactory = newRebalancerCheckV2Action
_ Action = &actionRebalancerClean{}
_ actionFactory = newRebalancerCleanAction
_ Action = &actionRebalancerCleanV2{}
_ actionFactory = newRebalancerCleanV2Action
_ Action = &actionRebalancerGenerate{}
_ actionFactory = newRebalancerGenerateAction
_ Action = &actionRebalancerGenerateV2{}
_ actionFactory = newRebalancerGenerateV2Action
_ Action = &actionRebuildOutSyncedShards{}
_ actionFactory = newRebuildOutSyncedShardsAction
@ -748,6 +757,18 @@ func init() {
registerAction(action, function)
}
// RebalancerCheckV2
{
// Get Action defition
function := newRebalancerCheckV2Action
action := api.ActionTypeRebalancerCheckV2
// Wrap action main function
// Register action
registerAction(action, function)
}
// RebalancerClean
{
// Get Action defition
@ -760,6 +781,18 @@ func init() {
registerAction(action, function)
}
// RebalancerCleanV2
{
// Get Action defition
function := newRebalancerCleanV2Action
action := api.ActionTypeRebalancerCleanV2
// Wrap action main function
// Register action
registerAction(action, function)
}
// RebalancerGenerate
{
// Get Action defition
@ -772,6 +805,18 @@ func init() {
registerAction(action, function)
}
// RebalancerGenerateV2
{
// Get Action defition
function := newRebalancerGenerateV2Action
action := api.ActionTypeRebalancerGenerateV2
// Wrap action main function
// Register action
registerAction(action, function)
}
// RebuildOutSyncedShards
{
// Get Action defition

View file

@ -430,6 +430,16 @@ func Test_Actions(t *testing.T) {
})
})
t.Run("RebalancerCheckV2", func(t *testing.T) {
ActionsExistence(t, api.ActionTypeRebalancerCheckV2)
t.Run("Internal", func(t *testing.T) {
require.False(t, api.ActionTypeRebalancerCheckV2.Internal())
})
t.Run("Optional", func(t *testing.T) {
require.False(t, api.ActionTypeRebalancerCheckV2.Optional())
})
})
t.Run("RebalancerClean", func(t *testing.T) {
ActionsExistence(t, api.ActionTypeRebalancerClean)
t.Run("Internal", func(t *testing.T) {
@ -440,6 +450,16 @@ func Test_Actions(t *testing.T) {
})
})
t.Run("RebalancerCleanV2", func(t *testing.T) {
ActionsExistence(t, api.ActionTypeRebalancerCleanV2)
t.Run("Internal", func(t *testing.T) {
require.False(t, api.ActionTypeRebalancerCleanV2.Internal())
})
t.Run("Optional", func(t *testing.T) {
require.False(t, api.ActionTypeRebalancerCleanV2.Optional())
})
})
t.Run("RebalancerGenerate", func(t *testing.T) {
ActionsExistence(t, api.ActionTypeRebalancerGenerate)
t.Run("Internal", func(t *testing.T) {
@ -450,6 +470,16 @@ func Test_Actions(t *testing.T) {
})
})
t.Run("RebalancerGenerateV2", func(t *testing.T) {
ActionsExistence(t, api.ActionTypeRebalancerGenerateV2)
t.Run("Internal", func(t *testing.T) {
require.True(t, api.ActionTypeRebalancerGenerateV2.Internal())
})
t.Run("Optional", func(t *testing.T) {
require.False(t, api.ActionTypeRebalancerGenerateV2.Optional())
})
})
t.Run("RebuildOutSyncedShards", func(t *testing.T) {
ActionsExistence(t, api.ActionTypeRebuildOutSyncedShards)
t.Run("Internal", func(t *testing.T) {

View file

@ -0,0 +1,102 @@
//
// DISCLAIMER
//
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package reconcile
import (
"context"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency/state"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
func newRebalancerCheckV2Action(action api.Action, actionCtx ActionContext) Action {
a := &actionRebalancerCheckV2{}
a.actionImpl = newActionImplDefRef(action, actionCtx)
return a
}
type actionRebalancerCheckV2 struct {
actionImpl
actionEmptyCheckProgress
}
func (r actionRebalancerCheckV2) Start(ctx context.Context) (bool, error) {
rebalancerStatus := r.actionCtx.GetStatus().Rebalancer
if rebalancerStatus == nil {
return true, nil
}
if len(rebalancerStatus.MoveJobs) == 0 {
return true, nil
}
cache, ok := r.actionCtx.GetAgencyCache()
if !ok {
r.log.Debug("AgencyCache is not ready")
return true, nil
}
statuses := make([]state.JobPhase, len(rebalancerStatus.MoveJobs))
for id := range rebalancerStatus.MoveJobs {
_, statuses[id] = cache.Target.GetJob(state.JobID(rebalancerStatus.MoveJobs[id]))
}
if err := r.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
s.Rebalancer.LastCheckTime = k8sutil.NewTime(meta.Now())
var m []string
for id := range rebalancerStatus.MoveJobs {
if statuses[id] == state.JobPhaseFailed || statuses[id] == state.JobPhaseUnknown {
r.log.Warn("Error while moving job")
r.actionCtx.Metrics().GetRebalancer().AddFailures(1)
continue
}
if statuses[id] == state.JobPhaseFinished {
r.actionCtx.Metrics().GetRebalancer().AddSuccesses(1)
continue
}
m = append(m, rebalancerStatus.MoveJobs[id])
}
s.Rebalancer.MoveJobs = m
if len(s.Rebalancer.MoveJobs) == 0 {
s.Rebalancer.LastCheckTime = nil
}
return true
}); err != nil {
r.log.Err(err).Warn("Unable to update status")
return true, nil
}
return true, nil
}

View file

@ -0,0 +1,52 @@
//
// DISCLAIMER
//
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package reconcile
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
)
func newRebalancerCleanV2Action(action api.Action, actionCtx ActionContext) Action {
a := &actionRebalancerCleanV2{}
a.actionImpl = newActionImplDefRef(action, actionCtx)
return a
}
type actionRebalancerCleanV2 struct {
actionImpl
actionEmptyCheckProgress
}
func (r actionRebalancerCleanV2) Start(ctx context.Context) (bool, error) {
return true, r.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
if s.Rebalancer != nil {
s.Rebalancer = nil
return true
}
return false
})
}

View file

@ -0,0 +1,166 @@
//
// DISCLAIMER
//
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package reconcile
import (
"context"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/client"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
func newRebalancerGenerateV2Action(action api.Action, actionCtx ActionContext) Action {
a := &actionRebalancerGenerateV2{}
a.actionImpl = newActionImplDefRef(action, actionCtx)
return a
}
type actionRebalancerGenerateV2 struct {
actionImpl
actionEmptyCheckProgress
}
func (r actionRebalancerGenerateV2) Start(ctx context.Context) (bool, error) {
spec := r.actionCtx.GetSpec()
if spec.Rebalancer == nil {
if err := r.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
if s.Rebalancer == nil {
return false
}
s.Rebalancer = nil
return true
}); err != nil {
r.log.Err(err).Warn("Unable to propagate changes")
return true, nil
}
return true, nil
}
c, err := r.actionCtx.GetMembersState().State().GetDatabaseClient()
if err != nil {
r.log.Err(err).Error("Unable to get client")
return true, nil
}
nctx, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
resp, err := client.NewClient(c.Connection(), r.log).GenerateRebalanceMoves(nctx, &client.RebalancePlanRequest{
MaximumNumberOfMoves: util.NewType(spec.Rebalancer.GetParallelMoves()),
})
if err != nil {
r.log.Err(err).Error("Unable to generate rebalancer moves")
return true, nil
}
if len(resp.Result.Moves) > 0 {
cache, ok := r.actionCtx.GetAgencyCache()
if !ok {
r.log.Debug("AgencyCache is not ready")
return true, nil
}
actions := make(RebalanceActions, len(resp.Result.Moves))
for id, move := range resp.Result.Moves {
db, ok := cache.GetCollectionDatabaseByID(move.Collection.String())
if !ok {
r.log.Warn("Database not found for Collection %s", move.Collection)
return true, nil
}
actions[id] = RebalanceAction{
Database: db,
Collection: move.Collection.String(),
Shard: move.Shard,
From: move.To,
To: move.From,
}
}
cluster, err := c.Cluster(ctx)
if err != nil {
r.log.Err(err).Warn("Unable to get cluster")
return true, nil
}
if err := r.executeActions(ctx, spec.Rebalancer.GetParallelMoves(), c, cluster, actions); err != nil {
r.log.Err(err).Warn("Unable to execute actions")
return true, nil
}
return true, nil
}
if err := r.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
s.Rebalancer = &api.ArangoDeploymentRebalancerStatus{
LastCheckTime: k8sutil.NewTime(meta.Now()),
}
return true
}); err != nil {
r.log.Err(err).Warn("Unable to save plan")
return true, nil
}
return true, nil
}
func (r actionRebalancerGenerateV2) executeActions(ctx context.Context, size int, client driver.Client, cluster driver.Cluster, a RebalanceActions) error {
if len(a) > size {
a = a[0:size]
}
r.actionCtx.Metrics().GetRebalancer().AddMoves(len(a))
ids, errors := runMoveJobs(ctx, client, cluster, a)
r.actionCtx.Metrics().GetRebalancer().AddFailures(len(errors))
for _, err := range errors {
r.log.Err(err).Warn("MoveShard failed")
}
if err := r.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
s.Rebalancer = &api.ArangoDeploymentRebalancerStatus{}
s.Rebalancer.MoveJobs = append(s.Rebalancer.MoveJobs, ids...)
return true
}); err != nil {
return err
}
return nil
}

View file

@ -0,0 +1,85 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package reconcile
import (
"context"
"github.com/arangodb/go-driver"
)
type RebalanceActions []RebalanceAction
type RebalanceAction struct {
Database string `json:"database"`
Collection string `json:"collection"`
Shard string `json:"shard"`
From string `json:"from"`
To string `json:"to"`
DependsOn []int `json:"depends_on,omitempty"`
}
func runMoveJobs(ctx context.Context, client driver.Client, cluster driver.Cluster, a RebalanceActions) ([]string, []error) {
var errors []error
var ids []string
for _, z := range a {
id, ok, err := runMoveJob(ctx, client, cluster, z)
if err != nil {
errors = append(errors, err)
continue
}
if !ok {
continue
}
ids = append(ids, id)
}
return ids, errors
}
func runMoveJob(ctx context.Context, client driver.Client, cluster driver.Cluster, a RebalanceAction) (string, bool, error) {
if len(a.DependsOn) != 0 {
return "", false, nil
}
db, err := client.Database(ctx, a.Database)
if err != nil {
return "", false, err
}
col, err := db.Collection(ctx, a.Collection)
if err != nil {
return "", false, err
}
var jobID string
jctx := driver.WithJobIDResponse(ctx, &jobID)
if err := cluster.MoveShard(jctx, col, driver.ShardID(a.Shard), driver.ServerID(a.From), driver.ServerID(a.To)); err != nil {
return "", false, err
}
return jobID, jobID != "", nil
}

View file

@ -60,7 +60,7 @@ func (r *Reconciler) createHighPlan(ctx context.Context, apiObject k8sutil.APIOb
ApplyIfEmptyWithBackOff(LicenseCheck, 30*time.Second, r.updateClusterLicense).
ApplyIfEmpty(r.createTopologyMemberConditionPlan).
ApplyIfEmpty(r.updateMemberConditionTypeMemberVolumeUnschedulableCondition).
ApplyIfEmpty(r.createRebalancerCheckPlan).
ApplyIfEmpty(r.createRebalancerCheckPlanCore).
ApplyIfEmpty(r.createMemberFailedRestoreHighPlan).
ApplyIfEmpty(r.scaleDownCandidate).
ApplyIfEmpty(r.volumeMemberReplacement).

View file

@ -78,7 +78,7 @@ func (r *Reconciler) createNormalPlan(ctx context.Context, apiObject k8sutil.API
ApplySubPlanIfEmpty(r.createEncryptionKeyStatusPropagatedFieldUpdate, r.createEncryptionKeyCleanPlan).
ApplySubPlanIfEmpty(r.createTLSStatusPropagatedFieldUpdate, r.createCACleanPlan).
ApplyIfEmpty(r.createClusterOperationPlan).
ApplyIfEmpty(r.createRebalancerGeneratePlan).
ApplyIfEmpty(r.createRebalancerGeneratePlanCore).
// Final
ApplyIfEmpty(r.createTLSStatusPropagated).
ApplyIfEmpty(r.createBootstrapPlan))

View file

@ -0,0 +1,51 @@
//
// DISCLAIMER
//
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package reconcile
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
func (r *Reconciler) createRebalancerGeneratePlanCore(ctx context.Context, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
context PlanBuilderContext) api.Plan {
if features.RebalancerV2().ImageSupported(status.CurrentImage) {
return r.createRebalancerV2GeneratePlan(spec, status)
}
return r.createRebalancerGeneratePlan(ctx, apiObject, spec, status, context)
}
func (r *Reconciler) createRebalancerCheckPlanCore(ctx context.Context, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
context PlanBuilderContext) api.Plan {
if features.RebalancerV2().ImageSupported(status.CurrentImage) {
return r.createRebalancerV2CheckPlan(spec, status)
}
return r.createRebalancerCheckPlan(ctx, apiObject, spec, status, context)
}

View file

@ -0,0 +1,100 @@
//
// DISCLAIMER
//
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package reconcile
import (
"time"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
)
func (r *Reconciler) createRebalancerV2GeneratePlan(spec api.DeploymentSpec, status api.DeploymentStatus) api.Plan {
if spec.Mode.Get() != api.DeploymentModeCluster {
return nil
}
if !spec.Rebalancer.IsEnabled() {
r.metrics.Rebalancer.SetEnabled(false)
if status.Rebalancer != nil {
return api.Plan{
api.NewAction(api.ActionTypeRebalancerCleanV2, api.ServerGroupUnknown, ""),
}
}
return nil
}
r.metrics.Rebalancer.SetEnabled(true)
if !status.Members.AllMembersReady(spec.Mode.Get(), spec.Sync.IsEnabled()) {
return nil
}
if status.Rebalancer != nil {
r.metrics.Rebalancer.SetCurrent(len(status.Rebalancer.MoveJobs))
if len(status.Rebalancer.MoveJobs) != 0 {
return nil
}
if !status.Rebalancer.LastCheckTime.IsZero() {
if status.Rebalancer.LastCheckTime.Add(time.Minute).After(time.Now().UTC()) {
return nil
}
}
return api.Plan{
api.NewAction(api.ActionTypeRebalancerGenerateV2, api.ServerGroupUnknown, ""),
}
} else {
r.metrics.Rebalancer.current = 0
}
if status.Rebalancer == nil {
return api.Plan{
api.NewAction(api.ActionTypeRebalancerGenerateV2, api.ServerGroupUnknown, ""),
}
}
return nil
}
func (r *Reconciler) createRebalancerV2CheckPlan(spec api.DeploymentSpec, status api.DeploymentStatus) api.Plan {
if spec.Mode.Get() != api.DeploymentModeCluster {
return nil
}
if status.Rebalancer == nil {
return nil
}
if !status.Rebalancer.LastCheckTime.IsZero() && status.Rebalancer.LastCheckTime.Time.Add(5*time.Second).After(time.Now().UTC()) {
return nil
}
if len(status.Rebalancer.MoveJobs) == 0 {
return nil
}
return api.Plan{
// Add plan to run check
api.NewAction(api.ActionTypeRebalancerCheckV2, api.ServerGroupUnknown, ""),
}
}

View file

@ -54,3 +54,13 @@ func BoolSwitch[T interface{}](s bool, t, f T) T {
return f
}
// InitType initialise object if it is nil pointer
func InitType[T interface{}](in *T) *T {
if in != nil {
return in
}
var q T
return &q
}