1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Adding update plan

This commit is contained in:
Ewout Prangsma 2018-02-22 16:54:36 +01:00
parent d19950cc84
commit a24ce05f47
No known key found for this signature in database
GPG key ID: 4DBAD380D93D0698
9 changed files with 426 additions and 3 deletions

View file

@ -138,6 +138,9 @@ $(BIN): $(GOBUILDDIR) $(SOURCES)
docker: $(BIN) docker: $(BIN)
docker build -f $(DOCKERFILE) -t $(OPERATORIMAGE) . docker build -f $(DOCKERFILE) -t $(OPERATORIMAGE) .
ifdef PUSHIMAGES
docker push $(OPERATORIMAGE)
endif
# Testing # Testing
@ -208,8 +211,8 @@ minikube-start:
minikube start --cpus=4 --memory=6144 minikube start --cpus=4 --memory=6144
delete-operator: delete-operator:
kubectl delete -f examples/deployment.yaml --ignore-not-found kubectl delete deployment arangodb-operator --ignore-not-found
redeploy-operator: delete-operator redeploy-operator: delete-operator
kubectl create -f examples/deployment.yaml $(ROOTDIR)/scripts/kube_create_operator.sh default $(OPERATORIMAGE)
kubectl get pods kubectl get pods

View file

@ -0,0 +1,6 @@
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer

View file

@ -4,3 +4,7 @@ metadata:
name: "example-simple-cluster" name: "example-simple-cluster"
spec: spec:
mode: cluster mode: cluster
agents:
storageClassName: local-storage
dbservers:
storageClassName: local-storage

View file

@ -24,6 +24,7 @@ package v1alpha
import ( import (
"fmt" "fmt"
"math/rand"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -71,6 +72,9 @@ type DeploymentStatus struct {
// Conditions specific to the entire deployment // Conditions specific to the entire deployment
Conditions ConditionList `json:"conditions,omitempty"` Conditions ConditionList `json:"conditions,omitempty"`
// Plan to update this deployment
Plan Plan `json:"plan,omitempty"`
} }
// DeploymentStatusMembers holds the member status of all server groups // DeploymentStatusMembers holds the member status of all server groups
@ -167,6 +171,32 @@ func (ds *DeploymentStatusMembers) UpdateMemberStatus(status MemberStatus, group
return nil return nil
} }
// RemoveByID removes the member with the given ID from the member list of the given group.
// Returns a NotFoundError if the group is not known. Any error returned by the
// member list of the group (such as an unknown member ID) is returned masked.
func (ds *DeploymentStatusMembers) RemoveByID(id string, group ServerGroup) error {
	// Resolve the member list that belongs to the requested group.
	var members *MemberStatusList
	switch group {
	case ServerGroupSingle:
		members = &ds.Single
	case ServerGroupAgents:
		members = &ds.Agents
	case ServerGroupDBServers:
		members = &ds.DBServers
	case ServerGroupCoordinators:
		members = &ds.Coordinators
	case ServerGroupSyncMasters:
		members = &ds.SyncMasters
	case ServerGroupSyncWorkers:
		members = &ds.SyncWorkers
	default:
		return maskAny(errors.Wrapf(NotFoundError, "ServerGroup %d is not known", group))
	}

	// Remove the member from the resolved list.
	if err := members.RemoveByID(id); err != nil {
		return maskAny(err)
	}
	return nil
}
// AllMembersReady returns true when all members are in the Ready state. // AllMembersReady returns true when all members are in the Ready state.
func (ds DeploymentStatusMembers) AllMembersReady() bool { func (ds DeploymentStatusMembers) AllMembersReady() bool {
if err := ds.ForeachServerGroup(func(group ServerGroup, list *MemberStatusList) error { if err := ds.ForeachServerGroup(func(group ServerGroup, list *MemberStatusList) error {
@ -244,6 +274,29 @@ func (l *MemberStatusList) RemoveByID(id string) error {
return maskAny(errors.Wrapf(NotFoundError, "Member '%s' is not a member", id)) return maskAny(errors.Wrapf(NotFoundError, "Member '%s' is not a member", id))
} }
// SelectMemberToRemove picks the member of the given list that should
// be removed in a scale down action.
// The first member found in the None state is preferred. Without such a member,
// a randomly chosen member in the Created state is returned.
// Returns a NotFoundError if the list is empty or holds no member in one of those states.
func (l MemberStatusList) SelectMemberToRemove() (MemberStatus, error) {
	if len(l) == 0 {
		return MemberStatus{}, maskAny(errors.Wrap(NotFoundError, "No member available for removal"))
	}

	// Prefer a member that has not been created yet.
	for i := range l {
		if l[i].State == MemberStateNone {
			return l[i], nil
		}
	}

	// Otherwise visit the members in random order and take the first created one.
	for _, i := range rand.Perm(len(l)) {
		if candidate := l[i]; candidate.State == MemberStateCreated {
			return candidate, nil
		}
	}

	return MemberStatus{}, maskAny(errors.Wrap(NotFoundError, "No member available for removal"))
}
// MemberState is a strongly typed state of a deployment member // MemberState is a strongly typed state of a deployment member
type MemberState string type MemberState string

View file

@ -0,0 +1,56 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package v1alpha
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// ActionType is a strongly typed name for a plan action item.
// Its values are stored in the Type field of an Action.
type ActionType string
const (
// ActionTypeAddMember causes a member to be added.
ActionTypeAddMember ActionType = "AddMember"
// ActionTypeRemoveMember causes a member to be removed.
ActionTypeRemoveMember ActionType = "RemoveMember"
// ActionTypeDrainMember causes a member to be drained (dbserver only).
ActionTypeDrainMember ActionType = "DrainMember"
// ActionTypeShutdownMember causes a member to be shut down.
ActionTypeShutdownMember ActionType = "ShutdownMember"
)
// Action represents a single action to be taken to update a deployment.
type Action struct {
// Type of action.
Type ActionType `json:"type"`
// ID reference of the member involved in this action (if any).
MemberID string `json:"memberID,omitempty"`
// Group involved in this action.
Group ServerGroup `json:"group,omitempty"`
// StartTime is set when the action has been started, but needs to wait to be finished.
StartTime *metav1.Time `json:"startTime,omitempty"`
}
// Plan is a list of actions that will be taken to update a deployment.
// Only 1 action is in progress at a time. The operator will wait for that
// action to be completed and then remove the action from the plan.
type Plan []Action

View file

@ -25,9 +25,35 @@
package v1alpha package v1alpha
import ( import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime" runtime "k8s.io/apimachinery/pkg/runtime"
) )
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
// The StartTime pointer is not shared with the copy: when it is set, out
// receives a freshly allocated v1.Time filled from the receiver's value.
func (in *Action) DeepCopyInto(out *Action) {
	// Start with a shallow copy of all fields.
	*out = *in
	if in.StartTime != nil {
		out.StartTime = new(v1.Time)
		in.StartTime.DeepCopyInto(out.StartTime)
	}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Action.
// A nil receiver yields nil.
func (in *Action) DeepCopy() *Action {
	if in != nil {
		clone := &Action{}
		in.DeepCopyInto(clone)
		return clone
	}
	return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ArangoDeployment) DeepCopyInto(out *ArangoDeployment) { func (in *ArangoDeployment) DeepCopyInto(out *ArangoDeployment) {
*out = *in *out = *in
@ -160,6 +186,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) {
(*in)[i].DeepCopyInto(&(*out)[i]) (*in)[i].DeepCopyInto(&(*out)[i])
} }
} }
if in.Plan != nil {
in, out := &in.Plan, &out.Plan
*out = make(Plan, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return return
} }

View file

@ -72,7 +72,8 @@ type deploymentEvent struct {
const ( const (
deploymentEventQueueSize = 100 deploymentEventQueueSize = 100
inspectionInterval = time.Minute // Ensure we inspect the generated resources no less than with this interval minInspectionInterval = time.Second // Ensure we inspect the generated resources no less than with this interval
maxInspectionInterval = time.Minute // Ensure we inspect the generated resources no less than with this interval
) )
// Deployment is the in process state of an ArangoDeployment. // Deployment is the in process state of an ArangoDeployment.
@ -178,6 +179,8 @@ func (d *Deployment) run() {
} }
log.Info().Msg("start running...") log.Info().Msg("start running...")
inspectionInterval := maxInspectionInterval
recentInspectionErrors := 0
for { for {
select { select {
case <-d.stopCh: case <-d.stopCh:
@ -200,18 +203,44 @@ func (d *Deployment) run() {
} }
case <-d.inspectTrigger.Done(): case <-d.inspectTrigger.Done():
hasError := false
// Inspection of generated resources needed // Inspection of generated resources needed
if err := d.inspectPods(); err != nil { if err := d.inspectPods(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject)) d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
} }
// Create scale/update plan
if err := d.createPlan(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
}
// Execute current step of scale/update plan
if err := d.executePlan(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
}
// Ensure all resources are created // Ensure all resources are created
if err := d.ensurePods(d.apiObject); err != nil { if err := d.ensurePods(d.apiObject); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject)) d.createEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject))
} }
if hasError {
if recentInspectionErrors == 0 {
inspectionInterval = minInspectionInterval
recentInspectionErrors++
}
} else {
recentInspectionErrors = 0
}
case <-time.After(inspectionInterval): case <-time.After(inspectionInterval):
// Trigger inspection // Trigger inspection
d.inspectTrigger.Trigger() d.inspectTrigger.Trigger()
// Backoff with next interval
inspectionInterval = time.Duration(float64(inspectionInterval) * 1.5)
if inspectionInterval > maxInspectionInterval {
inspectionInterval = maxInspectionInterval
}
} }
} }
} }

View file

@ -0,0 +1,92 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha"
)
// createPlan compares the current specification of the deployment with its status
// and creates a plan to get the status in line with the specification.
// A non-empty new plan is stored in the status and saved with updateCRStatus.
// If a plan already exists, or no action is needed, nothing is done.
func (d *Deployment) createPlan() error {
	// An existing plan has to be completed first.
	if len(d.status.Plan) != 0 {
		return nil
	}

	spec := d.apiObject.Spec
	var newPlan api.Plan
	// scale appends the scale up/down actions of a single group to the new plan.
	scale := func(members api.MemberStatusList, group api.ServerGroup, count int) {
		newPlan = append(newPlan, createScalePlan(members, group, count)...)
	}

	// Check for scale up/down. A deployment in single mode is never scaled.
	switch spec.Mode {
	case api.DeploymentModeResilientSingle:
		// Only the singles are scaled
		scale(d.status.Members.Single, api.ServerGroupSingle, spec.Single.Count)
	case api.DeploymentModeCluster:
		// Scale dbservers, coordinators, sync masters & sync workers
		scale(d.status.Members.DBServers, api.ServerGroupDBServers, spec.DBServers.Count)
		scale(d.status.Members.Coordinators, api.ServerGroupCoordinators, spec.Coordinators.Count)
		scale(d.status.Members.SyncMasters, api.ServerGroupSyncMasters, spec.SyncMasters.Count)
		scale(d.status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.Count)
	}

	if len(newPlan) == 0 {
		// Status already matches the specification
		return nil
	}

	// Store the plan and save it
	d.status.Plan = newPlan
	if err := d.updateCRStatus(); err != nil {
		return maskAny(err)
	}
	return nil
}
// createScalePlan creates a scaling plan for a single server group.
// With fewer members than count, one AddMember action is planned per missing member.
// With more members than count, the removal of exactly one member is planned:
// a Shutdown and a Remove action, preceded by a Drain action for dbservers.
// The returned plan is empty when the group has the wanted size or when no
// member can be selected for removal.
func createScalePlan(members api.MemberStatusList, group api.ServerGroup, count int) api.Plan {
	var plan api.Plan
	current := len(members)

	switch {
	case current < count:
		// Scale up
		for missing := count - current; missing > 0; missing-- {
			plan = append(plan, api.Action{Type: api.ActionTypeAddMember, Group: group})
		}
	case current > count:
		// Note, we scale down 1 member at a time
		victim, err := members.SelectMemberToRemove()
		if err != nil {
			// No member available for removal, plan nothing
			break
		}
		if group == api.ServerGroupDBServers {
			plan = append(plan, api.Action{Type: api.ActionTypeDrainMember, Group: group, MemberID: victim.ID})
		}
		plan = append(plan,
			api.Action{Type: api.ActionTypeShutdownMember, Group: group, MemberID: victim.ID},
			api.Action{Type: api.ActionTypeRemoveMember, Group: group, MemberID: victim.ID},
		)
	}
	return plan
}

View file

@ -0,0 +1,147 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha"
)
// executePlan tries to execute the plan as far as possible.
// Actions are taken from the front of the plan, one at a time. An action that
// has no start time yet is started; an action that has been started before is
// checked for progress. A finished action is removed from the plan and the
// status is saved, after which the next action is handled.
// Returns nil when the plan is empty or when the first action is still in
// progress (it will be checked again on the next call).
// Returns an error when starting an action, checking its progress or saving
// the status fails.
func (d *Deployment) executePlan() error {
	log := d.deps.Log

	for {
		if len(d.status.Plan) == 0 {
			// No plan exists, nothing to be done
			return nil
		}

		// Take first action
		action := d.status.Plan[0]
		if action.StartTime.IsZero() {
			// Not started yet
			ready, err := d.startAction(action)
			if err != nil {
				log.Debug().Err(err).
					Str("action-type", string(action.Type)).
					Msg("Failed to start action")
				return maskAny(err)
			}
			if ready {
				// Remove action from list
				d.status.Plan = d.status.Plan[1:]
			} else {
				// Mark start time
				now := metav1.Now()
				d.status.Plan[0].StartTime = &now
			}
			// Save plan update
			if err := d.updateCRStatus(); err != nil {
				return maskAny(err)
			}
		} else {
			// First action of plan has been started, check its progress
			ready, err := d.checkActionProgress(action)
			if err != nil {
				log.Debug().Err(err).
					Str("action-type", string(action.Type)).
					Msg("Failed to check action progress")
				return maskAny(err)
			}
			if !ready {
				// Action is still in progress. Stop here and come back later,
				// otherwise this loop would spin on the same action forever.
				return nil
			}
			// Remove action from list
			d.status.Plan = d.status.Plan[1:]
			// Save plan update
			if err := d.updateCRStatus(); err != nil {
				return maskAny(err)
			}
		}
	}
}
// startAction performs the start of the given action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
// An action of unknown type results in an error.
func (d *Deployment) startAction(action api.Action) (bool, error) {
	log := d.deps.Log

	switch action.Type {
	case api.ActionTypeAddMember:
		err := d.createMember(action.Group, d.apiObject)
		if err != nil {
			log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to create member")
			return false, maskAny(err)
		}
		// Save added member
		if err = d.updateCRStatus(); err != nil {
			return false, maskAny(err)
		}
		return true, nil

	case api.ActionTypeRemoveMember:
		err := d.status.Members.RemoveByID(action.MemberID, action.Group)
		if api.IsNotFound(err) {
			// We wanted to remove and it is already gone. All ok
			return true, nil
		}
		if err != nil {
			log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to remove member")
			return false, maskAny(err)
		}
		// Save removed member
		if err = d.updateCRStatus(); err != nil {
			return false, maskAny(err)
		}
		return true, nil

	case api.ActionTypeDrainMember, api.ActionTypeShutdownMember:
		// TODO: not implemented yet, reported as finished
		return true, nil

	default:
		return false, maskAny(fmt.Errorf("Unknown action type"))
	}
}
// checkActionProgress checks the progress of the given action.
// Returns true if the action is completely finished, false otherwise.
// All known action types are currently reported as finished; an action of
// unknown type results in an error.
func (d *Deployment) checkActionProgress(action api.Action) (bool, error) {
	switch action.Type {
	case api.ActionTypeAddMember, api.ActionTypeRemoveMember:
		// Nothing to check for these actions
		return true, nil
	case api.ActionTypeDrainMember, api.ActionTypeShutdownMember:
		// TODO: progress check not implemented yet
		return true, nil
	}
	return false, maskAny(fmt.Errorf("Unknown action type"))
}