diff --git a/Makefile b/Makefile index 9c9640c4d..ec535fab8 100644 --- a/Makefile +++ b/Makefile @@ -138,6 +138,9 @@ $(BIN): $(GOBUILDDIR) $(SOURCES) docker: $(BIN) docker build -f $(DOCKERFILE) -t $(OPERATORIMAGE) . +ifdef PUSHIMAGES + docker push $(OPERATORIMAGE) +endif # Testing @@ -208,8 +211,8 @@ minikube-start: minikube start --cpus=4 --memory=6144 delete-operator: - kubectl delete -f examples/deployment.yaml --ignore-not-found + kubectl delete deployment arangodb-operator --ignore-not-found redeploy-operator: delete-operator - kubectl create -f examples/deployment.yaml + $(ROOTDIR)/scripts/kube_create_operator.sh default $(OPERATORIMAGE) kubectl get pods diff --git a/examples/local-storage-class.yaml b/examples/local-storage-class.yaml new file mode 100644 index 000000000..fcfb1a0f3 --- /dev/null +++ b/examples/local-storage-class.yaml @@ -0,0 +1,6 @@ +kind: StorageClass +apiVersion: storage.k8s.io/v1 +metadata: + name: local-storage +provisioner: kubernetes.io/no-provisioner +volumeBindingMode: WaitForFirstConsumer diff --git a/examples/simple-cluster.yaml b/examples/simple-cluster.yaml index 88e17703e..d927eb179 100644 --- a/examples/simple-cluster.yaml +++ b/examples/simple-cluster.yaml @@ -4,3 +4,7 @@ metadata: name: "example-simple-cluster" spec: mode: cluster + agents: + storageClassName: local-storage + dbservers: + storageClassName: local-storage diff --git a/pkg/apis/arangodb/v1alpha/deployment_status.go b/pkg/apis/arangodb/v1alpha/deployment_status.go index 29092108c..0cb32c48c 100644 --- a/pkg/apis/arangodb/v1alpha/deployment_status.go +++ b/pkg/apis/arangodb/v1alpha/deployment_status.go @@ -24,6 +24,7 @@ package v1alpha import ( "fmt" + "math/rand" "github.com/pkg/errors" ) @@ -71,6 +72,9 @@ type DeploymentStatus struct { // Conditions specific to the entire deployment Conditions ConditionList `json:"conditions,omitempty"` + + // Plan to update this deployment + Plan Plan `json:"plan,omitempty"` } // DeploymentStatusMembers holds the member status of all server groups @@ -167,6 +171,32 @@ func (ds *DeploymentStatusMembers) UpdateMemberStatus(status MemberStatus, group return nil } +// RemoveByID a member with given ID from the given group. +// Returns a NotFoundError if the ID of the given member or group cannot be found. +func (ds *DeploymentStatusMembers) RemoveByID(id string, group ServerGroup) error { + var err error + switch group { + case ServerGroupSingle: + err = ds.Single.RemoveByID(id) + case ServerGroupAgents: + err = ds.Agents.RemoveByID(id) + case ServerGroupDBServers: + err = ds.DBServers.RemoveByID(id) + case ServerGroupCoordinators: + err = ds.Coordinators.RemoveByID(id) + case ServerGroupSyncMasters: + err = ds.SyncMasters.RemoveByID(id) + case ServerGroupSyncWorkers: + err = ds.SyncWorkers.RemoveByID(id) + default: + return maskAny(errors.Wrapf(NotFoundError, "ServerGroup %d is not known", group)) + } + if err != nil { + return maskAny(err) + } + return nil +} + // AllMembersReady returns true when all members are in the Ready state. func (ds DeploymentStatusMembers) AllMembersReady() bool { if err := ds.ForeachServerGroup(func(group ServerGroup, list *MemberStatusList) error { @@ -244,6 +274,29 @@ func (l *MemberStatusList) RemoveByID(id string) error { return maskAny(errors.Wrapf(NotFoundError, "Member '%s' is not a member", id)) } +// SelectMemberToRemove selects a member from the given list that should +// be removed in a scale down action. +// Returns an error if the list is empty. +func (l MemberStatusList) SelectMemberToRemove() (MemberStatus, error) { + if len(l) > 0 { + // Try to find a not ready member + for _, m := range l { + if m.State == MemberStateNone { + return m, nil + } + } + // Pick a random member that is in created state + perm := rand.Perm(len(l)) + for _, idx := range perm { + m := l[idx] + if m.State == MemberStateCreated { + return m, nil + } + } + } + return MemberStatus{}, maskAny(errors.Wrap(NotFoundError, "No member available for removal")) +} + // MemberState is a strongly typed state of a deployment member type MemberState string diff --git a/pkg/apis/arangodb/v1alpha/plan.go b/pkg/apis/arangodb/v1alpha/plan.go new file mode 100644 index 000000000..10224bbcb --- /dev/null +++ b/pkg/apis/arangodb/v1alpha/plan.go @@ -0,0 +1,56 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package v1alpha + +import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + +// ActionType is a strongly typed name for a plan action item +type ActionType string + +const ( + // ActionTypeAddMember causes a member to be added. + ActionTypeAddMember ActionType = "AddMember" + // ActionTypeRemoveMember causes a member to be removed. + ActionTypeRemoveMember ActionType = "RemoveMember" + // ActionTypeDrainMember causes a member to be drained (dbserver only). + ActionTypeDrainMember ActionType = "DrainMember" + // ActionTypeShutdownMember causes a member to be shutdown. + ActionTypeShutdownMember ActionType = "ShutdownMember" +) + +// Action represents a single action to be taken to update a deployment. +type Action struct { + // Type of action. + Type ActionType `json:"type"` + // ID reference of the member involved in this action (if any) + MemberID string `json:"memberID,omitempty"` + // Group involved in this action + Group ServerGroup `json:"group,omitempty"` + // StartTime is set the when the action has been started, but needs to wait to be finished. + StartTime *metav1.Time `json:"startTime,omitempty"` +} + +// Plan is a list of actions that will be taken to update a deployment. +// Only 1 action is in progress at a time. The operator will wait for that +// action to be completely and then remove the action. +type Plan []Action diff --git a/pkg/apis/arangodb/v1alpha/zz_generated.deepcopy.go b/pkg/apis/arangodb/v1alpha/zz_generated.deepcopy.go index 7064a52a1..1712d358c 100644 --- a/pkg/apis/arangodb/v1alpha/zz_generated.deepcopy.go +++ b/pkg/apis/arangodb/v1alpha/zz_generated.deepcopy.go @@ -25,9 +25,35 @@ package v1alpha import ( + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Action) DeepCopyInto(out *Action) { + *out = *in + if in.StartTime != nil { + in, out := &in.StartTime, &out.StartTime + if *in == nil { + *out = nil + } else { + *out = new(v1.Time) + (*in).DeepCopyInto(*out) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Action. +func (in *Action) DeepCopy() *Action { + if in == nil { + return nil + } + out := new(Action) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ArangoDeployment) DeepCopyInto(out *ArangoDeployment) { *out = *in @@ -160,6 +186,13 @@ func (in *DeploymentStatus) DeepCopyInto(out *DeploymentStatus) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Plan != nil { + in, out := &in.Plan, &out.Plan + *out = make(Plan, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index c10ca6cef..38d4f5f56 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -72,7 +72,8 @@ type deploymentEvent struct { const ( deploymentEventQueueSize = 100 - inspectionInterval = time.Minute // Ensure we inspect the generated resources no less than with this interval + minInspectionInterval = time.Second // Ensure we inspect the generated resources no less than with this interval + maxInspectionInterval = time.Minute // Ensure we inspect the generated resources no less than with this interval ) // Deployment is the in process state of an ArangoDeployment. @@ -178,6 +179,8 @@ func (d *Deployment) run() { } log.Info().Msg("start running...") + inspectionInterval := maxInspectionInterval + recentInspectionErrors := 0 for { select { case <-d.stopCh: @@ -200,18 +203,44 @@ func (d *Deployment) run() { } case <-d.inspectTrigger.Done(): + hasError := false // Inspection of generated resources needed if err := d.inspectPods(); err != nil { + hasError = true d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject)) } + // Create scale/update plan + if err := d.createPlan(); err != nil { + hasError = true + d.createEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject)) + } + // Execute current step of scale/update plan + if err := d.executePlan(); err != nil { + hasError = true + d.createEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject)) + } // Ensure all resources are created if err := d.ensurePods(d.apiObject); err != nil { + hasError = true d.createEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject)) } + if hasError { + if recentInspectionErrors == 0 { + inspectionInterval = minInspectionInterval + recentInspectionErrors++ + } + } else { + recentInspectionErrors = 0 + } case <-time.After(inspectionInterval): // Trigger inspection d.inspectTrigger.Trigger() + // Backoff with next interval + inspectionInterval = time.Duration(float64(inspectionInterval) * 1.5) + if inspectionInterval > maxInspectionInterval { + inspectionInterval = maxInspectionInterval + } } } } diff --git a/pkg/deployment/plan_builder.go b/pkg/deployment/plan_builder.go new file mode 100644 index 000000000..7c8ceefff --- /dev/null +++ b/pkg/deployment/plan_builder.go @@ -0,0 +1,92 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha" +) + +// createPlan considers the current specification & status of the deployment creates a plan to +// get the status in line with the specification. +// If a plan already exists, nothing is done. +func (d *Deployment) createPlan() error { + if len(d.status.Plan) > 0 { + // Plan already exists, complete that first + return nil + } + + // Check for various scenario's + spec := d.apiObject.Spec + var plan api.Plan + + // Check for scale up/down + switch spec.Mode { + case api.DeploymentModeSingle: + // Never scale down + case api.DeploymentModeResilientSingle: + // Only scale singles + plan = append(plan, createScalePlan(d.status.Members.Single, api.ServerGroupSingle, spec.Single.Count)...) + case api.DeploymentModeCluster: + // Scale dbservers + plan = append(plan, createScalePlan(d.status.Members.DBServers, api.ServerGroupDBServers, spec.DBServers.Count)...) + plan = append(plan, createScalePlan(d.status.Members.Coordinators, api.ServerGroupCoordinators, spec.Coordinators.Count)...) + plan = append(plan, createScalePlan(d.status.Members.SyncMasters, api.ServerGroupSyncMasters, spec.SyncMasters.Count)...) + plan = append(plan, createScalePlan(d.status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.Count)...) + } + + // Save plan + if len(plan) == 0 { + // Nothing to do + return nil + } + d.status.Plan = plan + if err := d.updateCRStatus(); err != nil { + return maskAny(err) + } + return nil +} + +// createScalePlan creates a scaling plan for a single server group +func createScalePlan(members api.MemberStatusList, group api.ServerGroup, count int) api.Plan { + var plan api.Plan + if len(members) < count { + // Scale up + for i := 0; i < count-len(members); i++ { + plan = append(plan, api.Action{Type: api.ActionTypeAddMember, Group: group}) + } + } else if len(members) > count { + // Note, we scale down 1 member as a time + if m, err := members.SelectMemberToRemove(); err == nil { + if group == api.ServerGroupDBServers { + plan = append(plan, + api.Action{Type: api.ActionTypeDrainMember, Group: group, MemberID: m.ID}, + ) + } + plan = append(plan, + api.Action{Type: api.ActionTypeShutdownMember, Group: group, MemberID: m.ID}, + api.Action{Type: api.ActionTypeRemoveMember, Group: group, MemberID: m.ID}, + ) + } + } + return plan +} diff --git a/pkg/deployment/plan_executor.go b/pkg/deployment/plan_executor.go new file mode 100644 index 000000000..32c216f98 --- /dev/null +++ b/pkg/deployment/plan_executor.go @@ -0,0 +1,147 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha" +) + +// executePlan tries to execute the plan as far as possible. +func (d *Deployment) executePlan() error { + log := d.deps.Log + + for { + if len(d.status.Plan) == 0 { + // No plan exists, nothing to be done + return nil + } + + // Take first action + action := d.status.Plan[0] + if action.StartTime.IsZero() { + // Not started yey + ready, err := d.startAction(action) + if err != nil { + log.Debug().Err(err). + Str("action-type", string(action.Type)). + Msg("Failed to start action") + return maskAny(err) + } + if ready { + // Remove action from list + d.status.Plan = d.status.Plan[1:] + } else { + // Mark start time + now := metav1.Now() + d.status.Plan[0].StartTime = &now + } + // Save plan update + if err := d.updateCRStatus(); err != nil { + return maskAny(err) + } + } else { + // First action of plan has been started, check its progress + ready, err := d.checkActionProgress(action) + if err != nil { + log.Debug().Err(err). + Str("action-type", string(action.Type)). + Msg("Failed to check action progress") + return maskAny(err) + } + if ready { + // Remove action from list + d.status.Plan = d.status.Plan[1:] + // Save plan update + if err := d.updateCRStatus(); err != nil { + return maskAny(err) + } + } + } + } +} + +// startAction performs the start of the given action +// Returns true if the action is completely finished, false in case +// the start time needs to be recorded and a ready condition needs to be checked. +func (d *Deployment) startAction(action api.Action) (bool, error) { + log := d.deps.Log + + switch action.Type { + case api.ActionTypeAddMember: + if err := d.createMember(action.Group, d.apiObject); err != nil { + log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to create member") + return false, maskAny(err) + } + // Save added member + if err := d.updateCRStatus(); err != nil { + return false, maskAny(err) + } + return true, nil + case api.ActionTypeRemoveMember: + if err := d.status.Members.RemoveByID(action.MemberID, action.Group); api.IsNotFound(err) { + // We wanted to remove and it is already gone. All ok + return true, nil + } else if err != nil { + log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to remove member") + return false, maskAny(err) + } + // Save removed member + if err := d.updateCRStatus(); err != nil { + return false, maskAny(err) + } + return true, nil + case api.ActionTypeDrainMember: + // TODO + return true, nil + case api.ActionTypeShutdownMember: + // TODO + return true, nil + default: + return false, maskAny(fmt.Errorf("Unknown action type")) + } +} + +// checkActionProgress checks the progress of the given action. +// Returns true if the action is completely finished, false otherwise. +func (d *Deployment) checkActionProgress(action api.Action) (bool, error) { + switch action.Type { + case api.ActionTypeAddMember: + // Nothing todo + return true, nil + case api.ActionTypeRemoveMember: + // Nothing todo + return true, nil + case api.ActionTypeDrainMember: + // TODO + return true, nil + case api.ActionTypeShutdownMember: + // TODO + return true, nil + default: + return false, maskAny(fmt.Errorf("Unknown action type")) + } +}