1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Plan BackOff (#862)

This commit is contained in:
Adam Janikowski 2021-12-23 15:51:32 +01:00 committed by GitHub
parent 4c3a82aa41
commit 8a4f38f61f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
36 changed files with 1070 additions and 179 deletions

View file

@ -1,6 +1,7 @@
# Change Log
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
- Add Plan BackOff functionality
## [1.2.6](https://github.com/arangodb/kube-arangodb/tree/1.2.6) (2021-12-15)
- Add ArangoBackup backoff functionality

View file

@ -0,0 +1,125 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1
import (
"encoding/json"
"time"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var _ json.Marshaler = BackOff{}
type BackOffKey string
type BackOff map[BackOffKey]meta.Time
func (b BackOff) MarshalJSON() ([]byte, error) {
r := map[BackOffKey]meta.Time{}
for k, v := range b {
if v.IsZero() {
continue
}
r[k] = *v.DeepCopy()
}
return json.Marshal(r)
}
func (b BackOff) Process(key BackOffKey) bool {
if b == nil {
return true
}
if t, ok := b[key]; ok {
if t.IsZero() {
return true
}
return time.Now().After(t.Time)
} else {
return true
}
}
func (b BackOff) BackOff(key BackOffKey, delay time.Duration) BackOff {
n := meta.Time{Time: time.Now().Add(delay)}
z := b.DeepCopy()
if z == nil {
return BackOff{
key: n,
}
}
z[key] = n
return z
}
func (b BackOff) Combine(a BackOff) BackOff {
d := b.DeepCopy()
if d == nil {
d = BackOff{}
}
for k, v := range a {
d[k] = v
}
return d
}
func (b BackOff) CombineLatest(a BackOff) BackOff {
d := b.DeepCopy()
if d == nil {
d = BackOff{}
}
for k, v := range a {
if i, ok := d[k]; !ok || (ok && v.After(i.Time)) {
d[k] = v
}
}
return d
}
func (b BackOff) Equal(a BackOff) bool {
if len(b) == 0 && len(a) == 0 {
return true
}
if len(b) != len(a) {
return false
}
for k, v := range b {
if av, ok := a[k]; !ok || !v.Equal(&av) {
return false
}
}
return true
}

View file

@ -0,0 +1,94 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/require"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func backoffParse(t *testing.T, in BackOff) BackOff {
d, err := json.Marshal(in)
require.NoError(t, err)
var m BackOff
require.NoError(t, json.Unmarshal(d, &m))
return m
}
func Test_BackOff_Combine(t *testing.T) {
t.Run("add", func(t *testing.T) {
a := BackOff{}
b := BackOff{
"a": meta.Now(),
}
r := a.Combine(b)
require.Contains(t, r, BackOffKey("a"))
r = backoffParse(t, r)
require.Contains(t, r, BackOffKey("a"))
require.Equal(t, b["a"].Unix(), r["a"].Unix())
})
t.Run("replace", func(t *testing.T) {
a := BackOff{
"a": meta.Time{Time: time.Now().Add(-1 * time.Hour)},
}
b := BackOff{
"a": meta.Now(),
}
r := a.Combine(b)
require.Contains(t, r, BackOffKey("a"))
r = backoffParse(t, r)
require.Contains(t, r, BackOffKey("a"))
require.Equal(t, b["a"].Unix(), r["a"].Unix())
})
t.Run("delete", func(t *testing.T) {
a := BackOff{
"a": meta.Time{Time: time.Now().Add(-1 * time.Hour)},
}
b := BackOff{
"a": meta.Time{},
}
r := a.Combine(b)
require.Contains(t, r, BackOffKey("a"))
r = backoffParse(t, r)
require.NotContains(t, r, BackOffKey("a"))
})
}

View file

@ -85,6 +85,8 @@ type DeploymentStatus struct {
Topology *TopologyStatus `json:"topology,omitempty"`
Rebalancer *ArangoDeploymentRebalancerStatus `json:"rebalancer,omitempty"`
BackOff BackOff `json:"backoff,omitempty"`
}
// Equal checks for equality
@ -103,7 +105,9 @@ func (ds *DeploymentStatus) Equal(other DeploymentStatus) bool {
ds.Plan.Equal(other.Plan) &&
ds.AcceptedSpec.Equal(other.AcceptedSpec) &&
ds.SecretHashes.Equal(other.SecretHashes) &&
ds.Agency.Equal(other.Agency)
ds.Agency.Equal(other.Agency) &&
ds.Topology.Equal(other.Topology) &&
ds.BackOff.Equal(other.BackOff)
}
// IsForceReload returns true if ForceStatusReload is set to true

View file

@ -24,6 +24,20 @@ import "sort"
type List []string
func (l List) Equal(b List) bool {
if len(l) != len(b) {
return false
}
for i := range l {
if l[i] != b[i] {
return false
}
}
return true
}
func (l List) Contains(v string) bool {
for _, z := range l {
if z == v {
@ -69,6 +83,7 @@ func (l List) Remove(values ...string) List {
return m
}
func (l List) Add(values ...string) List {
var m List

View file

@ -49,7 +49,7 @@ func (a ActionType) String() string {
// Priority returns plan priority
func (a ActionType) Priority() ActionPriority {
switch a {
case ActionTypeMemberPhaseUpdate, ActionTypeMemberRIDUpdate, ActionTypeSetMemberCondition:
case ActionTypeMemberPhaseUpdate, ActionTypeMemberRIDUpdate, ActionTypeSetMemberCondition, ActionTypeSetCondition:
return ActionPriorityHigh
default:
return ActionPriorityNormal
@ -161,12 +161,16 @@ const (
ActionTypeMemberPhaseUpdate ActionType = "MemberPhaseUpdate"
// ActionTypeSetMemberCondition sets member condition. It is high priority action.
ActionTypeSetMemberCondition ActionType = "SetMemberCondition"
// ActionTypeSetCondition sets condition. It is high priority action.
ActionTypeSetCondition ActionType = "SetCondition"
// ActionTypeMemberRIDUpdate updated member Run ID (UID). High priority
ActionTypeMemberRIDUpdate ActionType = "MemberRIDUpdate"
// ActionTypeArangoMemberUpdatePodSpec updates pod spec
ActionTypeArangoMemberUpdatePodSpec ActionType = "ArangoMemberUpdatePodSpec"
// ActionTypeArangoMemberUpdatePodStatus updates pod spec
ActionTypeArangoMemberUpdatePodStatus ActionType = "ArangoMemberUpdatePodStatus"
// ActionTypeLicenseSet sets server license
ActionTypeLicenseSet ActionType = "LicenseSet"
// Runtime Updates
// ActionTypeRuntimeContainerImageUpdate updates container image in runtime

View file

@ -37,6 +37,21 @@ type TopologyStatus struct {
Label string `json:"label,omitempty"`
}
func (t *TopologyStatus) Equal(b *TopologyStatus) bool {
if t == nil && b == nil {
return true
}
if t == nil || b == nil {
return false
}
return t.ID == b.ID &&
t.Size == b.Size &&
t.Label == b.Label &&
t.Zones.Equal(b.Zones)
}
func (t *TopologyStatus) GetLeastUsedZone(group ServerGroup) int {
if t == nil {
return -1
@ -131,8 +146,46 @@ func (t *TopologyStatus) Enabled() bool {
type TopologyStatusZones []TopologyStatusZone
func (in TopologyStatusZones) Equal(zones TopologyStatusZones) bool {
if len(in) == 0 && len(zones) == 0 {
return true
}
if len(in) != len(zones) {
return false
}
for id := range in {
if !in[id].Equal(&zones[id]) {
return false
}
}
return true
}
type TopologyStatusZoneMembers map[string]List
func (in TopologyStatusZoneMembers) Equal(members TopologyStatusZoneMembers) bool {
if len(in) == 0 && len(members) == 0 {
return true
}
if len(in) != len(members) {
return false
}
for k, v := range in {
mv, ok := members[k]
if !ok {
return false
}
if !v.Equal(mv) {
return false
}
}
return true
}
type TopologyStatusZone struct {
ID int `json:"id"`
@ -175,6 +228,18 @@ func (t *TopologyStatusZone) Get(group ServerGroup) List {
}
}
func (t *TopologyStatusZone) Equal(b *TopologyStatusZone) bool {
if t == nil && b == nil {
return true
}
if t == nil || b == nil {
return false
}
return t.ID == b.ID &&
t.Labels.Equal(b.Labels) &&
t.Members.Equal(b.Members)
}
func NewTopologyStatus(spec *TopologySpec) *TopologyStatus {
if spec == nil {
return nil

View file

@ -398,6 +398,28 @@ func (in *AuthenticationSpec) DeepCopy() *AuthenticationSpec {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in BackOff) DeepCopyInto(out *BackOff) {
{
in := &in
*out = make(BackOff, len(*in))
for key, val := range *in {
(*out)[key] = *val.DeepCopy()
}
return
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackOff.
func (in BackOff) DeepCopy() BackOff {
if in == nil {
return nil
}
out := new(BackOff)
in.DeepCopyInto(out)
return *out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BootstrapSpec) DeepCopyInto(out *BootstrapSpec) {
*out = *in
@ -807,6 +829,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) {
*out = new(ArangoDeploymentRebalancerStatus)
(*in).DeepCopyInto(*out)
}
if in.BackOff != nil {
in, out := &in.BackOff, &out.BackOff
*out = make(BackOff, len(*in))
for key, val := range *in {
(*out)[key] = *val.DeepCopy()
}
}
return
}

View file

@ -0,0 +1,125 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v2alpha1
import (
"encoding/json"
"time"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var _ json.Marshaler = BackOff{}
type BackOffKey string
type BackOff map[BackOffKey]meta.Time
func (b BackOff) MarshalJSON() ([]byte, error) {
r := map[BackOffKey]meta.Time{}
for k, v := range b {
if v.IsZero() {
continue
}
r[k] = *v.DeepCopy()
}
return json.Marshal(r)
}
func (b BackOff) Process(key BackOffKey) bool {
if b == nil {
return true
}
if t, ok := b[key]; ok {
if t.IsZero() {
return true
}
return time.Now().After(t.Time)
} else {
return true
}
}
func (b BackOff) BackOff(key BackOffKey, delay time.Duration) BackOff {
n := meta.Time{Time: time.Now().Add(delay)}
z := b.DeepCopy()
if z == nil {
return BackOff{
key: n,
}
}
z[key] = n
return z
}
func (b BackOff) Combine(a BackOff) BackOff {
d := b.DeepCopy()
if d == nil {
d = BackOff{}
}
for k, v := range a {
d[k] = v
}
return d
}
func (b BackOff) CombineLatest(a BackOff) BackOff {
d := b.DeepCopy()
if d == nil {
d = BackOff{}
}
for k, v := range a {
if i, ok := d[k]; !ok || (ok && v.After(i.Time)) {
d[k] = v
}
}
return d
}
func (b BackOff) Equal(a BackOff) bool {
if len(b) == 0 && len(a) == 0 {
return true
}
if len(b) != len(a) {
return false
}
for k, v := range b {
if av, ok := a[k]; !ok || !v.Equal(&av) {
return false
}
}
return true
}

View file

@ -0,0 +1,94 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v2alpha1
import (
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/require"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func backoffParse(t *testing.T, in BackOff) BackOff {
d, err := json.Marshal(in)
require.NoError(t, err)
var m BackOff
require.NoError(t, json.Unmarshal(d, &m))
return m
}
func Test_BackOff_Combine(t *testing.T) {
t.Run("add", func(t *testing.T) {
a := BackOff{}
b := BackOff{
"a": meta.Now(),
}
r := a.Combine(b)
require.Contains(t, r, BackOffKey("a"))
r = backoffParse(t, r)
require.Contains(t, r, BackOffKey("a"))
require.Equal(t, b["a"].Unix(), r["a"].Unix())
})
t.Run("replace", func(t *testing.T) {
a := BackOff{
"a": meta.Time{Time: time.Now().Add(-1 * time.Hour)},
}
b := BackOff{
"a": meta.Now(),
}
r := a.Combine(b)
require.Contains(t, r, BackOffKey("a"))
r = backoffParse(t, r)
require.Contains(t, r, BackOffKey("a"))
require.Equal(t, b["a"].Unix(), r["a"].Unix())
})
t.Run("delete", func(t *testing.T) {
a := BackOff{
"a": meta.Time{Time: time.Now().Add(-1 * time.Hour)},
}
b := BackOff{
"a": meta.Time{},
}
r := a.Combine(b)
require.Contains(t, r, BackOffKey("a"))
r = backoffParse(t, r)
require.NotContains(t, r, BackOffKey("a"))
})
}

View file

@ -85,6 +85,8 @@ type DeploymentStatus struct {
Topology *TopologyStatus `json:"topology,omitempty"`
Rebalancer *ArangoDeploymentRebalancerStatus `json:"rebalancer,omitempty"`
BackOff BackOff `json:"backoff,omitempty"`
}
// Equal checks for equality
@ -103,7 +105,9 @@ func (ds *DeploymentStatus) Equal(other DeploymentStatus) bool {
ds.Plan.Equal(other.Plan) &&
ds.AcceptedSpec.Equal(other.AcceptedSpec) &&
ds.SecretHashes.Equal(other.SecretHashes) &&
ds.Agency.Equal(other.Agency)
ds.Agency.Equal(other.Agency) &&
ds.Topology.Equal(other.Topology) &&
ds.BackOff.Equal(other.BackOff)
}
// IsForceReload returns true if ForceStatusReload is set to true

View file

@ -24,6 +24,20 @@ import "sort"
type List []string
func (l List) Equal(b List) bool {
if len(l) != len(b) {
return false
}
for i := range l {
if l[i] != b[i] {
return false
}
}
return true
}
func (l List) Contains(v string) bool {
for _, z := range l {
if z == v {
@ -69,6 +83,7 @@ func (l List) Remove(values ...string) List {
return m
}
func (l List) Add(values ...string) List {
var m List

View file

@ -49,7 +49,7 @@ func (a ActionType) String() string {
// Priority returns plan priority
func (a ActionType) Priority() ActionPriority {
switch a {
case ActionTypeMemberPhaseUpdate, ActionTypeMemberRIDUpdate, ActionTypeSetMemberCondition:
case ActionTypeMemberPhaseUpdate, ActionTypeMemberRIDUpdate, ActionTypeSetMemberCondition, ActionTypeSetCondition:
return ActionPriorityHigh
default:
return ActionPriorityNormal
@ -161,12 +161,16 @@ const (
ActionTypeMemberPhaseUpdate ActionType = "MemberPhaseUpdate"
// ActionTypeSetMemberCondition sets member condition. It is high priority action.
ActionTypeSetMemberCondition ActionType = "SetMemberCondition"
// ActionTypeSetCondition sets condition. It is high priority action.
ActionTypeSetCondition ActionType = "SetCondition"
// ActionTypeMemberRIDUpdate updated member Run ID (UID). High priority
ActionTypeMemberRIDUpdate ActionType = "MemberRIDUpdate"
// ActionTypeArangoMemberUpdatePodSpec updates pod spec
ActionTypeArangoMemberUpdatePodSpec ActionType = "ArangoMemberUpdatePodSpec"
// ActionTypeArangoMemberUpdatePodStatus updates pod spec
ActionTypeArangoMemberUpdatePodStatus ActionType = "ArangoMemberUpdatePodStatus"
// ActionTypeLicenseSet sets server license
ActionTypeLicenseSet ActionType = "LicenseSet"
// Runtime Updates
// ActionTypeRuntimeContainerImageUpdate updates container image in runtime

View file

@ -37,6 +37,21 @@ type TopologyStatus struct {
Label string `json:"label,omitempty"`
}
func (t *TopologyStatus) Equal(b *TopologyStatus) bool {
if t == nil && b == nil {
return true
}
if t == nil || b == nil {
return false
}
return t.ID == b.ID &&
t.Size == b.Size &&
t.Label == b.Label &&
t.Zones.Equal(b.Zones)
}
func (t *TopologyStatus) GetLeastUsedZone(group ServerGroup) int {
if t == nil {
return -1
@ -131,8 +146,46 @@ func (t *TopologyStatus) Enabled() bool {
type TopologyStatusZones []TopologyStatusZone
func (in TopologyStatusZones) Equal(zones TopologyStatusZones) bool {
if len(in) == 0 && len(zones) == 0 {
return true
}
if len(in) != len(zones) {
return false
}
for id := range in {
if !in[id].Equal(&zones[id]) {
return false
}
}
return true
}
type TopologyStatusZoneMembers map[string]List
func (in TopologyStatusZoneMembers) Equal(members TopologyStatusZoneMembers) bool {
if len(in) == 0 && len(members) == 0 {
return true
}
if len(in) != len(members) {
return false
}
for k, v := range in {
mv, ok := members[k]
if !ok {
return false
}
if !v.Equal(mv) {
return false
}
}
return true
}
type TopologyStatusZone struct {
ID int `json:"id"`
@ -175,6 +228,18 @@ func (t *TopologyStatusZone) Get(group ServerGroup) List {
}
}
func (t *TopologyStatusZone) Equal(b *TopologyStatusZone) bool {
if t == nil && b == nil {
return true
}
if t == nil || b == nil {
return false
}
return t.ID == b.ID &&
t.Labels.Equal(b.Labels) &&
t.Members.Equal(b.Members)
}
func NewTopologyStatus(spec *TopologySpec) *TopologyStatus {
if spec == nil {
return nil

View file

@ -398,6 +398,28 @@ func (in *AuthenticationSpec) DeepCopy() *AuthenticationSpec {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in BackOff) DeepCopyInto(out *BackOff) {
{
in := &in
*out = make(BackOff, len(*in))
for key, val := range *in {
(*out)[key] = *val.DeepCopy()
}
return
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BackOff.
func (in BackOff) DeepCopy() BackOff {
if in == nil {
return nil
}
out := new(BackOff)
in.DeepCopyInto(out)
return *out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *BootstrapSpec) DeepCopyInto(out *BootstrapSpec) {
*out = *in
@ -807,6 +829,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) {
*out = new(ArangoDeploymentRebalancerStatus)
(*in).DeepCopyInto(*out)
}
if in.BackOff != nil {
in, out := &in.BackOff, &out.BackOff
*out = make(BackOff, len(*in))
for key, val := range *in {
(*out)[key] = *val.DeepCopy()
}
}
return
}

View file

@ -148,15 +148,12 @@ func (d *Deployment) GetDeploymentHealth() (driver.ClusterHealth, error) {
// GetStatus returns the current status of the deployment
// together with the current version of that status.
func (d *Deployment) GetStatus() (api.DeploymentStatus, int32) {
d.status.mutex.Lock()
defer d.status.mutex.Unlock()
return d.getStatus()
}
func (d *Deployment) getStatus() (api.DeploymentStatus, int32) {
version := d.status.version
return *d.status.last.DeepCopy(), version
obj := d.status.deploymentStatusObject
return *obj.last.DeepCopy(), obj.version
}
// UpdateStatus replaces the status of the deployment with the given status and
@ -179,8 +176,11 @@ func (d *Deployment) updateStatus(ctx context.Context, status api.DeploymentStat
Msg("UpdateStatus version conflict error.")
return errors.WithStack(errors.Newf("Status conflict error. Expected version %d, got %d", lastVersion, d.status.version))
}
d.status.version++
d.status.last = *status.DeepCopy()
d.status.deploymentStatusObject = deploymentStatusObject{
version: d.status.deploymentStatusObject.version + 1,
last: *status.DeepCopy(),
}
if err := d.updateCRStatus(ctx, force...); err != nil {
return errors.WithStack(err)
}
@ -735,3 +735,17 @@ func (d *Deployment) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...pa
return nil
}
func (d *Deployment) GenerateMemberEndpoint(group api.ServerGroup, member api.MemberStatus) (string, error) {
cache := d.GetCachedStatus()
return pod.GenerateMemberEndpoint(cache, d.GetAPIObject(), d.GetSpec(), group, member)
}
func (d *Deployment) GetStatusSnapshot() api.DeploymentStatus {
s, _ := d.GetStatus()
z := s.DeepCopy()
return *z
}

View file

@ -109,6 +109,11 @@ const (
maxInspectionInterval = 10 * util.Interval(time.Second) // Ensure we inspect the generated resources no less than with this interval
)
type deploymentStatusObject struct {
version int32
last api.DeploymentStatus // Internal status copy of the CR
}
// Deployment is the in process state of an ArangoDeployment.
type Deployment struct {
name string
@ -116,9 +121,8 @@ type Deployment struct {
apiObject *api.ArangoDeployment // API object
status struct {
mutex sync.Mutex
version int32
last api.DeploymentStatus // Internal status copy of the CR
mutex sync.Mutex
deploymentStatusObject
}
config Config
deps Dependencies

View file

@ -268,6 +268,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
return minInspectionInterval, errors.Wrapf(err, "Unable clean plan")
}
} else if err, updated := d.reconciler.CreatePlan(ctx, cachedStatus); err != nil {
d.deps.Log.Info().Msgf("Plan generated, reconciling")
return minInspectionInterval, errors.Wrapf(err, "Plan creation failed")
} else if updated {
return minInspectionInterval, nil

View file

@ -65,7 +65,7 @@ type actionArangoMemberUpdatePodSpec struct {
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionArangoMemberUpdatePodSpec) Start(ctx context.Context) (bool, error) {
spec := a.actionCtx.GetSpec()
status := a.actionCtx.GetStatus()
status := a.actionCtx.GetStatusSnapshot()
m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !found {

View file

@ -56,7 +56,7 @@ type actionBackupRestore struct {
func (a actionBackupRestore) Start(ctx context.Context) (bool, error) {
spec := a.actionCtx.GetSpec()
status := a.actionCtx.GetStatus()
status := a.actionCtx.GetStatusSnapshot()
if spec.RestoreFrom == nil {
return true, nil

View file

@ -67,9 +67,8 @@ type ActionContext interface {
resources.DeploymentModInterfaces
resources.DeploymentCachedStatus
resources.ArangoAgencyGet
resources.DeploymentInfoGetter
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
// Gets the specified mode of deployment
GetMode() api.DeploymentMode
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
@ -140,10 +139,6 @@ type ActionContext interface {
GetShardSyncStatus() bool
// InvalidateSyncStatus resets the sync state to false and triggers an inspection
InvalidateSyncStatus()
// GetSpec returns a copy of the spec
GetSpec() api.DeploymentSpec
// GetStatus returns a copy of the status
GetStatus() api.DeploymentStatus
// DisableScalingCluster disables scaling DBservers and coordinators
DisableScalingCluster(ctx context.Context) error
// EnableScalingCluster enables scaling DBservers and coordinators
@ -174,6 +169,18 @@ type actionContext struct {
cachedStatus inspectorInterface.Inspector
}
func (ac *actionContext) GetStatus() (api.DeploymentStatus, int32) {
return ac.context.GetStatus()
}
func (ac *actionContext) GetStatusSnapshot() api.DeploymentStatus {
return ac.context.GetStatusSnapshot()
}
func (ac *actionContext) GenerateMemberEndpoint(group api.ServerGroup, member api.MemberStatus) (string, error) {
return ac.context.GenerateMemberEndpoint(group, member)
}
func (ac *actionContext) GetAgencyCache() (agencyCache.State, bool) {
return ac.context.GetAgencyCache()
}
@ -218,14 +225,6 @@ func (ac *actionContext) GetName() string {
return ac.context.GetName()
}
func (ac *actionContext) GetStatus() api.DeploymentStatus {
a, _ := ac.context.GetStatus()
s := a.DeepCopy()
return *s
}
func (ac *actionContext) GetBackup(ctx context.Context, backup string) (*backupApi.ArangoBackup, error) {
return ac.context.GetBackup(ctx, backup)
}

View file

@ -51,7 +51,7 @@ type jwtRefreshAction struct {
}
func (a *jwtRefreshAction) CheckProgress(ctx context.Context) (bool, bool, error) {
if folder, err := ensureJWTFolderSupport(a.actionCtx.GetSpec(), a.actionCtx.GetStatus()); err != nil || !folder {
if folder, err := ensureJWTFolderSupport(a.actionCtx.GetSpec(), a.actionCtx.GetStatusSnapshot()); err != nil || !folder {
return true, false, nil
}

View file

@ -48,7 +48,7 @@ const (
)
func ensureJWTFolderSupportFromAction(actionCtx ActionContext) (bool, error) {
return ensureJWTFolderSupport(actionCtx.GetSpec(), actionCtx.GetStatus())
return ensureJWTFolderSupport(actionCtx.GetSpec(), actionCtx.GetStatusSnapshot())
}
func ensureJWTFolderSupport(spec api.DeploymentSpec, status api.DeploymentStatus) (bool, error) {

View file

@ -0,0 +1,90 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Tomasz Mielech
//
package reconcile
import (
"context"
"strconv"
"github.com/rs/zerolog"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
)
func init() {
registerAction(api.ActionTypeSetCondition, setCondition)
}
func setCondition(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
a := &actionSetCondition{}
a.actionImpl = newActionImplDefRef(log, action, actionCtx, defaultTimeout)
return a
}
type actionSetCondition struct {
// actionImpl implement timeout and member id functions
actionImpl
actionEmptyCheckProgress
}
// Start starts the action for changing conditions on the provided member.
func (a actionSetCondition) Start(ctx context.Context) (bool, error) {
if len(a.action.Params) == 0 {
a.log.Info().Msg("can not start the action with the empty list of conditions")
return true, nil
}
if err := a.actionCtx.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
changed := false
for condition, value := range a.action.Params {
if value == "" {
a.log.Debug().Msg("remove the condition")
if s.Conditions.Remove(api.ConditionType(condition)) {
changed = true
}
} else {
set, err := strconv.ParseBool(value)
if err != nil {
a.log.Error().Err(err).Str("value", value).Msg("can not parse string to boolean")
continue
}
a.log.Debug().Msg("set the condition")
if s.Conditions.Update(api.ConditionType(condition), set, a.action.Reason, "action set the member condition") {
changed = true
}
}
}
return changed
}); err != nil {
a.log.Warn().Err(err).Msgf("Unable to set condition")
return true, nil
}
return true, nil
}

View file

@ -51,13 +51,9 @@ type Context interface {
resources.DeploymentModInterfaces
resources.DeploymentCachedStatus
resources.ArangoAgencyGet
resources.ArangoApplier
resources.DeploymentInfoGetter
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
// GetSpec returns the current specification of the deployment
GetSpec() api.DeploymentSpec
// GetStatus returns the current status of the deployment
GetStatus() (api.DeploymentStatus, int32)
// UpdateStatus replaces the status of the deployment with the given status and
// updates the resources in k8s.
UpdateStatus(ctx context.Context, status api.DeploymentStatus, lastVersion int32, force ...bool) error

View file

@ -25,6 +25,8 @@ package reconcile
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
)
@ -33,23 +35,13 @@ const (
reconciliationComponent = "deployment_reconciliation"
)
const (
BackOffCheck api.BackOffKey = "check"
)
// CreatePlan considers the current specification & status of the deployment creates a plan to
// get the status in line with the specification.
// If a plan already exists, nothing is done.
func (d *Reconciler) CreatePlan(ctx context.Context, cachedStatus inspectorInterface.Inspector) (error, bool) {
var updated bool
if err, u := d.CreateHighPlan(ctx, cachedStatus); err != nil {
return err, false
} else if u {
updated = true
}
if err, u := d.CreateNormalPlan(ctx, cachedStatus); err != nil {
return err, false
} else if u {
updated = true
}
return nil, updated
return d.generatePlan(ctx, cachedStatus, d.generatePlanFunc(createHighPlan, plannerHigh{}), d.generatePlanFunc(createNormalPlan, plannerNormal{}))
}

View file

@ -23,14 +23,17 @@
package reconcile
import (
"time"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/rs/zerolog"
)
func newPlanAppender(pb WithPlanBuilder, current api.Plan) PlanAppender {
return planAppenderType{
func newPlanAppender(pb WithPlanBuilder, backoff api.BackOff, current api.Plan) PlanAppender {
return &planAppenderType{
current: current,
pb: pb,
backoff: backoff.DeepCopy(),
}
}
@ -50,6 +53,10 @@ type PlanAppender interface {
ApplyWithConditionIfEmpty(c planBuilderCondition, pb planBuilder) PlanAppender
ApplySubPlanIfEmpty(pb planBuilderSubPlan, plans ...planBuilder) PlanAppender
ApplyWithBackOff(key api.BackOffKey, delay time.Duration, pb planBuilder) PlanAppender
BackOff() api.BackOff
Plan() api.Plan
}
@ -58,6 +65,16 @@ type planAppenderRecovery struct {
log zerolog.Logger
}
func (p planAppenderRecovery) BackOff() api.BackOff {
return p.appender.BackOff()
}
func (p planAppenderRecovery) ApplyWithBackOff(key api.BackOffKey, delay time.Duration, pb planBuilder) PlanAppender {
return p.create(func(in PlanAppender) PlanAppender {
return in.ApplyWithBackOff(key, delay, pb)
})
}
func (p planAppenderRecovery) create(ret func(in PlanAppender) PlanAppender) (r PlanAppender) {
defer func() {
if e := recover(); e != nil {
@ -115,48 +132,65 @@ func (p planAppenderRecovery) Plan() api.Plan {
type planAppenderType struct {
pb WithPlanBuilder
current api.Plan
backoff api.BackOff
}
func (p planAppenderType) Plan() api.Plan {
func (p *planAppenderType) BackOff() api.BackOff {
return p.backoff.DeepCopy()
}
func (p *planAppenderType) ApplyWithBackOff(key api.BackOffKey, delay time.Duration, pb planBuilder) PlanAppender {
if !p.backoff.Process(key) {
return p
}
p.backoff = p.backoff.BackOff(key, delay)
return p.Apply(pb)
}
func (p *planAppenderType) Plan() api.Plan {
return p.current
}
func (p planAppenderType) ApplyIfEmpty(pb planBuilder) PlanAppender {
func (p *planAppenderType) ApplyIfEmpty(pb planBuilder) PlanAppender {
if p.current.IsEmpty() {
return p.Apply(pb)
}
return p
}
func (p planAppenderType) ApplyWithConditionIfEmpty(c planBuilderCondition, pb planBuilder) PlanAppender {
func (p *planAppenderType) ApplyWithConditionIfEmpty(c planBuilderCondition, pb planBuilder) PlanAppender {
if p.current.IsEmpty() {
return p.ApplyWithCondition(c, pb)
}
return p
}
func (p planAppenderType) ApplySubPlanIfEmpty(pb planBuilderSubPlan, plans ...planBuilder) PlanAppender {
func (p *planAppenderType) ApplySubPlanIfEmpty(pb planBuilderSubPlan, plans ...planBuilder) PlanAppender {
if p.current.IsEmpty() {
return p.ApplySubPlan(pb, plans...)
}
return p
}
func (p planAppenderType) new(plan api.Plan) planAppenderType {
return planAppenderType{
func (p *planAppenderType) new(plan api.Plan) *planAppenderType {
return &planAppenderType{
pb: p.pb,
current: append(p.current, plan...),
backoff: p.backoff.DeepCopy(),
}
}
func (p planAppenderType) Apply(pb planBuilder) PlanAppender {
func (p *planAppenderType) Apply(pb planBuilder) PlanAppender {
return p.new(p.pb.Apply(pb))
}
func (p planAppenderType) ApplyWithCondition(c planBuilderCondition, pb planBuilder) PlanAppender {
func (p *planAppenderType) ApplyWithCondition(c planBuilderCondition, pb planBuilder) PlanAppender {
return p.new(p.pb.ApplyWithCondition(c, pb))
}
func (p planAppenderType) ApplySubPlan(pb planBuilderSubPlan, plans ...planBuilder) PlanAppender {
func (p *planAppenderType) ApplySubPlan(pb planBuilderSubPlan, plans ...planBuilder) PlanAppender {
return p.new(p.pb.ApplySubPlan(pb, plans...))
}

View file

@ -34,7 +34,7 @@ import (
func Test_PlanBuilderAppender_Recovery(t *testing.T) {
t.Run("Recover", func(t *testing.T) {
require.Len(t, recoverPlanAppender(log.Logger, newPlanAppender(NewWithPlanBuilder(context.Background(), zerolog.Logger{}, nil, api.DeploymentSpec{}, api.DeploymentStatus{}, nil, nil), nil)).
require.Len(t, recoverPlanAppender(log.Logger, newPlanAppender(NewWithPlanBuilder(context.Background(), zerolog.Logger{}, nil, api.DeploymentSpec{}, api.DeploymentStatus{}, nil, nil), nil, nil)).
Apply(func(_ context.Context, _ zerolog.Logger, _ k8sutil.APIObject, _ api.DeploymentSpec, _ api.DeploymentStatus, _ inspectorInterface.Inspector, _ PlanBuilderContext) api.Plan {
panic("")
}).
@ -43,7 +43,7 @@ func Test_PlanBuilderAppender_Recovery(t *testing.T) {
}).Plan(), 0)
})
t.Run("Recover with output", func(t *testing.T) {
require.Len(t, recoverPlanAppender(log.Logger, newPlanAppender(NewWithPlanBuilder(context.Background(), zerolog.Logger{}, nil, api.DeploymentSpec{}, api.DeploymentStatus{}, nil, nil), nil)).
require.Len(t, recoverPlanAppender(log.Logger, newPlanAppender(NewWithPlanBuilder(context.Background(), zerolog.Logger{}, nil, api.DeploymentSpec{}, api.DeploymentStatus{}, nil, nil), nil, nil)).
Apply(func(_ context.Context, _ zerolog.Logger, _ k8sutil.APIObject, _ api.DeploymentSpec, _ api.DeploymentStatus, _ inspectorInterface.Inspector, _ PlanBuilderContext) api.Plan {
return api.Plan{api.Action{}}
}).
@ -55,7 +55,7 @@ func Test_PlanBuilderAppender_Recovery(t *testing.T) {
}).Plan(), 1)
})
t.Run("Recover with multi", func(t *testing.T) {
require.Len(t, recoverPlanAppender(log.Logger, newPlanAppender(NewWithPlanBuilder(context.Background(), zerolog.Logger{}, nil, api.DeploymentSpec{}, api.DeploymentStatus{}, nil, nil), nil)).
require.Len(t, recoverPlanAppender(log.Logger, newPlanAppender(NewWithPlanBuilder(context.Background(), zerolog.Logger{}, nil, api.DeploymentSpec{}, api.DeploymentStatus{}, nil, nil), nil, nil)).
Apply(func(_ context.Context, _ zerolog.Logger, _ k8sutil.APIObject, _ api.DeploymentSpec, _ api.DeploymentStatus, _ inspectorInterface.Inspector, _ PlanBuilderContext) api.Plan {
return api.Plan{api.Action{}}
}).

View file

@ -41,7 +41,6 @@ import (
// PlanBuilderContext contains context methods provided to plan builders.
type PlanBuilderContext interface {
resources.DeploymentStatusUpdate
resources.DeploymentAgencyMaintenance
resources.ArangoMemberContext
resources.DeploymentPodRenderer

View file

@ -0,0 +1,116 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package reconcile
import (
"context"
"strings"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
)
type planGenerationOutput struct {
plan api.Plan
backoff api.BackOff
changed bool
planner planner
}
type planGeneratorFunc func(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject,
currentPlan api.Plan, spec api.DeploymentSpec,
status api.DeploymentStatus, cachedStatus inspectorInterface.Inspector,
builderCtx PlanBuilderContext) (api.Plan, api.BackOff, bool)
type planGenerator func(ctx context.Context, cachedStatus inspectorInterface.Inspector) planGenerationOutput
func (d *Reconciler) generatePlanFunc(gen planGeneratorFunc, planner planner) planGenerator {
return func(ctx context.Context, cachedStatus inspectorInterface.Inspector) planGenerationOutput {
// Create plan
apiObject := d.context.GetAPIObject()
spec := d.context.GetSpec()
status, _ := d.context.GetStatus()
builderCtx := newPlanBuilderContext(d.context)
newPlan, backoffs, changed := gen(ctx, d.log, apiObject, planner.Get(&status), spec, status, cachedStatus, builderCtx)
return planGenerationOutput{
plan: newPlan,
backoff: backoffs,
planner: planner,
changed: changed,
}
}
}
func (d *Reconciler) generatePlan(ctx context.Context, cachedStatus inspectorInterface.Inspector, generators ...planGenerator) (error, bool) {
updated := false
if err := d.context.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
var b api.BackOff
for id := range generators {
result := generators[id](ctx, cachedStatus)
b = b.CombineLatest(result.backoff)
if len(result.plan) == 0 || !result.changed {
continue
}
// Send events
current := result.planner.Get(s)
for id := len(current); id < len(result.plan); id++ {
action := result.plan[id]
d.context.CreateEvent(k8sutil.NewPlanAppendEvent(d.context.GetAPIObject(), action.Type.String(), action.Group.AsRole(), action.MemberID, action.Reason))
if r := action.Reason; r != "" {
d.log.Info().Str("Action", action.Type.String()).Str("Role", action.Group.AsRole()).Str("Member", action.MemberID).Str("Type", strings.Title(result.planner.Type())).Msgf(r)
}
}
result.planner.Set(s, result.plan)
for _, p := range result.plan {
actionsGeneratedMetrics.WithLabelValues(d.context.GetName(), p.Type.String(), result.planner.Type()).Inc()
}
updated = true
}
if len(b) > 0 {
new := s.BackOff.DeepCopy().Combine(b)
if !new.Equal(s.BackOff) {
s.BackOff = new
updated = true
}
}
return updated
}); err != nil {
return errors.WithMessage(err, "Unable to save plan"), false
}
return nil, updated
}

View file

@ -24,70 +24,30 @@ package reconcile
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/deployment/rotation"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
core "k8s.io/api/core/v1"
)
func (d *Reconciler) CreateHighPlan(ctx context.Context, cachedStatus inspectorInterface.Inspector) (error, bool) {
// Create plan
apiObject := d.context.GetAPIObject()
spec := d.context.GetSpec()
status, lastVersion := d.context.GetStatus()
builderCtx := newPlanBuilderContext(d.context)
newPlan, changed := createHighPlan(ctx, d.log, apiObject, status.HighPriorityPlan, spec, status, cachedStatus, builderCtx)
// If not change, we're done
if !changed {
return nil, false
}
// Save plan
if len(newPlan) == 0 {
// Nothing to do
return nil, false
}
// Send events
for id := len(status.Plan); id < len(newPlan); id++ {
action := newPlan[id]
d.context.CreateEvent(k8sutil.NewPlanAppendEvent(apiObject, action.Type.String(), action.Group.AsRole(), action.MemberID, action.Reason))
if r := action.Reason; r != "" {
d.log.Info().Str("Action", action.Type.String()).Str("Role", action.Group.AsRole()).Str("Member", action.MemberID).Str("Type", "High").Msgf(r)
}
}
status.HighPriorityPlan = newPlan
for _, p := range newPlan {
actionsGeneratedMetrics.WithLabelValues(d.context.GetName(), p.Type.String(), "high").Inc()
}
if err := d.context.UpdateStatus(ctx, status, lastVersion); err != nil {
return errors.WithStack(err), false
}
return nil, true
}
// createHighPlan considers the given specification & status and creates a plan to get the status in line with the specification.
// If a plan already exists, the given plan is returned with false.
// Otherwise the new plan is returned with a boolean true.
func createHighPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject,
currentPlan api.Plan, spec api.DeploymentSpec,
status api.DeploymentStatus, cachedStatus inspectorInterface.Inspector,
builderCtx PlanBuilderContext) (api.Plan, bool) {
builderCtx PlanBuilderContext) (api.Plan, api.BackOff, bool) {
if !currentPlan.IsEmpty() {
// Plan already exists, complete that first
return currentPlan, false
return currentPlan, nil, false
}
return recoverPlanAppender(log, newPlanAppender(NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx), currentPlan).
r := recoverPlanAppender(log, newPlanAppender(NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx), status.BackOff, currentPlan).
ApplyIfEmpty(updateMemberPodTemplateSpec).
ApplyIfEmpty(updateMemberPhasePlan).
ApplyIfEmpty(createCleanOutPlan).
@ -95,8 +55,10 @@ func createHighPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil.A
ApplyIfEmpty(updateMemberRotationConditionsPlan).
ApplyIfEmpty(createTopologyMemberUpdatePlan).
ApplyIfEmpty(createTopologyMemberConditionPlan).
ApplyIfEmpty(createRebalancerCheckPlan)).
Plan(), true
ApplyIfEmpty(createRebalancerCheckPlan).
ApplyWithBackOff(BackOffCheck, time.Minute, emptyPlanBuilder))
return r.Plan(), r.BackOff(), true
}
// updateMemberPodTemplateSpec creates plan to update member Spec

View file

@ -26,65 +26,24 @@ import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
)
func (d *Reconciler) CreateNormalPlan(ctx context.Context, cachedStatus inspectorInterface.Inspector) (error, bool) {
// Create plan
apiObject := d.context.GetAPIObject()
spec := d.context.GetSpec()
status, lastVersion := d.context.GetStatus()
builderCtx := newPlanBuilderContext(d.context)
newPlan, changed := createNormalPlan(ctx, d.log, apiObject, status.Plan, spec, status, cachedStatus, builderCtx)
// If not change, we're done
if !changed {
return nil, false
}
// Save plan
if len(newPlan) == 0 {
// Nothing to do
return nil, false
}
// Send events
for id := len(status.Plan); id < len(newPlan); id++ {
action := newPlan[id]
d.context.CreateEvent(k8sutil.NewPlanAppendEvent(apiObject, action.Type.String(), action.Group.AsRole(), action.MemberID, action.Reason))
if r := action.Reason; r != "" {
d.log.Info().Str("Action", action.Type.String()).Str("Role", action.Group.AsRole()).Str("Member", action.MemberID).Str("Type", "Normal").Msgf(r)
}
}
status.Plan = newPlan
for _, p := range newPlan {
actionsGeneratedMetrics.WithLabelValues(d.context.GetName(), p.Type.String(), "normal").Inc()
}
if err := d.context.UpdateStatus(ctx, status, lastVersion); err != nil {
return errors.WithStack(err), false
}
return nil, true
}
// createNormalPlan considers the given specification & status and creates a plan to get the status in line with the specification.
// If a plan already exists, the given plan is returned with false.
// Otherwise the new plan is returned with a boolean true.
func createNormalPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject,
currentPlan api.Plan, spec api.DeploymentSpec,
status api.DeploymentStatus, cachedStatus inspectorInterface.Inspector,
builderCtx PlanBuilderContext) (api.Plan, bool) {
builderCtx PlanBuilderContext) (api.Plan, api.BackOff, bool) {
if !currentPlan.IsEmpty() {
// Plan already exists, complete that first
return currentPlan, false
return currentPlan, nil, false
}
return recoverPlanAppender(log, newPlanAppender(NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx), currentPlan).
r := recoverPlanAppender(log, newPlanAppender(NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx), status.BackOff, currentPlan).
// Adjust topology settings
ApplyIfEmpty(createTopologyMemberAdjustmentPlan).
// Define topology
@ -124,8 +83,9 @@ func createNormalPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil
ApplyIfEmpty(createRebalancerGeneratePlan).
// Final
ApplyIfEmpty(createTLSStatusPropagated).
ApplyIfEmpty(createBootstrapPlan)).
Plan(), true
ApplyIfEmpty(createBootstrapPlan))
return r.Plan(), r.BackOff(), true
}
func createMemberFailedRestorePlan(ctx context.Context,

View file

@ -29,6 +29,9 @@ import (
"io/ioutil"
"testing"
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
pod2 "github.com/arangodb/kube-arangodb/pkg/deployment/pod"
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember"
@ -89,6 +92,25 @@ type testContext struct {
PVC *core.PersistentVolumeClaim
PVCErr error
RecordedEvent *k8sutil.Event
Inspector inspectorInterface.Inspector
}
func (c *testContext) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...patch.Item) error {
panic("implement me")
}
func (c *testContext) ApplyPatch(ctx context.Context, p ...patch.Item) error {
panic("implement me")
}
func (c *testContext) GetStatusSnapshot() api.DeploymentStatus {
s, _ := c.GetStatus()
return *s.DeepCopy()
}
func (c *testContext) GenerateMemberEndpoint(group api.ServerGroup, member api.MemberStatus) (string, error) {
return pod2.GenerateMemberEndpoint(c.Inspector, c.ArangoDeployment, c.ArangoDeployment.Spec, group, member)
}
func (c *testContext) GetAgencyCache() (agencyCache.State, bool) {
@ -132,7 +154,8 @@ func (c *testContext) GetCachedStatus() inspectorInterface.Inspector {
}
func (c *testContext) WithStatusUpdateErr(ctx context.Context, action resources.DeploymentStatusUpdateErrFunc, force ...bool) error {
panic("implement me")
_, err := action(&c.ArangoDeployment.Status)
return err
}
func (c *testContext) GetKubeCli() kubernetes.Interface {
@ -180,7 +203,8 @@ func (c *testContext) SetAgencyMaintenanceMode(ctx context.Context, enabled bool
}
func (c *testContext) WithStatusUpdate(ctx context.Context, action resources.DeploymentStatusUpdateFunc, force ...bool) error {
panic("implement me")
action(&c.ArangoDeployment.Status)
return nil
}
func (c *testContext) GetPod(_ context.Context, podName string) (*core.Pod, error) {
@ -412,7 +436,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
status.Hashes.TLS.Propagated = true
status.Hashes.Encryption.Propagated = true
newPlan, changed := createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed := createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
assert.Len(t, newPlan, 1)
@ -423,12 +447,12 @@ func TestCreatePlanSingleScale(t *testing.T) {
PodName: "something",
},
}
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
spec.Single.Count = util.NewInt(2)
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
@ -444,7 +468,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
PodName: "something1",
},
}
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale down
}
@ -473,7 +497,7 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
var status api.DeploymentStatus
addAgentsToStatus(t, &status, 3)
newPlan, changed := createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed := createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 2)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -486,7 +510,7 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
PodName: "something",
},
}
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 1)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -511,7 +535,7 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
PodName: "something4",
},
}
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 3) // Note: Downscaling is only down 1 at a time
assert.Equal(t, api.ActionTypeKillMemberPod, newPlan[0].Type)
@ -545,7 +569,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
var status api.DeploymentStatus
addAgentsToStatus(t, &status, 3)
newPlan, changed := createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed := createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 6) // Adding 3 dbservers & 3 coordinators (note: agents do not scale now)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -578,7 +602,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
PodName: "coordinator1",
},
}
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 3)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -615,7 +639,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
}
spec.DBServers.Count = util.NewInt(1)
spec.Coordinators.Count = util.NewInt(1)
newPlan, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
newPlan, _, changed = createNormalPlan(ctx, log, depl, nil, spec, status, inspector.NewEmptyInspector(), c)
assert.True(t, changed)
require.Len(t, newPlan, 7) // Note: Downscaling is done 1 at a time
assert.Equal(t, api.ActionTypeCleanOutMember, newPlan[0].Type)
@ -635,10 +659,12 @@ func TestCreatePlanClusterScale(t *testing.T) {
}
type LastLogRecord struct {
t *testing.T
msg string
}
func (l *LastLogRecord) Run(e *zerolog.Event, level zerolog.Level, msg string) {
l.t.Log(msg)
l.msg = msg
}
@ -1035,7 +1061,7 @@ func TestCreatePlan(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
h := &LastLogRecord{}
h := &LastLogRecord{t: t}
logger := zerolog.New(ioutil.Discard).Hook(h)
r := NewReconciler(logger, testCase.context)

View file

@ -23,7 +23,11 @@
package reconcile
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
)
@ -46,3 +50,10 @@ func createRotateMemberPlan(log zerolog.Logger, member api.MemberStatus,
}
return plan
}
func emptyPlanBuilder(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan {
return nil
}

View file

@ -85,6 +85,13 @@ type DeploymentPodRenderer interface {
RenderPodForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.Pod, error)
// RenderPodTemplateForMemberFromCurrent Renders PodTemplate definition for member
RenderPodTemplateForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.PodTemplateSpec, error)
DeploymentEndpoints
}
type DeploymentEndpoints interface {
// GenerateMemberEndpoint generates endpoint for a member
GenerateMemberEndpoint(group api.ServerGroup, member api.MemberStatus) (string, error)
}
type DeploymentImageManager interface {
@ -140,8 +147,20 @@ type ArangoAgency interface {
RefreshAgencyCache(ctx context.Context) (uint64, error)
}
type DeploymentInfoGetter interface {
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
// GetSpec returns the current specification of the deployment
GetSpec() api.DeploymentSpec
// GetStatus returns the current status of the deployment
GetStatus() (api.DeploymentStatus, int32)
// GetStatus returns the current status of the deployment without revision
GetStatusSnapshot() api.DeploymentStatus
}
type ArangoApplier interface {
ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...patch.Item) error
ApplyPatch(ctx context.Context, p ...patch.Item) error
}
// Context provides all functions needed by the Resources service
@ -155,15 +174,10 @@ type Context interface {
DeploymentCachedStatus
ArangoAgency
ArangoApplier
DeploymentInfoGetter
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
// GetServerGroupIterator returns the deployment as ServerGroupIterator.
GetServerGroupIterator() ServerGroupIterator
// GetSpec returns the current specification of the deployment
GetSpec() api.DeploymentSpec
// GetStatus returns the current status of the deployment
GetStatus() (api.DeploymentStatus, int32)
// UpdateStatus replaces the status of the deployment with the given status and
// updates the resources in k8s.
UpdateStatus(ctx context.Context, status api.DeploymentStatus, lastVersion int32, force ...bool) error