diff --git a/deps/github.com/arangodb/go-driver/http/connection.go b/deps/github.com/arangodb/go-driver/http/connection.go index bb6f70a47..92ee9f4b1 100644 --- a/deps/github.com/arangodb/go-driver/http/connection.go +++ b/deps/github.com/arangodb/go-driver/http/connection.go @@ -67,6 +67,9 @@ type ConnectionConfig struct { // directly after use, resulting in a large number of connections in `TIME_WAIT` state. // When this value is not set, the driver will set it to 64 automatically. Transport http.RoundTripper + // DontFollowRedirect; if set, redirect will not be followed, response from the initial request will be returned without an error + // DontFollowRedirect takes precendance over FailOnRedirect. + DontFollowRedirect bool // FailOnRedirect; if set, redirect will not be followed, instead the status code is returned as error FailOnRedirect bool // Cluster configuration settings @@ -137,7 +140,11 @@ func newHTTPConnection(endpoint string, config ConnectionConfig) (driver.Connect httpClient := &http.Client{ Transport: config.Transport, } - if config.FailOnRedirect { + if config.DontFollowRedirect { + httpClient.CheckRedirect = func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse // Do not wrap, standard library will not understand + } + } else if config.FailOnRedirect { httpClient.CheckRedirect = func(req *http.Request, via []*http.Request) error { return driver.ArangoError{ HasError: true, diff --git a/examples/nodeport-cluster.yaml b/examples/nodeport-cluster.yaml index f5765e61e..5de0b4c45 100644 --- a/examples/nodeport-cluster.yaml +++ b/examples/nodeport-cluster.yaml @@ -7,6 +7,7 @@ spec: app: arangodb role: coordinator type: NodePort + publishNotReadyAddresses: true ports: - protocol: TCP port: 8529 diff --git a/examples/simple-cluster.yaml b/examples/simple-cluster.yaml index 88e17703e..52fb63621 100644 --- a/examples/simple-cluster.yaml +++ b/examples/simple-cluster.yaml @@ -4,3 +4,9 @@ metadata: name: 
"example-simple-cluster" spec: mode: cluster + image: arangodb/arangodb:3.3.4 + tls: + altNames: ["kube-01", "kube-02", "kube-03"] + coordinators: + args: + - --log.level=true diff --git a/main.go b/main.go index 5f1e9019a..a066187a3 100644 --- a/main.go +++ b/main.go @@ -186,7 +186,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper CreateCRD: operatorOptions.createCRD, } deps := operator.Dependencies{ - Log: logService.MustGetLogger("operator"), + LogService: logService, KubeCli: kubecli, KubeExtCli: kubeExtCli, CRCli: crCli, diff --git a/pkg/apis/deployment/v1alpha/conditions.go b/pkg/apis/deployment/v1alpha/conditions.go index 8930de65e..9ecf645e3 100644 --- a/pkg/apis/deployment/v1alpha/conditions.go +++ b/pkg/apis/deployment/v1alpha/conditions.go @@ -35,6 +35,8 @@ const ( ConditionTypeReady ConditionType = "Ready" // ConditionTypeTerminated indicates that the member has terminated and will not restart. ConditionTypeTerminated ConditionType = "Terminated" + // ConditionTypeAutoUpgrade indicates that the member has to be started with `--database.auto-upgrade` once. + ConditionTypeAutoUpgrade ConditionType = "AutoUpgrade" ) // Condition represents one current condition of a deployment or deployment member. diff --git a/pkg/apis/deployment/v1alpha/deployment_spec.go b/pkg/apis/deployment/v1alpha/deployment_spec.go index d9bbf5fca..59c4f9e5c 100644 --- a/pkg/apis/deployment/v1alpha/deployment_spec.go +++ b/pkg/apis/deployment/v1alpha/deployment_spec.go @@ -99,6 +99,27 @@ func (s DeploymentSpec) IsSecure() bool { return s.TLS.IsSecure() } +// GetServerGroupSpec returns the server group spec (from this +// deployment spec) for the given group. 
+func (s DeploymentSpec) GetServerGroupSpec(group ServerGroup) ServerGroupSpec { + switch group { + case ServerGroupSingle: + return s.Single + case ServerGroupAgents: + return s.Agents + case ServerGroupDBServers: + return s.DBServers + case ServerGroupCoordinators: + return s.Coordinators + case ServerGroupSyncMasters: + return s.SyncMasters + case ServerGroupSyncWorkers: + return s.SyncWorkers + default: + return ServerGroupSpec{} + } +} + // SetDefaults fills in default values when a field is not specified. func (s *DeploymentSpec) SetDefaults(deploymentName string) { if s.GetMode() == "" { diff --git a/pkg/apis/deployment/v1alpha/member_state.go b/pkg/apis/deployment/v1alpha/member_state.go index 65062e224..4fe637268 100644 --- a/pkg/apis/deployment/v1alpha/member_state.go +++ b/pkg/apis/deployment/v1alpha/member_state.go @@ -34,4 +34,8 @@ const ( MemberStateCleanOut MemberState = "CleanOut" // MemberStateShuttingDown indicates that a member is shutting down MemberStateShuttingDown MemberState = "ShuttingDown" + // MemberStateRotating indicates that a member is being rotated + MemberStateRotating MemberState = "Rotating" + // MemberStateUpgrading indicates that a member is in the process of upgrading its database data format + MemberStateUpgrading MemberState = "Upgrading" ) diff --git a/pkg/apis/deployment/v1alpha/plan.go b/pkg/apis/deployment/v1alpha/plan.go index 9609fcfe1..27793d45d 100644 --- a/pkg/apis/deployment/v1alpha/plan.go +++ b/pkg/apis/deployment/v1alpha/plan.go @@ -22,7 +22,10 @@ package v1alpha -import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +import ( + "github.com/dchest/uniuri" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) // ActionType is a strongly typed name for a plan action item type ActionType string @@ -36,18 +39,45 @@ const ( ActionTypeCleanOutMember ActionType = "CleanOutMember" // ActionTypeShutdownMember causes a member to be shutdown and removed from the cluster. 
ActionTypeShutdownMember ActionType = "ShutdownMember" + // ActionTypeRotateMember causes a member to be shutdown and have its pod removed. + ActionTypeRotateMember ActionType = "RotateMember" + // ActionTypeUpgradeMember causes a member to be shutdown and have its pod removed, restarted with AutoUpgrade option, waited until termination and then restarted again. + ActionTypeUpgradeMember ActionType = "UpgradeMember" + // ActionTypeWaitForMemberUp causes the plan to wait until the member is considered "up". + ActionTypeWaitForMemberUp ActionType = "WaitForMemberUp" ) // Action represents a single action to be taken to update a deployment. type Action struct { + // ID of this action (unique for every action) + ID string `json:"id"` // Type of action. Type ActionType `json:"type"` // ID reference of the member involved in this action (if any) MemberID string `json:"memberID,omitempty"` // Group involved in this action Group ServerGroup `json:"group,omitempty"` + // CreationTime is set when the action is created. + CreationTime metav1.Time `json:"creationTime"` // StartTime is set the when the action has been started, but needs to wait to be finished. StartTime *metav1.Time `json:"startTime,omitempty"` + // Reason for this action + Reason string `json:"reason,omitempty"` +} + +// NewAction instantiates a new Action of the given type for the given group and member, with a freshly generated ID and the current time as creation time. If one or more reasons are passed, only the first one is recorded. +func NewAction(actionType ActionType, group ServerGroup, memberID string, reason ...string) Action { + a := Action{ + ID: uniuri.New(), + Type: actionType, + MemberID: memberID, + Group: group, + CreationTime: metav1.Now(), + } + if len(reason) != 0 { + a.Reason = reason[0] + } + return a } // Plan is a list of actions that will be taken to update a deployment. 
diff --git a/pkg/apis/deployment/v1alpha/server_group.go b/pkg/apis/deployment/v1alpha/server_group.go index 2c8d6e63d..8d8725693 100644 --- a/pkg/apis/deployment/v1alpha/server_group.go +++ b/pkg/apis/deployment/v1alpha/server_group.go @@ -65,6 +65,26 @@ func (g ServerGroup) AsRole() string { } } +// AsRoleAbbreviated returns the abbreviation of the "role" value for the given group. +func (g ServerGroup) AsRoleAbbreviated() string { + switch g { + case ServerGroupSingle: + return "sngl" + case ServerGroupAgents: + return "agnt" + case ServerGroupDBServers: + return "prmr" + case ServerGroupCoordinators: + return "crdn" + case ServerGroupSyncMasters: + return "syma" + case ServerGroupSyncWorkers: + return "sywo" + default: + return "?" + } +} + // IsArangod returns true when the groups runs servers of type `arangod`. func (g ServerGroup) IsArangod() bool { switch g { diff --git a/pkg/apis/deployment/v1alpha/server_group_test.go b/pkg/apis/deployment/v1alpha/server_group_test.go index 835702c7d..f3464d6ce 100644 --- a/pkg/apis/deployment/v1alpha/server_group_test.go +++ b/pkg/apis/deployment/v1alpha/server_group_test.go @@ -37,6 +37,15 @@ func TestServerGroupAsRole(t *testing.T) { assert.Equal(t, "syncworker", ServerGroupSyncWorkers.AsRole()) } +func TestServerGroupAsRoleAbbreviated(t *testing.T) { + assert.Equal(t, "sngl", ServerGroupSingle.AsRoleAbbreviated()) + assert.Equal(t, "agnt", ServerGroupAgents.AsRoleAbbreviated()) + assert.Equal(t, "prmr", ServerGroupDBServers.AsRoleAbbreviated()) + assert.Equal(t, "crdn", ServerGroupCoordinators.AsRoleAbbreviated()) + assert.Equal(t, "syma", ServerGroupSyncMasters.AsRoleAbbreviated()) + assert.Equal(t, "sywo", ServerGroupSyncWorkers.AsRoleAbbreviated()) +} + func TestServerGroupIsArangod(t *testing.T) { assert.True(t, ServerGroupSingle.IsArangod()) assert.True(t, ServerGroupAgents.IsArangod()) diff --git a/pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go b/pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go index 
697427f4c..328832e72 100644 --- a/pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go +++ b/pkg/apis/deployment/v1alpha/zz_generated.deepcopy.go @@ -35,6 +35,7 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Action) DeepCopyInto(out *Action) { *out = *in + in.CreationTime.DeepCopyInto(&out.CreationTime) if in.StartTime != nil { in, out := &in.StartTime, &out.StartTime if *in == nil { diff --git a/pkg/deployment/action.go b/pkg/deployment/action.go new file mode 100644 index 000000000..ea064559b --- /dev/null +++ b/pkg/deployment/action.go @@ -0,0 +1,38 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" +) + +// Action executes a single Plan item. +type Action interface { + // Start performs the start of the action. + // Returns true if the action is completely finished, false in case + // the start time needs to be recorded and a ready condition needs to be checked. + Start(ctx context.Context) (bool, error) + // CheckProgress checks the progress of the action. + // Returns true if the action is completely finished, false otherwise. 
+ CheckProgress(ctx context.Context) (bool, error) +} diff --git a/pkg/deployment/action_add_member.go b/pkg/deployment/action_add_member.go new file mode 100644 index 000000000..8e11e4893 --- /dev/null +++ b/pkg/deployment/action_add_member.go @@ -0,0 +1,65 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/rs/zerolog" +) + +// NewAddMemberAction creates a new Action that implements the given +// planned AddMember action. +func NewAddMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { + return &actionAddMember{ + log: log, + action: action, + actionCtx: actionCtx, + } +} + +// actionAddMember implements an AddMemberAction. +type actionAddMember struct { + log zerolog.Logger + action api.Action + actionCtx ActionContext +} + +// Start creates a new member in the group of the action, logging failures through the action's own logger. +// Returns true if the action is completely finished, false in case +// the start time needs to be recorded and a ready condition needs to be checked. 
+func (a *actionAddMember) Start(ctx context.Context) (bool, error) { + if err := a.actionCtx.CreateMember(a.action.Group); err != nil { + a.log.Debug().Err(err).Msg("Failed to create member") + return false, maskAny(err) + } + return true, nil +} + +// CheckProgress checks the progress of the action. +// Returns true if the action is completely finished, false otherwise. +func (a *actionAddMember) CheckProgress(ctx context.Context) (bool, error) { + // Nothing to do + return true, nil +} diff --git a/pkg/deployment/action_cleanout_member.go b/pkg/deployment/action_cleanout_member.go new file mode 100644 index 000000000..d386fcc88 --- /dev/null +++ b/pkg/deployment/action_cleanout_member.go @@ -0,0 +1,105 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/rs/zerolog" +) + +// NewCleanOutMemberAction creates a new Action that implements the given +// planned CleanOutMember action. +func NewCleanOutMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { + return &actionCleanoutMember{ + log: log, + action: action, + actionCtx: actionCtx, + } +} + +// actionCleanoutMember implements an CleanOutMemberAction.
+type actionCleanoutMember struct { + log zerolog.Logger + action api.Action + actionCtx ActionContext +} + +// Start requests a cleanout of the member from the cluster and marks the member as being cleaned out. +// Returns true if the action is completely finished, false in case +// the start time needs to be recorded and a ready condition needs to be checked. +func (a *actionCleanoutMember) Start(ctx context.Context) (bool, error) { + m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) + if !ok { + // We wanted to remove and it is already gone. All ok + return true, nil + } + log := a.log + c, err := a.actionCtx.GetDatabaseClient(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to create member client") + return false, maskAny(err) + } + cluster, err := c.Cluster(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to access cluster") + return false, maskAny(err) + } + if err := cluster.CleanOutServer(ctx, a.action.MemberID); err != nil { + log.Debug().Err(err).Msg("Failed to cleanout member") + return false, maskAny(err) + } + // Update status; the result of UpdateMember itself must be checked (the outer err is always nil here) + m.State = api.MemberStateCleanOut + if err := a.actionCtx.UpdateMember(m); err != nil { + return false, maskAny(err) + } + return false, nil +} + +// CheckProgress checks the progress of the action. +// Returns true if the action is completely finished, false otherwise.
+func (a *actionCleanoutMember) CheckProgress(ctx context.Context) (bool, error) { + log := a.log + c, err := a.actionCtx.GetDatabaseClient(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to create member client") + return false, maskAny(err) + } + cluster, err := c.Cluster(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to access cluster") + return false, maskAny(err) + } + cleanedOut, err := cluster.IsCleanedOut(ctx, a.action.MemberID) + if err != nil { + return false, maskAny(err) + } + if !cleanedOut { + // We're not done yet + return false, nil + } + // Cleanout completed + return true, nil +} diff --git a/pkg/deployment/action_context.go b/pkg/deployment/action_context.go new file mode 100644 index 000000000..537250579 --- /dev/null +++ b/pkg/deployment/action_context.go @@ -0,0 +1,204 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + "fmt" + + driver "github.com/arangodb/go-driver" + "github.com/rs/zerolog" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" +) + +// ActionContext provides methods to the Action implementations +// to control their context. +type ActionContext interface { + // Gets the specified mode of deployment + GetMode() api.DeploymentMode + // GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server), + // creating one if needed. + GetDatabaseClient(ctx context.Context) (driver.Client, error) + // GetServerClient returns a cached client for a specific server. + GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) + // GetAgencyClients returns a client connection for every agency member. + GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) + // GetMemberStatusByID returns the current member status + // for the member with given id. + // Returns member status, true when found, or false + // when no such member is found. + GetMemberStatusByID(id string) (api.MemberStatus, bool) + // CreateMember adds a new member to the given group. + CreateMember(group api.ServerGroup) error + // UpdateMember updates the deployment status wrt the given member. + UpdateMember(member api.MemberStatus) error + // RemoveMemberByID removes a member with given id. + RemoveMemberByID(id string) error + // DeletePod deletes a pod with given name in the namespace + // of the deployment. If the pod does not exist, the error is ignored. 
+ DeletePod(podName string) error + // DeletePvc deletes a persistent volume claim with given name in the namespace + // of the deployment. If the pvc does not exist, the error is ignored. + DeletePvc(pvcName string) error +} + +// NewActionContext creates a new ActionContext implementation. +func NewActionContext(log zerolog.Logger, deployment *Deployment) ActionContext { + return &actionContext{ + log: log, + deployment: deployment, + } +} + +// actionContext implements ActionContext; all of its methods log through the logger given at construction (ac.log). +type actionContext struct { + log zerolog.Logger + deployment *Deployment +} + +// Gets the specified mode of deployment +func (ac *actionContext) GetMode() api.DeploymentMode { + return ac.deployment.apiObject.Spec.GetMode() +} + +// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server), +// creating one if needed. +func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) { + c, err := ac.deployment.clientCache.GetDatabase(ctx) + if err != nil { + return nil, maskAny(err) + } + return c, nil +} + +// GetServerClient returns a cached client for a specific server. +func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) { + c, err := ac.deployment.clientCache.Get(ctx, group, id) + if err != nil { + return nil, maskAny(err) + } + return c, nil +} + +// GetAgencyClients returns a client connection for every agency member. 
+func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) { + agencyMembers := ac.deployment.status.Members.Agents + result := make([]arangod.Agency, 0, len(agencyMembers)) + for _, m := range agencyMembers { + client, err := ac.GetServerClient(ctx, api.ServerGroupAgents, m.ID) + if err != nil { + return nil, maskAny(err) + } + aClient, err := arangod.NewAgencyClient(client) + if err != nil { + return nil, maskAny(err) + } + result = append(result, aClient) + } + return result, nil +} + +// GetMemberStatusByID returns the current member status +// for the member with given id. +// Returns member status, true when found, or false +// when no such member is found. +func (ac *actionContext) GetMemberStatusByID(id string) (api.MemberStatus, bool) { + m, _, ok := ac.deployment.status.Members.ElementByID(id) + return m, ok +} + +// CreateMember adds a new member to the given group. +func (ac *actionContext) CreateMember(group api.ServerGroup) error { + d := ac.deployment + if err := d.createMember(group, d.apiObject); err != nil { + ac.log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member") + return maskAny(err) + } + // Save added member + if err := d.updateCRStatus(); err != nil { + ac.log.Debug().Err(err).Msg("Updating CR status failed") + return maskAny(err) + } + return nil +} + +// UpdateMember updates the deployment status wrt the given member. +func (ac *actionContext) UpdateMember(member api.MemberStatus) error { + d := ac.deployment + _, group, found := ac.deployment.status.Members.ElementByID(member.ID) + if !found { + return maskAny(fmt.Errorf("Member %s not found", member.ID)) + } + d.status.Members.UpdateMemberStatus(member, group) + if err := d.updateCRStatus(); err != nil { + ac.log.Debug().Err(err).Msg("Updating CR status failed") + return maskAny(err) + } + return nil +} + +// RemoveMemberByID removes a member with given id. 
+func (ac *actionContext) RemoveMemberByID(id string) error { + d := ac.deployment + _, group, found := d.status.Members.ElementByID(id) + if !found { + return nil + } + if err := d.status.Members.RemoveByID(id, group); err != nil { + ac.log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to remove member") + return maskAny(err) + } + // Save removed member + if err := d.updateCRStatus(); err != nil { + return maskAny(err) + } + return nil +} + +// DeletePod deletes a pod with given name in the namespace +// of the deployment. If the pod does not exist, the error is ignored. +func (ac *actionContext) DeletePod(podName string) error { + d := ac.deployment + ns := d.apiObject.GetNamespace() + if err := d.deps.KubeCli.Core().Pods(ns).Delete(podName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) { + ac.log.Debug().Err(err).Str("pod", podName).Msg("Failed to remove pod") + return maskAny(err) + } + return nil +} + +// DeletePvc deletes a persistent volume claim with given name in the namespace +// of the deployment. If the pvc does not exist, the error is ignored. +func (ac *actionContext) DeletePvc(pvcName string) error { + d := ac.deployment + ns := d.apiObject.GetNamespace() + if err := d.deps.KubeCli.Core().PersistentVolumeClaims(ns).Delete(pvcName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) { + ac.log.Debug().Err(err).Str("pvc", pvcName).Msg("Failed to remove pvc") + return maskAny(err) + } + return nil +} diff --git a/pkg/deployment/action_remove_member.go b/pkg/deployment/action_remove_member.go new file mode 100644 index 000000000..127f6fda5 --- /dev/null +++ b/pkg/deployment/action_remove_member.go @@ -0,0 +1,80 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/rs/zerolog" +) + +// NewRemoveMemberAction creates a new Action that implements the given +// planned RemoveMember action. +func NewRemoveMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { + return &actionRemoveMember{ + log: log, + action: action, + actionCtx: actionCtx, + } +} + +// actionRemoveMember implements an RemoveMemberAction. +type actionRemoveMember struct { + log zerolog.Logger + action api.Action + actionCtx ActionContext +} + +// Start performs the start of the action. +// Returns true if the action is completely finished, false in case +// the start time needs to be recorded and a ready condition needs to be checked. +func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) { + m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) + if !ok { + // We wanted to remove and it is already gone. 
All ok + return true, nil + } + // Remove the pod (if any) + if err := a.actionCtx.DeletePod(m.PodName); err != nil { + return false, maskAny(err) + } + // Remove the pvc (if any) + if m.PersistentVolumeClaimName != "" { + if err := a.actionCtx.DeletePvc(m.PersistentVolumeClaimName); err != nil { + return false, maskAny(err) + } + } + // Remove member + if err := a.actionCtx.RemoveMemberByID(a.action.MemberID); err != nil { + return false, maskAny(err) + } + return true, nil +} + +// CheckProgress checks the progress of the action. +// Returns true if the action is completely finished, false otherwise. +func (a *actionRemoveMember) CheckProgress(ctx context.Context) (bool, error) { + // Nothing todo + return true, nil +} diff --git a/pkg/deployment/action_rotate_member.go b/pkg/deployment/action_rotate_member.go new file mode 100644 index 000000000..4fa495023 --- /dev/null +++ b/pkg/deployment/action_rotate_member.go @@ -0,0 +1,118 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/rs/zerolog" +) + +// NewRotateMemberAction creates a new Action that implements the given +// planned RotateMember action. 
+func NewRotateMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { + return &actionRotateMember{ + log: log, + action: action, + actionCtx: actionCtx, + } +} + +// actionRotateMember implements an RotateMember. +type actionRotateMember struct { + log zerolog.Logger + action api.Action + actionCtx ActionContext +} + +// Start shuts the member down (arangod) or deletes its pod (arangosync) and marks it as rotating; an unknown member ends the action immediately. +// Returns true if the action is completely finished, false in case +// the start time needs to be recorded and a ready condition needs to be checked. +func (a *actionRotateMember) Start(ctx context.Context) (bool, error) { + log := a.log + group := a.action.Group + m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) + if !ok { + log.Error().Msg("No such member") + return true, nil + } + if group.IsArangod() { + // Invoke shutdown endpoint + c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID) + if err != nil { + log.Debug().Err(err).Msg("Failed to create member client") + return false, maskAny(err) + } + removeFromCluster := false + log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member") + ctx, cancel := context.WithTimeout(ctx, shutdownTimeout) + defer cancel() + if err := c.Shutdown(ctx, removeFromCluster); err != nil { + // Shutdown failed. Let's check if we're already done + if ready, err := a.CheckProgress(ctx); err == nil && ready { + // We're done + return true, nil + } + log.Debug().Err(err).Msg("Failed to shutdown member") + return false, maskAny(err) + } + } else if group.IsArangosync() { + // Terminate pod + if err := a.actionCtx.DeletePod(m.PodName); err != nil { + return false, maskAny(err) + } + } + // Update status + m.State = api.MemberStateRotating + if err := a.actionCtx.UpdateMember(m); err != nil { + return false, maskAny(err) + } + return false, nil +} + +// CheckProgress checks the progress of the action. 
+// Returns true if the action is completely finished, false otherwise.
+func (a *actionRotateMember) CheckProgress(ctx context.Context) (bool, error) { + // Check that pod is removed + log := a.log + m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID) + if !found { + log.Error().Msg("No such member") + return true, nil + } + if !m.Conditions.IsTrue(api.ConditionTypeTerminated) { + // Pod is not yet terminated + return false, nil + } + // Pod is terminated, we can now remove it + if err := a.actionCtx.DeletePod(m.PodName); err != nil { + return false, maskAny(err) + } + // Pod is now gone, update the member status + m.State = api.MemberStateNone + if err := a.actionCtx.UpdateMember(m); err != nil { + return false, maskAny(err) + } + return true, nil +} diff --git a/pkg/deployment/action_shutdown_member.go b/pkg/deployment/action_shutdown_member.go new file mode 100644 index 000000000..57dad6f60 --- /dev/null +++ b/pkg/deployment/action_shutdown_member.go @@ -0,0 +1,113 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + "time" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/rs/zerolog" +) + +const ( + shutdownTimeout = time.Second * 15 +) + +// NewShutdownMemberAction creates a new Action that implements the given +// planned ShutdownMember action. 
+func NewShutdownMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { + return &actionShutdownMember{ + log: log, + action: action, + actionCtx: actionCtx, + } +} + +// actionShutdownMember implements an ShutdownMemberAction. +type actionShutdownMember struct { + log zerolog.Logger + action api.Action + actionCtx ActionContext +} + +// Start performs the start of the action. +// Returns true if the action is completely finished, false in case +// the start time needs to be recorded and a ready condition needs to be checked. +func (a *actionShutdownMember) Start(ctx context.Context) (bool, error) { + log := a.log + group := a.action.Group + m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) + if !ok { + log.Error().Msg("No such member") + return true, nil + } + if group.IsArangod() { + // Invoke shutdown endpoint + c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID) + if err != nil { + log.Debug().Err(err).Msg("Failed to create member client") + return false, maskAny(err) + } + removeFromCluster := true + log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member") + ctx, cancel := context.WithTimeout(ctx, shutdownTimeout) + defer cancel() + if err := c.Shutdown(ctx, removeFromCluster); err != nil { + // Shutdown failed. Let's check if we're already done + if ready, err := a.CheckProgress(ctx); err == nil && ready { + // We're done + return true, nil + } + log.Debug().Err(err).Msg("Failed to shutdown member") + return false, maskAny(err) + } + } else if group.IsArangosync() { + // Terminate pod + if err := a.actionCtx.DeletePod(m.PodName); err != nil { + return false, maskAny(err) + } + } + // Update status + m.State = api.MemberStateShuttingDown + if err := a.actionCtx.UpdateMember(m); err != nil { + return false, maskAny(err) + } + return false, nil +} + +// CheckProgress checks the progress of the action. +// Returns true if the action is completely finished, false otherwise. 
+func (a *actionShutdownMember) CheckProgress(ctx context.Context) (bool, error) { + m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID) + if !found { + // Member not long exists + return true, nil + } + if m.Conditions.IsTrue(api.ConditionTypeTerminated) { + // Shutdown completed + return true, nil + } + // Member still not shutdown, retry soon + return false, nil +} diff --git a/pkg/deployment/action_upgrade_member.go b/pkg/deployment/action_upgrade_member.go new file mode 100644 index 000000000..ed4f65ac9 --- /dev/null +++ b/pkg/deployment/action_upgrade_member.go @@ -0,0 +1,127 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/rs/zerolog" +) + +// NewUpgradeMemberAction creates a new Action that implements the given +// planned UpgradeMember action. +func NewUpgradeMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { + return &actionUpgradeMember{ + log: log, + action: action, + actionCtx: actionCtx, + } +} + +// actionUpgradeMember implements an UpgradeMember. +type actionUpgradeMember struct { + log zerolog.Logger + action api.Action + actionCtx ActionContext +} + +// Start performs the start of the action. 
+// Returns true if the action is completely finished, false in case +// the start time needs to be recorded and a ready condition needs to be checked. +func (a *actionUpgradeMember) Start(ctx context.Context) (bool, error) { + log := a.log + group := a.action.Group + m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID) + if !ok { + log.Error().Msg("No such member") + } + // Set AutoUpgrade condition + m.Conditions.Update(api.ConditionTypeAutoUpgrade, true, "Upgrading", "AutoUpgrade on first restart") + if err := a.actionCtx.UpdateMember(m); err != nil { + return false, maskAny(err) + } + if group.IsArangod() { + // Invoke shutdown endpoint + c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID) + if err != nil { + log.Debug().Err(err).Msg("Failed to create member client") + return false, maskAny(err) + } + removeFromCluster := false + log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member") + ctx, cancel := context.WithTimeout(ctx, shutdownTimeout) + defer cancel() + if err := c.Shutdown(ctx, removeFromCluster); err != nil { + // Shutdown failed. Let's check if we're already done + if ready, err := a.CheckProgress(ctx); err == nil && ready { + // We're done + return true, nil + } + log.Debug().Err(err).Msg("Failed to shutdown member") + return false, maskAny(err) + } + } else if group.IsArangosync() { + // Terminate pod + if err := a.actionCtx.DeletePod(m.PodName); err != nil { + return false, maskAny(err) + } + } + // Update status + m.State = api.MemberStateRotating // We keep the rotation state here, since only when a new pod is created, it will get the Upgrading state. + if err := a.actionCtx.UpdateMember(m); err != nil { + return false, maskAny(err) + } + return false, nil +} + +// CheckProgress checks the progress of the action. +// Returns true if the action is completely finished, false otherwise. 
+func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, error) { + // Check that pod is removed + log := a.log + m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID) + if !found { + log.Error().Msg("No such member") + return true, nil + } + isUpgrading := m.State == api.MemberStateUpgrading + log = log.With(). + Str("pod-name", m.PodName). + Bool("is-upgrading", isUpgrading).Logger() + if !m.Conditions.IsTrue(api.ConditionTypeTerminated) { + // Pod is not yet terminated + return false, nil + } + // Pod is terminated, we can now remove it + log.Debug().Msg("Deleting pod") + if err := a.actionCtx.DeletePod(m.PodName); err != nil { + return false, maskAny(err) + } + // Pod is now gone, update the member status + m.State = api.MemberStateNone + if err := a.actionCtx.UpdateMember(m); err != nil { + return false, maskAny(err) + } + return isUpgrading, nil +} diff --git a/pkg/deployment/action_wait_for_member_up.go b/pkg/deployment/action_wait_for_member_up.go new file mode 100644 index 000000000..d5dd75aa4 --- /dev/null +++ b/pkg/deployment/action_wait_for_member_up.go @@ -0,0 +1,217 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + "sync" + "time" + + driver "github.com/arangodb/go-driver" + "github.com/rs/zerolog" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" +) + +// NewWaitForMemberUpAction creates a new Action that implements the given +// planned WaitForMemberUp action. +func NewWaitForMemberUpAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action { + return &actionWaitForMemberUp{ + log: log, + action: action, + actionCtx: actionCtx, + } +} + +const ( + maxAgentResponseTime = time.Second * 10 +) + +// actionWaitForMemberUp implements an WaitForMemberUp. +type actionWaitForMemberUp struct { + log zerolog.Logger + action api.Action + actionCtx ActionContext +} + +// Start performs the start of the action. +// Returns true if the action is completely finished, false in case +// the start time needs to be recorded and a ready condition needs to be checked. +func (a *actionWaitForMemberUp) Start(ctx context.Context) (bool, error) { + ready, err := a.CheckProgress(ctx) + if err != nil { + return false, maskAny(err) + } + return ready, nil +} + +// CheckProgress checks the progress of the action. +// Returns true if the action is completely finished, false otherwise. +func (a *actionWaitForMemberUp) CheckProgress(ctx context.Context) (bool, error) { + if a.action.Group.IsArangosync() { + return a.checkProgressArangoSync(ctx) + } + switch a.actionCtx.GetMode() { + case api.DeploymentModeSingle: + return a.checkProgressSingle(ctx) + default: + if a.action.Group == api.ServerGroupAgents { + return a.checkProgressAgent(ctx) + } + return a.checkProgressCluster(ctx) + } +} + +// checkProgressSingle checks the progress of the action in the case +// of a single server. 
+func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool, error) { + log := a.log + c, err := a.actionCtx.GetDatabaseClient(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to create database client") + return false, maskAny(err) + } + if _, err := c.Version(ctx); err != nil { + log.Debug().Err(err).Msg("Failed to get version") + return false, maskAny(err) + } + return true, nil +} + +type agentStatus struct { + IsLeader bool + LeaderEndpoint string + IsResponding bool +} + +// checkProgressAgent checks the progress of the action in the case +// of an agent. +func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, error) { + log := a.log + clients, err := a.actionCtx.GetAgencyClients(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to create agency clients") + return false, maskAny(err) + } + + wg := sync.WaitGroup{} + invalidKey := []string{"does-not-exists-149e97e8-4b81-5664-a8a8-9ba93881d64c"} + statuses := make([]agentStatus, len(clients)) + for i, c := range clients { + wg.Add(1) + go func(i int, c arangod.Agency) { + defer wg.Done() + var trash interface{} + lctx, cancel := context.WithTimeout(ctx, maxAgentResponseTime) + defer cancel() + if err := c.ReadKey(lctx, invalidKey, &trash); err == nil || arangod.IsKeyNotFound(err) { + // We got a valid read from the leader + statuses[i].IsLeader = true + statuses[i].LeaderEndpoint = c.Endpoint() + statuses[i].IsResponding = true + } else { + if location, ok := arangod.IsNotLeader(err); ok { + // Valid response from a follower + statuses[i].IsLeader = false + statuses[i].LeaderEndpoint = location + statuses[i].IsResponding = true + } else { + // Unexpected / invalid response + log.Debug().Err(err).Str("endpoint", c.Endpoint()).Msg("Agent is not responding") + statuses[i].IsResponding = false + } + } + }(i, c) + } + wg.Wait() + + // Check the results + noLeaders := 0 + for i, status := range statuses { + if !status.IsResponding { + log.Debug().Msg("Not 
all agents are responding") + return false, nil + } + if status.IsLeader { + noLeaders++ + } + if i > 0 { + // Compare leader endpoint with previous + prev := statuses[i-1].LeaderEndpoint + if !arangod.IsSameEndpoint(prev, status.LeaderEndpoint) { + log.Debug().Msg("Not all agents report the same leader endpoint") + return false, nil + } + } + } + if noLeaders != 1 { + log.Debug().Int("leaders", noLeaders).Msg("Unexpected number of agency leaders") + return false, nil + } + + log.Debug(). + Int("leaders", noLeaders). + Int("followers", len(statuses)-noLeaders). + Msg("Agency is happy") + + return true, nil +} + +// checkProgressCluster checks the progress of the action in the case +// of a cluster deployment (coordinator/dbserver). +func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool, error) { + log := a.log + c, err := a.actionCtx.GetDatabaseClient(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to create database client") + return false, maskAny(err) + } + cluster, err := c.Cluster(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to access cluster") + return false, maskAny(err) + } + h, err := cluster.Health(ctx) + if err != nil { + log.Debug().Err(err).Msg("Failed to get cluster health") + return false, maskAny(err) + } + sh, found := h.Health[driver.ServerID(a.action.MemberID)] + if !found { + log.Debug().Msg("Member not yet found in cluster health") + return false, nil + } + if sh.Status != driver.ServerStatusGood { + log.Debug().Str("status", string(sh.Status)).Msg("Member set status not yet good") + return false, nil + } + return true, nil +} + +// checkProgressArangoSync checks the progress of the action in the case +// of a sync master / worker. 
+func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bool, error) { + // TODO + return true, nil +} diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index bcf6dc511..e9208b562 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -23,7 +23,6 @@ package deployment import ( - "context" "fmt" "reflect" "time" @@ -89,8 +88,9 @@ type Deployment struct { eventsCli corev1.EventInterface - inspectTrigger trigger.Trigger - clientCache *clientCache + inspectTrigger trigger.Trigger + recentInspectionErrors int + clientCache *clientCache } // New creates a new Deployment from the given API object. @@ -195,7 +195,6 @@ func (d *Deployment) run() { } inspectionInterval := maxInspectionInterval - recentInspectionErrors := 0 for { select { case <-d.stopCh: @@ -218,49 +217,7 @@ func (d *Deployment) run() { } case <-d.inspectTrigger.Done(): - hasError := false - ctx := context.Background() - // Ensure we have image info - if retrySoon, err := d.ensureImages(d.apiObject); err != nil { - hasError = true - d.createEvent(k8sutil.NewErrorEvent("Image detection failed", err, d.apiObject)) - } else if retrySoon { - inspectionInterval = minInspectionInterval - } - // Inspection of generated resources needed - if err := d.inspectPods(); err != nil { - hasError = true - d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject)) - } - // Create scale/update plan - if err := d.createPlan(); err != nil { - hasError = true - d.createEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject)) - } - // Execute current step of scale/update plan - if retrySoon, err := d.executePlan(ctx); err != nil { - hasError = true - d.createEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject)) - } else if retrySoon { - inspectionInterval = minInspectionInterval - } - // Ensure all resources are created - if err := d.ensurePVCs(d.apiObject); err != nil { - hasError = true - 
d.createEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject)) - } - if err := d.ensurePods(d.apiObject); err != nil { - hasError = true - d.createEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject)) - } - if hasError { - if recentInspectionErrors == 0 { - inspectionInterval = minInspectionInterval - recentInspectionErrors++ - } - } else { - recentInspectionErrors = 0 - } + inspectionInterval = d.inspectDeployment(inspectionInterval) case <-time.After(inspectionInterval): // Trigger inspection @@ -341,10 +298,13 @@ func (d *Deployment) createEvent(evt *v1.Event) { } // Update the status of the API object from the internal status -func (d *Deployment) updateCRStatus() error { - if reflect.DeepEqual(d.apiObject.Status, d.status) { - // Nothing has changed - return nil +func (d *Deployment) updateCRStatus(force ...bool) error { + // TODO Remove force.... + if len(force) == 0 || !force[0] { + if reflect.DeepEqual(d.apiObject.Status, d.status) { + // Nothing has changed + return nil + } } // Send update to API server diff --git a/pkg/deployment/deployment_inspector.go b/pkg/deployment/deployment_inspector.go new file mode 100644 index 000000000..63815192e --- /dev/null +++ b/pkg/deployment/deployment_inspector.go @@ -0,0 +1,99 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package deployment + +import ( + "context" + "time" + + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" +) + +// inspectDeployment inspects the entire deployment, creates +// a plan to update if needed and inspects underlying resources. +// This function should be called when: +// - the deployment has changed +// - any of the underlying resources has changed +// - once in a while +// Returns the delay until this function should be called again. +func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration { + // log := d.deps.Log + + nextInterval := lastInterval + hasError := false + ctx := context.Background() + + // Ensure we have image info + if retrySoon, err := d.ensureImages(d.apiObject); err != nil { + hasError = true + d.createEvent(k8sutil.NewErrorEvent("Image detection failed", err, d.apiObject)) + } else if retrySoon { + nextInterval = minInspectionInterval + } + + // Inspection of generated resources needed + if err := d.inspectPods(); err != nil { + hasError = true + d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject)) + } + + // Create scale/update plan + if err := d.createPlan(); err != nil { + hasError = true + d.createEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject)) + } + + // Execute current step of scale/update plan + retrySoon, err := d.executePlan(ctx) + if err != nil { + hasError = true + d.createEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject)) + } + if retrySoon { + nextInterval = minInspectionInterval + } + + // Ensure all resources are created + if err := d.ensurePVCs(d.apiObject); err != nil { + hasError = true + d.createEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject)) + } + if err := d.ensurePods(d.apiObject); err != nil { + hasError = true + d.createEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject)) + } + + 
// Update next interval (on errors) + if hasError { + if d.recentInspectionErrors == 0 { + nextInterval = minInspectionInterval + d.recentInspectionErrors++ + } + } else { + d.recentInspectionErrors = 0 + } + if nextInterval > maxInspectionInterval { + nextInterval = maxInspectionInterval + } + return nextInterval +} diff --git a/pkg/deployment/images.go b/pkg/deployment/images.go index 9ce2197fe..517c4cec5 100644 --- a/pkg/deployment/images.go +++ b/pkg/deployment/images.go @@ -29,6 +29,7 @@ import ( "strings" "github.com/rs/zerolog" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -39,6 +40,7 @@ import ( const ( dockerPullableImageIDPrefix = "docker-pullable://" + imageIDAndVersionRole = "id" // Role use by identification pods ) type imagesBuilder struct { @@ -97,9 +99,9 @@ func (ib *imagesBuilder) Run(ctx context.Context) (bool, error) { // When no pod exists, it is created, otherwise the ID is fetched & version detected. // Returns: retrySoon, error func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, image string) (bool, error) { - role := "id" + role := imageIDAndVersionRole id := fmt.Sprintf("%0x", sha1.Sum([]byte(image)))[:6] - podName := k8sutil.CreatePodName(ib.APIObject.GetName(), role, id) + podName := k8sutil.CreatePodName(ib.APIObject.GetName(), role, id, "") ns := ib.APIObject.GetNamespace() log := ib.Log.With(). Str("pod", podName). 
@@ -166,10 +168,16 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, ima "--server.authentication=false", fmt.Sprintf("--server.endpoint=tcp://[::]:%d", k8sutil.ArangoPort), } - if err := k8sutil.CreateArangodPod(ib.KubeCli, true, ib.APIObject, role, id, "", image, ib.Spec.GetImagePullPolicy(), args, nil, nil, nil, "", ""); err != nil { + if err := k8sutil.CreateArangodPod(ib.KubeCli, true, ib.APIObject, role, id, podName, "", image, ib.Spec.GetImagePullPolicy(), args, nil, nil, nil, "", ""); err != nil { log.Debug().Err(err).Msg("Failed to create image ID pod") return true, maskAny(err) } // Come back soon to inspect the pod return true, nil } + +// isArangoDBImageIDAndVersionPod returns true if the given pod is used for fetching image ID and ArangoDB version of an image +func isArangoDBImageIDAndVersionPod(p v1.Pod) bool { + role, found := p.GetLabels()[k8sutil.LabelKeyRole] + return found && role == imageIDAndVersionRole +} diff --git a/pkg/deployment/members.go b/pkg/deployment/members.go index 13bcd29c4..114af5330 100644 --- a/pkg/deployment/members.go +++ b/pkg/deployment/members.go @@ -65,8 +65,9 @@ func (d *Deployment) createInitialMembers(apiObject *api.ArangoDeployment) error func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDeployment) error { log := d.deps.Log var id string + idPrefix := getArangodIDPrefix(group) for { - id = strings.ToLower(uniuri.NewLen(8)) // K8s accepts only lowercase, so we use it here as well + id = idPrefix + strings.ToLower(uniuri.NewLen(8)) // K8s accepts only lowercase, so we use it here as well if !d.status.Members.ContainsID(id) { break } @@ -82,7 +83,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe ID: id, State: api.MemberStateNone, PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id), - PodName: k8sutil.CreatePodName(deploymentName, role, id), + PodName: "", }); err != nil { return 
maskAny(err) } @@ -92,7 +93,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe ID: id, State: api.MemberStateNone, PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id), - PodName: k8sutil.CreatePodName(deploymentName, role, id), + PodName: "", }); err != nil { return maskAny(err) } @@ -102,7 +103,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe ID: id, State: api.MemberStateNone, PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id), - PodName: k8sutil.CreatePodName(deploymentName, role, id), + PodName: "", }); err != nil { return maskAny(err) } @@ -112,7 +113,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe ID: id, State: api.MemberStateNone, PersistentVolumeClaimName: "", - PodName: k8sutil.CreatePodName(deploymentName, role, id), + PodName: "", }); err != nil { return maskAny(err) } @@ -122,7 +123,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe ID: id, State: api.MemberStateNone, PersistentVolumeClaimName: "", - PodName: k8sutil.CreatePodName(deploymentName, role, id), + PodName: "", }); err != nil { return maskAny(err) } @@ -132,7 +133,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe ID: id, State: api.MemberStateNone, PersistentVolumeClaimName: "", - PodName: k8sutil.CreatePodName(deploymentName, role, id), + PodName: "", }); err != nil { return maskAny(err) } @@ -142,3 +143,18 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe return nil } + +// getArangodIDPrefix returns the prefix required ID's of arangod servers +// in the given group. 
+func getArangodIDPrefix(group api.ServerGroup) string { + switch group { + case api.ServerGroupCoordinators: + return "CRDN-" + case api.ServerGroupDBServers: + return "PRMR-" + case api.ServerGroupAgents: + return "AGNT-" + default: + return "" + } +} diff --git a/pkg/deployment/plan_builder.go b/pkg/deployment/plan_builder.go index ce00a391e..adceb7df2 100644 --- a/pkg/deployment/plan_builder.go +++ b/pkg/deployment/plan_builder.go @@ -24,15 +24,39 @@ package deployment import ( api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// upgradeDecision is the result of an upgrade check. +type upgradeDecision struct { + UpgradeNeeded bool // If set, the image version has changed + UpgradeAllowed bool // If set, it is an allowed version change + AutoUpgradeNeeded bool // If set, the database must be started with `--database.auto-upgrade` once +} + // createPlan considers the current specification & status of the deployment creates a plan to // get the status in line with the specification. // If a plan already exists, nothing is done. 
func (d *Deployment) createPlan() error { + // Get all current pods + pods, err := d.deps.KubeCli.CoreV1().Pods(d.apiObject.GetNamespace()).List(k8sutil.DeploymentListOpt(d.apiObject.GetName())) + if err != nil { + log.Debug().Err(err).Msg("Failed to list pods") + return maskAny(err) + } + myPods := make([]v1.Pod, 0, len(pods.Items)) + for _, p := range pods.Items { + if d.isOwnerOf(&p) { + myPods = append(myPods, p) + } + } + // Create plan - newPlan, changed := createPlan(d.deps.Log, d.status.Plan, d.apiObject.Spec, d.status) + newPlan, changed := createPlan(d.deps.Log, d.apiObject, d.status.Plan, d.apiObject.Spec, d.status, myPods) // If not change, we're done if !changed { @@ -54,7 +78,9 @@ func (d *Deployment) createPlan() error { // createPlan considers the given specification & status and creates a plan to get the status in line with the specification. // If a plan already exists, the given plan is returned with false. // Otherwise the new plan is returned with a boolean true. -func createPlan(log zerolog.Logger, currentPlan api.Plan, spec api.DeploymentSpec, status api.DeploymentStatus) (api.Plan, bool) { +func createPlan(log zerolog.Logger, apiObject metav1.Object, + currentPlan api.Plan, spec api.DeploymentSpec, + status api.DeploymentStatus, pods []v1.Pod) (api.Plan, bool) { if len(currentPlan) > 0 { // Plan already exists, complete that first return currentPlan, false @@ -78,10 +104,118 @@ func createPlan(log zerolog.Logger, currentPlan api.Plan, spec api.DeploymentSpe plan = append(plan, createScalePlan(log, status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.GetCount())...) 
} + // Check for the need to rotate one or more members + getPod := func(podName string) *v1.Pod { + for _, p := range pods { + if p.GetName() == podName { + return &p + } + } + return nil + } + status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error { + for _, m := range *members { + if len(plan) > 0 { + // Only 1 change at a time + continue + } + if m.State != api.MemberStateCreated { + // Only rotate when state is created + continue + } + if podName := m.PodName; podName != "" { + if p := getPod(podName); p != nil { + // Got pod, compare it with what it should be + decision := podNeedsUpgrading(*p, spec, status.Images) + if decision.UpgradeNeeded && decision.UpgradeAllowed { + plan = append(plan, createUpgradeMemberPlan(log, m, group, "Version upgrade")...) + } else { + rotNeeded, reason := podNeedsRotation(*p, apiObject, spec, group, status.Members.Agents, m.ID) + if rotNeeded { + plan = append(plan, createRotateMemberPlan(log, m, group, reason)...) + } + } + } + } + } + return nil + }) + // Return plan return plan, true } +// podNeedsUpgrading decides if an upgrade of the pod is needed (to comply with +// the given spec) and if that is allowed. +func podNeedsUpgrading(p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoList) upgradeDecision { + if len(p.Spec.Containers) == 1 { + c := p.Spec.Containers[0] + specImageInfo, found := images.GetByImage(spec.GetImage()) + if !found { + return upgradeDecision{UpgradeNeeded: false} + } + podImageInfo, found := images.GetByImageID(c.Image) + if !found { + return upgradeDecision{UpgradeNeeded: false} + } + if specImageInfo.ImageID == podImageInfo.ImageID { + // No change + return upgradeDecision{UpgradeNeeded: false} + } + // Image changed, check if change is allowed + specVersion := specImageInfo.ArangoDBVersion + podVersion := podImageInfo.ArangoDBVersion + if specVersion.Major() != podVersion.Major() { + // E.g. 
3.x -> 4.x, we cannot allow automatically + return upgradeDecision{UpgradeNeeded: true, UpgradeAllowed: false} + } + if specVersion.Minor() != podVersion.Minor() { + // Is allowed, with `--database.auto-upgrade` + return upgradeDecision{ + UpgradeNeeded: true, + UpgradeAllowed: true, + AutoUpgradeNeeded: true, + } + } + // Patch version change, rotate only + return upgradeDecision{ + UpgradeNeeded: true, + UpgradeAllowed: true, + AutoUpgradeNeeded: false, + } + } + return upgradeDecision{UpgradeNeeded: false} +} + +// podNeedsRotation returns true when the specification of the +// given pod differs from what it should be according to the +// given deployment spec. +// When true is returned, a reason for the rotation is already returned. +func podNeedsRotation(p v1.Pod, apiObject metav1.Object, spec api.DeploymentSpec, + group api.ServerGroup, agents api.MemberStatusList, id string) (bool, string) { + // Check number of containers + if len(p.Spec.Containers) != 1 { + return true, "Number of containers changed" + } + // Check image pull policy + c := p.Spec.Containers[0] + if c.ImagePullPolicy != spec.GetImagePullPolicy() { + return true, "Image pull policy changed" + } + // Check arguments + /*expectedArgs := createArangodArgs(apiObject, spec, group, agents, id) + if len(expectedArgs) != len(c.Args) { + return true, "Arguments changed" + } + for i, a := range expectedArgs { + if c.Args[i] != a { + return true, "Arguments changed" + } + }*/ + + return false, "" +} + // createScalePlan creates a scaling plan for a single server group func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api.ServerGroup, count int) api.Plan { var plan api.Plan @@ -89,7 +223,7 @@ func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api // Scale up toAdd := count - len(members) for i := 0; i < toAdd; i++ { - plan = append(plan, api.Action{Type: api.ActionTypeAddMember, Group: group}) + plan = append(plan, 
api.NewAction(api.ActionTypeAddMember, group, "")) } log.Debug(). Int("delta", toAdd). @@ -100,12 +234,12 @@ func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api if m, err := members.SelectMemberToRemove(); err == nil { if group == api.ServerGroupDBServers { plan = append(plan, - api.Action{Type: api.ActionTypeCleanOutMember, Group: group, MemberID: m.ID}, + api.NewAction(api.ActionTypeCleanOutMember, group, m.ID), ) } plan = append(plan, - api.Action{Type: api.ActionTypeShutdownMember, Group: group, MemberID: m.ID}, - api.Action{Type: api.ActionTypeRemoveMember, Group: group, MemberID: m.ID}, + api.NewAction(api.ActionTypeShutdownMember, group, m.ID), + api.NewAction(api.ActionTypeRemoveMember, group, m.ID), ) log.Debug(). Str("role", group.AsRole()). @@ -114,3 +248,33 @@ func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api } return plan } + +// createRotateMemberPlan creates a plan to rotate (stop-recreate-start) an existing +// member. +func createRotateMemberPlan(log zerolog.Logger, member api.MemberStatus, + group api.ServerGroup, reason string) api.Plan { + log.Debug(). + Str("id", member.ID). + Str("role", group.AsRole()). + Msg("Creating rotation plan") + plan := api.Plan{ + api.NewAction(api.ActionTypeRotateMember, group, member.ID, reason), + api.NewAction(api.ActionTypeWaitForMemberUp, group, member.ID), + } + return plan +} + +// createUpgradeMemberPlan creates a plan to upgrade (stop-recreateWithAutoUpgrade-stop-start) an existing +// member. +func createUpgradeMemberPlan(log zerolog.Logger, member api.MemberStatus, + group api.ServerGroup, reason string) api.Plan { + log.Debug(). + Str("id", member.ID). + Str("role", group.AsRole()). 
+ Msg("Creating upgrade plan") + plan := api.Plan{ + api.NewAction(api.ActionTypeUpgradeMember, group, member.ID, reason), + api.NewAction(api.ActionTypeWaitForMemberUp, group, member.ID), + } + return plan +} diff --git a/pkg/deployment/plan_builder_test.go b/pkg/deployment/plan_builder_test.go index f16e295a7..4a2e89146 100644 --- a/pkg/deployment/plan_builder_test.go +++ b/pkg/deployment/plan_builder_test.go @@ -28,6 +28,7 @@ import ( "github.com/rs/zerolog" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" "github.com/arangodb/kube-arangodb/pkg/util" @@ -40,10 +41,17 @@ func TestCreatePlanSingleScale(t *testing.T) { XMode: api.NewMode(api.DeploymentModeSingle), } spec.SetDefaults("test") + depl := &api.ArangoDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_depl", + Namespace: "test", + }, + Spec: spec, + } // Test with empty status var status api.DeploymentStatus - newPlan, changed := createPlan(log, nil, spec, status) + newPlan, changed := createPlan(log, depl, nil, spec, status, nil) assert.True(t, changed) assert.Len(t, newPlan, 0) // Single mode does not scale @@ -54,7 +62,7 @@ func TestCreatePlanSingleScale(t *testing.T) { PodName: "something", }, } - newPlan, changed = createPlan(log, nil, spec, status) + newPlan, changed = createPlan(log, depl, nil, spec, status, nil) assert.True(t, changed) assert.Len(t, newPlan, 0) // Single mode does not scale @@ -69,7 +77,7 @@ func TestCreatePlanSingleScale(t *testing.T) { PodName: "something1", }, } - newPlan, changed = createPlan(log, nil, spec, status) + newPlan, changed = createPlan(log, depl, nil, spec, status, nil) assert.True(t, changed) assert.Len(t, newPlan, 0) // Single mode does not scale } @@ -82,10 +90,17 @@ func TestCreatePlanResilientSingleScale(t *testing.T) { } spec.SetDefaults("test") spec.Single.XCount = util.NewInt(2) + depl := 
&api.ArangoDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_depl", + Namespace: "test", + }, + Spec: spec, + } // Test with empty status var status api.DeploymentStatus - newPlan, changed := createPlan(log, nil, spec, status) + newPlan, changed := createPlan(log, depl, nil, spec, status, nil) assert.True(t, changed) require.Len(t, newPlan, 2) assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type) @@ -98,7 +113,7 @@ func TestCreatePlanResilientSingleScale(t *testing.T) { PodName: "something", }, } - newPlan, changed = createPlan(log, nil, spec, status) + newPlan, changed = createPlan(log, depl, nil, spec, status, nil) assert.True(t, changed) require.Len(t, newPlan, 1) assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type) @@ -123,7 +138,7 @@ func TestCreatePlanResilientSingleScale(t *testing.T) { PodName: "something4", }, } - newPlan, changed = createPlan(log, nil, spec, status) + newPlan, changed = createPlan(log, depl, nil, spec, status, nil) assert.True(t, changed) require.Len(t, newPlan, 2) // Note: Downscaling is only down 1 at a time assert.Equal(t, api.ActionTypeShutdownMember, newPlan[0].Type) @@ -139,10 +154,17 @@ func TestCreatePlanClusterScale(t *testing.T) { XMode: api.NewMode(api.DeploymentModeCluster), } spec.SetDefaults("test") + depl := &api.ArangoDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test_depl", + Namespace: "test", + }, + Spec: spec, + } // Test with empty status var status api.DeploymentStatus - newPlan, changed := createPlan(log, nil, spec, status) + newPlan, changed := createPlan(log, depl, nil, spec, status, nil) assert.True(t, changed) require.Len(t, newPlan, 6) // Adding 3 dbservers & 3 coordinators (note: agents do not scale now) assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type) @@ -175,7 +197,7 @@ func TestCreatePlanClusterScale(t *testing.T) { PodName: "coordinator1", }, } - newPlan, changed = createPlan(log, nil, spec, status) + newPlan, changed = createPlan(log, depl, nil, spec, status, nil) 
assert.True(t, changed) require.Len(t, newPlan, 3) assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type) @@ -212,7 +234,7 @@ func TestCreatePlanClusterScale(t *testing.T) { } spec.DBServers.XCount = util.NewInt(1) spec.Coordinators.XCount = util.NewInt(1) - newPlan, changed = createPlan(log, nil, spec, status) + newPlan, changed = createPlan(log, nil, spec, status, nil) assert.True(t, changed) require.Len(t, newPlan, 5) // Note: Downscaling is done 1 at a time assert.Equal(t, api.ActionTypeCleanOutMember, newPlan[0].Type) diff --git a/pkg/deployment/plan_executor.go b/pkg/deployment/plan_executor.go index 884bbe123..137e03082 100644 --- a/pkg/deployment/plan_executor.go +++ b/pkg/deployment/plan_executor.go @@ -29,7 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" - "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/rs/zerolog" ) // executePlan tries to execute the plan as far as possible. @@ -37,21 +37,29 @@ import ( // False otherwise. func (d *Deployment) executePlan(ctx context.Context) (bool, error) { log := d.deps.Log + initialPlanLen := len(d.status.Plan) for { if len(d.status.Plan) == 0 { // No plan exists, nothing to be done - return false, nil + return initialPlanLen > 0, nil } // Take first action - action := d.status.Plan[0] - if action.StartTime.IsZero() { + planAction := d.status.Plan[0] + log := log.With(). + Int("plan-len", len(d.status.Plan)). + Str("action-id", planAction.ID). + Str("action-type", string(planAction.Type)). + Str("group", planAction.Group.AsRole()). + Str("member-id", planAction.MemberID). + Logger() + action := d.createAction(ctx, log, planAction) + if planAction.StartTime.IsZero() { // Not started yet - ready, err := d.startAction(ctx, action) + ready, err := action.Start(ctx) if err != nil { log.Debug().Err(err). - Str("action-type", string(action.Type)). 
Msg("Failed to start action") return false, maskAny(err) } @@ -64,9 +72,11 @@ func (d *Deployment) executePlan(ctx context.Context) (bool, error) { d.status.Plan[0].StartTime = &now } // Save plan update - if err := d.updateCRStatus(); err != nil { + if err := d.updateCRStatus(true); err != nil { + log.Debug().Err(err).Msg("Failed to update CR status") return false, maskAny(err) } + log.Debug().Bool("ready", ready).Msg("Action Start completed") if !ready { // We need to check back soon return true, nil @@ -74,23 +84,25 @@ func (d *Deployment) executePlan(ctx context.Context) (bool, error) { // Continue with next action } else { // First action of plan has been started, check its progress - ready, err := d.checkActionProgress(ctx, action) + ready, err := action.CheckProgress(ctx) if err != nil { - log.Debug().Err(err). - Str("action-type", string(action.Type)). - Msg("Failed to check action progress") + log.Debug().Err(err).Msg("Failed to check action progress") return false, maskAny(err) } + if ready { + // Remove action from list + d.status.Plan = d.status.Plan[1:] + // Save plan update + if err := d.updateCRStatus(); err != nil { + log.Debug().Err(err).Msg("Failed to update CR status") + return false, maskAny(err) + } + } + log.Debug().Bool("ready", ready).Msg("Action CheckProgress completed") if !ready { // Not ready check, come back soon return true, nil } - // Remove action from list - d.status.Plan = d.status.Plan[1:] - // Save plan update - if err := d.updateCRStatus(); err != nil { - return false, maskAny(err) - } // Continue with next action } } @@ -99,152 +111,24 @@ func (d *Deployment) executePlan(ctx context.Context) (bool, error) { // startAction performs the start of the given action // Returns true if the action is completely finished, false in case // the start time needs to be recorded and a ready condition needs to be checked. 
-func (d *Deployment) startAction(ctx context.Context, action api.Action) (bool, error) { - log := d.deps.Log - ns := d.apiObject.GetNamespace() - +func (d *Deployment) createAction(ctx context.Context, log zerolog.Logger, action api.Action) Action { + actionCtx := NewActionContext(log, d) switch action.Type { case api.ActionTypeAddMember: - if err := d.createMember(action.Group, d.apiObject); err != nil { - log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to create member") - return false, maskAny(err) - } - // Save added member - if err := d.updateCRStatus(); err != nil { - return false, maskAny(err) - } - return true, nil + return NewAddMemberAction(log, action, actionCtx) case api.ActionTypeRemoveMember: - m, _, ok := d.status.Members.ElementByID(action.MemberID) - if !ok { - // We wanted to remove and it is already gone. All ok - return true, nil - } - // Remove the pod (if any) - if err := d.deps.KubeCli.Core().Pods(ns).Delete(m.PodName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) { - log.Debug().Err(err).Str("pod", m.PodName).Msg("Failed to remove pod") - return false, maskAny(err) - } - // Remove the pvc (if any) - if m.PersistentVolumeClaimName != "" { - if err := d.deps.KubeCli.Core().PersistentVolumeClaims(ns).Delete(m.PersistentVolumeClaimName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) { - log.Debug().Err(err).Str("pod", m.PodName).Msg("Failed to remove pvc") - return false, maskAny(err) - } - } - // Remove member - if err := d.status.Members.RemoveByID(action.MemberID, action.Group); err != nil { - log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to remove member") - return false, maskAny(err) - } - // Save removed member - if err := d.updateCRStatus(); err != nil { - return false, maskAny(err) - } - return true, nil + return NewRemoveMemberAction(log, action, actionCtx) case api.ActionTypeCleanOutMember: - m, ok := 
d.status.Members.DBServers.ElementByID(action.MemberID) - if !ok { - log.Error().Str("group", action.Group.AsRole()).Str("id", action.MemberID).Msg("No such member") - return true, nil - } - c, err := d.clientCache.GetDatabase(ctx) - if err != nil { - log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to create member client") - return false, maskAny(err) - } - cluster, err := c.Cluster(ctx) - if err != nil { - log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to access cluster") - return false, maskAny(err) - } - if err := cluster.CleanOutServer(ctx, action.MemberID); err != nil { - log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to cleanout member") - return false, maskAny(err) - } - // Update status - m.State = api.MemberStateCleanOut - if err := d.updateCRStatus(); err != nil { - return false, maskAny(err) - } - return true, nil + return NewCleanOutMemberAction(log, action, actionCtx) case api.ActionTypeShutdownMember: - m, _, ok := d.status.Members.ElementByID(action.MemberID) - if !ok { - log.Error().Str("group", action.Group.AsRole()).Str("id", action.MemberID).Msg("No such member") - return true, nil - } - if action.Group.IsArangod() { - // Invoke shutdown endpoint - c, err := d.clientCache.Get(ctx, action.Group, action.MemberID) - if err != nil { - log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to create member client") - return false, maskAny(err) - } - if err := c.Shutdown(ctx, true); err != nil { - log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to shutdown member") - return false, maskAny(err) - } - } else if action.Group.IsArangosync() { - // Terminate pod - if err := d.deps.KubeCli.Core().Pods(ns).Delete(m.PodName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) { - log.Debug().Err(err).Str("pod", m.PodName).Msg("Failed to remove pod") - return false, maskAny(err) - } - } - // Update status - m.State = api.MemberStateShuttingDown - if 
err := d.updateCRStatus(); err != nil { - return false, maskAny(err) - } - return true, nil + return NewShutdownMemberAction(log, action, actionCtx) + case api.ActionTypeRotateMember: + return NewRotateMemberAction(log, action, actionCtx) + case api.ActionTypeUpgradeMember: + return NewUpgradeMemberAction(log, action, actionCtx) + case api.ActionTypeWaitForMemberUp: + return NewWaitForMemberUpAction(log, action, actionCtx) default: - return false, maskAny(fmt.Errorf("Unknown action type")) - } -} - -// checkActionProgress checks the progress of the given action. -// Returns true if the action is completely finished, false otherwise. -func (d *Deployment) checkActionProgress(ctx context.Context, action api.Action) (bool, error) { - switch action.Type { - case api.ActionTypeAddMember: - // Nothing todo - return true, nil - case api.ActionTypeRemoveMember: - // Nothing todo - return true, nil - case api.ActionTypeCleanOutMember: - c, err := d.clientCache.GetDatabase(ctx) - if err != nil { - return false, maskAny(err) - } - cluster, err := c.Cluster(ctx) - if err != nil { - return false, maskAny(err) - } - cleanedOut, err := cluster.IsCleanedOut(ctx, action.MemberID) - if err != nil { - return false, maskAny(err) - } - if !cleanedOut { - // We're not done yet - return false, nil - } - // Cleanout completed - return true, nil - case api.ActionTypeShutdownMember: - m, _, ok := d.status.Members.ElementByID(action.MemberID) - if !ok { - // Member not long exists - return true, nil - } - if m.Conditions.IsTrue(api.ConditionTypeTerminated) { - // Shutdown completed - return true, nil - } - // Member still not shutdown, retry soon - return false, nil - default: - return false, maskAny(fmt.Errorf("Unknown action type")) + panic(fmt.Sprintf("Unknown action type '%s'", action.Type)) } } diff --git a/pkg/deployment/pod_creator.go b/pkg/deployment/pod_creator.go index 305c6a5b0..a60b25ca3 100644 --- a/pkg/deployment/pod_creator.go +++ b/pkg/deployment/pod_creator.go @@ -23,6 +23,8 
@@ package deployment import ( + "crypto/sha1" + "encoding/json" "fmt" "net" "path/filepath" @@ -55,8 +57,10 @@ func (o optionPair) CompareTo(other optionPair) int { } // createArangodArgs creates command line arguments for an arangod server in the given group. -func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup, svrSpec api.ServerGroupSpec, agents api.MemberStatusList, id string) []string { +func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup, + agents api.MemberStatusList, id string, autoUpgrade bool) []string { options := make([]optionPair, 0, 64) + svrSpec := deplSpec.GetServerGroupSpec(group) // Endpoint listenAddr := "[::]" @@ -123,6 +127,14 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro optionPair{"--database.directory", k8sutil.ArangodVolumeMountDir}, optionPair{"--log.output", "+"}, ) + + // Auto upgrade? + if autoUpgrade { + options = append(options, + optionPair{"--database.auto-upgrade", "true"}, + ) + } + /* if config.ServerThreads != 0 { options = append(options, optionPair{"--server.threads", strconv.Itoa(config.ServerThreads)}) @@ -136,7 +148,7 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro switch group { case api.ServerGroupAgents: options = append(options, - optionPair{"--cluster.my-id", id}, + optionPair{"--agency.disaster-recovery-id", id}, optionPair{"--agency.activate", "true"}, optionPair{"--agency.my-address", myTCPURL}, optionPair{"--agency.size", strconv.Itoa(deplSpec.Agents.GetCount())}, @@ -152,15 +164,9 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro ) } } - /*if agentRecoveryID != "" { - options = append(options, - optionPair{"--agency.disaster-recovery-id", agentRecoveryID}, - ) - }*/ case api.ServerGroupDBServers: addAgentEndpoints = true options = append(options, - optionPair{"--cluster.my-id", id}, 
optionPair{"--cluster.my-address", myTCPURL}, optionPair{"--cluster.my-role", "PRIMARY"}, optionPair{"--foxx.queues", "false"}, @@ -169,7 +175,6 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro case api.ServerGroupCoordinators: addAgentEndpoints = true options = append(options, - optionPair{"--cluster.my-id", id}, optionPair{"--cluster.my-address", myTCPURL}, optionPair{"--cluster.my-role", "COORDINATOR"}, optionPair{"--foxx.queues", "true"}, @@ -184,7 +189,6 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro addAgentEndpoints = true options = append(options, optionPair{"--replication.automatic-failover", "true"}, - optionPair{"--cluster.my-id", id}, optionPair{"--cluster.my-address", myTCPURL}, optionPair{"--cluster.my-role", "SINGLE"}, ) @@ -310,8 +314,13 @@ func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error { if m.State != api.MemberStateNone { continue } - // Create pod + // Update pod name role := group.AsRole() + roleAbbr := group.AsRoleAbbreviated() + podSuffix := createPodSuffix(apiObject.Spec) + m.PodName = k8sutil.CreatePodName(apiObject.GetName(), roleAbbr, m.ID, podSuffix) + newState := api.MemberStateCreated + // Create pod if group.IsArangod() { // Find image ID info, found := apiObject.Status.Images.GetByImage(apiObject.Spec.GetImage()) @@ -320,7 +329,11 @@ func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error { return nil } // Prepare arguments - args := createArangodArgs(apiObject, apiObject.Spec, group, spec, d.status.Members.Agents, m.ID) + autoUpgrade := m.Conditions.IsTrue(api.ConditionTypeAutoUpgrade) + if autoUpgrade { + newState = api.MemberStateUpgrading + } + args := createArangodArgs(apiObject, apiObject.Spec, group, d.status.Members.Agents, m.ID, autoUpgrade) env := make(map[string]k8sutil.EnvValue) livenessProbe, err := d.createLivenessProbe(apiObject, group) if err != nil { @@ -355,7 +368,7 @@ func (d *Deployment) 
ensurePods(apiObject *api.ArangoDeployment) error { SecretKey: constants.SecretKeyJWT, } } - if err := k8sutil.CreateArangodPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, m.PersistentVolumeClaimName, info.ImageID, apiObject.Spec.GetImagePullPolicy(), args, env, livenessProbe, readinessProbe, tlsKeyfileSecretName, rocksdbEncryptionSecretName); err != nil { + if err := k8sutil.CreateArangodPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, m.PodName, m.PersistentVolumeClaimName, info.ImageID, apiObject.Spec.GetImagePullPolicy(), args, env, livenessProbe, readinessProbe, tlsKeyfileSecretName, rocksdbEncryptionSecretName); err != nil { return maskAny(err) } } else if group.IsArangosync() { @@ -376,12 +389,15 @@ func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error { if group == api.ServerGroupSyncWorkers { affinityWithRole = api.ServerGroupDBServers.AsRole() } - if err := k8sutil.CreateArangoSyncPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, info.ImageID, apiObject.Spec.Sync.GetImagePullPolicy(), args, env, livenessProbe, affinityWithRole); err != nil { + if err := k8sutil.CreateArangoSyncPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, m.PodName, info.ImageID, apiObject.Spec.Sync.GetImagePullPolicy(), args, env, livenessProbe, affinityWithRole); err != nil { return maskAny(err) } } // Record new member state - m.State = api.MemberStateCreated + m.State = newState + m.Conditions.Remove(api.ConditionTypeReady) + m.Conditions.Remove(api.ConditionTypeTerminated) + m.Conditions.Remove(api.ConditionTypeAutoUpgrade) if err := status.Update(m); err != nil { return maskAny(err) } @@ -397,3 +413,9 @@ func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error { } return nil } + +func createPodSuffix(spec api.DeploymentSpec) string { + raw, _ := json.Marshal(spec) + hash := sha1.Sum(raw) + return fmt.Sprintf("%0x", hash)[:6] +} diff --git 
a/pkg/deployment/pod_creator_agent_args_test.go b/pkg/deployment/pod_creator_agent_args_test.go index 97562121b..2e1204199 100644 --- a/pkg/deployment/pod_creator_agent_args_test.go +++ b/pkg/deployment/pod_creator_agent_args_test.go @@ -51,16 +51,60 @@ func TestCreateArangodArgsAgent(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, apiObject.Spec.Agents, agents, "a1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", false) assert.Equal(t, []string{ "--agency.activate=true", + "--agency.disaster-recovery-id=a1", "--agency.endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", "--agency.endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", "--agency.my-address=ssl://name-agent-a1.name-int.ns.svc:8529", "--agency.size=3", "--agency.supervision=true", - "--cluster.my-id=a1", + "--database.directory=/data", + "--foxx.queues=false", + "--log.level=INFO", + "--log.output=+", + "--server.authentication=true", + "--server.endpoint=ssl://[::]:8529", + "--server.jwt-secret=$(ARANGOD_JWT_SECRET)", + "--server.statistics=false", + "--server.storage-engine=rocksdb", + "--ssl.ecdh-curve=", + "--ssl.keyfile=/secrets/tls/tls.keyfile", + }, + cmdline, + ) + } + + // Default+AutoUpgrade deployment + { + apiObject := &api.ArangoDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "ns", + }, + Spec: api.DeploymentSpec{ + Mode: api.DeploymentModeCluster, + }, + } + apiObject.Spec.SetDefaults("test") + agents := api.MemberStatusList{ + api.MemberStatus{ID: "a1"}, + api.MemberStatus{ID: "a2"}, + api.MemberStatus{ID: "a3"}, + } + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", true) + assert.Equal(t, + []string{ + "--agency.activate=true", + "--agency.disaster-recovery-id=a1", + "--agency.endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", + 
"--agency.endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", + "--agency.my-address=ssl://name-agent-a1.name-int.ns.svc:8529", + "--agency.size=3", + "--agency.supervision=true", + "--database.auto-upgrade=true", "--database.directory=/data", "--foxx.queues=false", "--log.level=INFO", @@ -97,16 +141,16 @@ func TestCreateArangodArgsAgent(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, apiObject.Spec.Agents, agents, "a1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", false) assert.Equal(t, []string{ "--agency.activate=true", + "--agency.disaster-recovery-id=a1", "--agency.endpoint=tcp://name-agent-a2.name-int.ns.svc:8529", "--agency.endpoint=tcp://name-agent-a3.name-int.ns.svc:8529", "--agency.my-address=tcp://name-agent-a1.name-int.ns.svc:8529", "--agency.size=3", "--agency.supervision=true", - "--cluster.my-id=a1", "--database.directory=/data", "--foxx.queues=false", "--log.level=INFO", @@ -140,16 +184,16 @@ func TestCreateArangodArgsAgent(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, apiObject.Spec.Agents, agents, "a1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", false) assert.Equal(t, []string{ "--agency.activate=true", + "--agency.disaster-recovery-id=a1", "--agency.endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", "--agency.endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", "--agency.my-address=ssl://name-agent-a1.name-int.ns.svc:8529", "--agency.size=3", "--agency.supervision=true", - "--cluster.my-id=a1", "--database.directory=/data", "--foxx.queues=false", "--log.level=INFO", @@ -183,16 +227,16 @@ func TestCreateArangodArgsAgent(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, 
apiObject.Spec, api.ServerGroupAgents, apiObject.Spec.Agents, agents, "a1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", false) assert.Equal(t, []string{ "--agency.activate=true", + "--agency.disaster-recovery-id=a1", "--agency.endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", "--agency.endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", "--agency.my-address=ssl://name-agent-a1.name-int.ns.svc:8529", "--agency.size=3", "--agency.supervision=true", - "--cluster.my-id=a1", "--database.directory=/data", "--foxx.queues=false", "--log.level=INFO", diff --git a/pkg/deployment/pod_creator_coordinator_args_test.go b/pkg/deployment/pod_creator_coordinator_args_test.go index 9995b95d7..6c1b3e451 100644 --- a/pkg/deployment/pod_creator_coordinator_args_test.go +++ b/pkg/deployment/pod_creator_coordinator_args_test.go @@ -51,14 +51,13 @@ func TestCreateArangodArgsCoordinator(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, apiObject.Spec.Coordinators, agents, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", false) assert.Equal(t, []string{ "--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", "--cluster.my-address=ssl://name-coordinator-id1.name-int.ns.svc:8529", - "--cluster.my-id=id1", "--cluster.my-role=COORDINATOR", "--database.directory=/data", "--foxx.queues=true", @@ -76,6 +75,48 @@ func TestCreateArangodArgsCoordinator(t *testing.T) { ) } + // Default+AutoUpgrade deployment + { + apiObject := &api.ArangoDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "ns", + }, + Spec: api.DeploymentSpec{ + Mode: api.DeploymentModeCluster, + }, + } + apiObject.Spec.SetDefaults("test") + 
agents := api.MemberStatusList{ + api.MemberStatus{ID: "a1"}, + api.MemberStatus{ID: "a2"}, + api.MemberStatus{ID: "a3"}, + } + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", true) + assert.Equal(t, + []string{ + "--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529", + "--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", + "--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", + "--cluster.my-address=ssl://name-coordinator-id1.name-int.ns.svc:8529", + "--cluster.my-role=COORDINATOR", + "--database.auto-upgrade=true", + "--database.directory=/data", + "--foxx.queues=true", + "--log.level=INFO", + "--log.output=+", + "--server.authentication=true", + "--server.endpoint=ssl://[::]:8529", + "--server.jwt-secret=$(ARANGOD_JWT_SECRET)", + "--server.statistics=true", + "--server.storage-engine=rocksdb", + "--ssl.ecdh-curve=", + "--ssl.keyfile=/secrets/tls/tls.keyfile", + }, + cmdline, + ) + } + // Default+TLS disabled deployment { apiObject := &api.ArangoDeployment{ @@ -96,14 +137,13 @@ func TestCreateArangodArgsCoordinator(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, apiObject.Spec.Coordinators, agents, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", false) assert.Equal(t, []string{ "--cluster.agency-endpoint=tcp://name-agent-a1.name-int.ns.svc:8529", "--cluster.agency-endpoint=tcp://name-agent-a2.name-int.ns.svc:8529", "--cluster.agency-endpoint=tcp://name-agent-a3.name-int.ns.svc:8529", "--cluster.my-address=tcp://name-coordinator-id1.name-int.ns.svc:8529", - "--cluster.my-id=id1", "--cluster.my-role=COORDINATOR", "--database.directory=/data", "--foxx.queues=true", @@ -137,14 +177,13 @@ func TestCreateArangodArgsCoordinator(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } 
- cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, apiObject.Spec.Coordinators, agents, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", false) assert.Equal(t, []string{ "--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", "--cluster.my-address=ssl://name-coordinator-id1.name-int.ns.svc:8529", - "--cluster.my-id=id1", "--cluster.my-role=COORDINATOR", "--database.directory=/data", "--foxx.queues=true", @@ -180,14 +219,13 @@ func TestCreateArangodArgsCoordinator(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, apiObject.Spec.Coordinators, agents, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", false) assert.Equal(t, []string{ "--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", "--cluster.my-address=ssl://name-coordinator-id1.name-int.ns.svc:8529", - "--cluster.my-id=id1", "--cluster.my-role=COORDINATOR", "--database.directory=/data", "--foxx.queues=true", diff --git a/pkg/deployment/pod_creator_dbserver_args_test.go b/pkg/deployment/pod_creator_dbserver_args_test.go index 6b2c654f0..e4bf2ba98 100644 --- a/pkg/deployment/pod_creator_dbserver_args_test.go +++ b/pkg/deployment/pod_creator_dbserver_args_test.go @@ -51,14 +51,13 @@ func TestCreateArangodArgsDBServer(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, apiObject.Spec.DBServers, agents, "id1") + cmdline := 
createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", false) assert.Equal(t, []string{ "--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", "--cluster.my-address=ssl://name-dbserver-id1.name-int.ns.svc:8529", - "--cluster.my-id=id1", "--cluster.my-role=PRIMARY", "--database.directory=/data", "--foxx.queues=false", @@ -76,6 +75,48 @@ func TestCreateArangodArgsDBServer(t *testing.T) { ) } + // Default+AutoUpgrade deployment + { + apiObject := &api.ArangoDeployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "name", + Namespace: "ns", + }, + Spec: api.DeploymentSpec{ + Mode: api.DeploymentModeCluster, + }, + } + apiObject.Spec.SetDefaults("test") + agents := api.MemberStatusList{ + api.MemberStatus{ID: "a1"}, + api.MemberStatus{ID: "a2"}, + api.MemberStatus{ID: "a3"}, + } + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", true) + assert.Equal(t, + []string{ + "--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529", + "--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", + "--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", + "--cluster.my-address=ssl://name-dbserver-id1.name-int.ns.svc:8529", + "--cluster.my-role=PRIMARY", + "--database.auto-upgrade=true", + "--database.directory=/data", + "--foxx.queues=false", + "--log.level=INFO", + "--log.output=+", + "--server.authentication=true", + "--server.endpoint=ssl://[::]:8529", + "--server.jwt-secret=$(ARANGOD_JWT_SECRET)", + "--server.statistics=true", + "--server.storage-engine=rocksdb", + "--ssl.ecdh-curve=", + "--ssl.keyfile=/secrets/tls/tls.keyfile", + }, + cmdline, + ) + } + // Default+TLS disabled deployment { apiObject := &api.ArangoDeployment{ @@ -96,14 +137,13 @@ func TestCreateArangodArgsDBServer(t *testing.T) { api.MemberStatus{ID: 
"a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, apiObject.Spec.DBServers, agents, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", false) assert.Equal(t, []string{ "--cluster.agency-endpoint=tcp://name-agent-a1.name-int.ns.svc:8529", "--cluster.agency-endpoint=tcp://name-agent-a2.name-int.ns.svc:8529", "--cluster.agency-endpoint=tcp://name-agent-a3.name-int.ns.svc:8529", "--cluster.my-address=tcp://name-dbserver-id1.name-int.ns.svc:8529", - "--cluster.my-id=id1", "--cluster.my-role=PRIMARY", "--database.directory=/data", "--foxx.queues=false", @@ -137,14 +177,13 @@ func TestCreateArangodArgsDBServer(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, apiObject.Spec.DBServers, agents, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", false) assert.Equal(t, []string{ "--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", "--cluster.my-address=ssl://name-dbserver-id1.name-int.ns.svc:8529", - "--cluster.my-id=id1", "--cluster.my-role=PRIMARY", "--database.directory=/data", "--foxx.queues=false", @@ -180,14 +219,13 @@ func TestCreateArangodArgsDBServer(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, apiObject.Spec.DBServers, agents, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", false) assert.Equal(t, []string{ "--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", 
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", "--cluster.my-address=ssl://name-dbserver-id1.name-int.ns.svc:8529", - "--cluster.my-id=id1", "--cluster.my-role=PRIMARY", "--database.directory=/data", "--foxx.queues=false", diff --git a/pkg/deployment/pod_creator_single_args_test.go b/pkg/deployment/pod_creator_single_args_test.go index 481226fb7..7d7a56127 100644 --- a/pkg/deployment/pod_creator_single_args_test.go +++ b/pkg/deployment/pod_creator_single_args_test.go @@ -42,7 +42,7 @@ func TestCreateArangodArgsSingle(t *testing.T) { }, } apiObject.Spec.SetDefaults("test") - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false) assert.Equal(t, []string{ "--database.directory=/data", @@ -61,6 +61,34 @@ func TestCreateArangodArgsSingle(t *testing.T) { ) } + // Default+AutoUpgrade deployment + { + apiObject := &api.ArangoDeployment{ + Spec: api.DeploymentSpec{ + Mode: api.DeploymentModeSingle, + }, + } + apiObject.Spec.SetDefaults("test") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", true) + assert.Equal(t, + []string{ + "--database.auto-upgrade=true", + "--database.directory=/data", + "--foxx.queues=true", + "--log.level=INFO", + "--log.output=+", + "--server.authentication=true", + "--server.endpoint=ssl://[::]:8529", + "--server.jwt-secret=$(ARANGOD_JWT_SECRET)", + "--server.statistics=true", + "--server.storage-engine=rocksdb", + "--ssl.ecdh-curve=", + "--ssl.keyfile=/secrets/tls/tls.keyfile", + }, + cmdline, + ) + } + // Default+TLS disabled deployment { apiObject := &api.ArangoDeployment{ @@ -72,7 +100,7 @@ func TestCreateArangodArgsSingle(t *testing.T) { }, } apiObject.Spec.SetDefaults("test") - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1") + cmdline := 
createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false) assert.Equal(t, []string{ "--database.directory=/data", @@ -98,7 +126,7 @@ func TestCreateArangodArgsSingle(t *testing.T) { }, } apiObject.Spec.SetDefaults("test") - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false) assert.Equal(t, []string{ "--database.directory=/data", @@ -126,7 +154,7 @@ func TestCreateArangodArgsSingle(t *testing.T) { } apiObject.Spec.Authentication.XJWTSecretName = util.NewString("None") apiObject.Spec.SetDefaults("test") - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false) assert.Equal(t, []string{ "--database.directory=/data", @@ -153,7 +181,7 @@ func TestCreateArangodArgsSingle(t *testing.T) { } apiObject.Spec.Single.Args = []string{"--foo1", "--foo2"} apiObject.Spec.SetDefaults("test") - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false) assert.Equal(t, []string{ "--database.directory=/data", @@ -191,14 +219,13 @@ func TestCreateArangodArgsSingle(t *testing.T) { api.MemberStatus{ID: "a2"}, api.MemberStatus{ID: "a3"}, } - cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, agents, "id1") + cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, agents, "id1", false) assert.Equal(t, []string{ "--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529", "--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529", 
"--cluster.my-address=ssl://name-single-id1.name-int.ns.svc:8529", - "--cluster.my-id=id1", "--cluster.my-role=SINGLE", "--database.directory=/data", "--foxx.queues=true", diff --git a/pkg/deployment/pod_inspector.go b/pkg/deployment/pod_inspector.go index 20d03845e..ac0456c89 100644 --- a/pkg/deployment/pod_inspector.go +++ b/pkg/deployment/pod_inspector.go @@ -53,6 +53,10 @@ func (d *Deployment) inspectPods() error { log.Debug().Str("pod", p.GetName()).Msg("pod not owned by this deployment") continue } + if isArangoDBImageIDAndVersionPod(p) { + // Image ID pods are not relevant to inspect here + continue + } // Pod belongs to this deployment, update metric inspectedPodCounter.Inc() @@ -113,7 +117,7 @@ func (d *Deployment) inspectPods() error { switch m.State { case api.MemberStateNone: // Do nothing - case api.MemberStateShuttingDown: + case api.MemberStateShuttingDown, api.MemberStateRotating, api.MemberStateUpgrading: // Shutdown was intended, so not need to do anything here. // Just mark terminated if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Terminated", "") { diff --git a/pkg/logging/logger.go b/pkg/logging/logger.go index 2932f0cd9..663b4aa2e 100644 --- a/pkg/logging/logger.go +++ b/pkg/logging/logger.go @@ -35,7 +35,8 @@ var ( // The defaultLevels list is used during development to increase the // default level for components that we care a little less about. defaultLevels = map[string]string{ - //"something.status": "info", + "operator": "info", + //"something.status": "info", } ) diff --git a/pkg/operator/crd.go b/pkg/operator/crd.go index d325dbaab..0b20f3cac 100644 --- a/pkg/operator/crd.go +++ b/pkg/operator/crd.go @@ -31,7 +31,7 @@ import ( // waitForCRD waits for the CustomResourceDefinition (created externally) // to be ready. 
func (o *Operator) waitForCRD(enableDeployment, enableStorage bool) error { - log := o.Dependencies.Log + log := o.log if enableDeployment { log.Debug().Msg("Waiting for ArangoDeployment CRD to be ready") diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 90f9051be..13d21b504 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -37,6 +37,7 @@ import ( lsapi "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha" "github.com/arangodb/kube-arangodb/pkg/deployment" "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" + "github.com/arangodb/kube-arangodb/pkg/logging" "github.com/arangodb/kube-arangodb/pkg/storage" ) @@ -54,6 +55,7 @@ type Operator struct { Config Dependencies + log zerolog.Logger deployments map[string]*deployment.Deployment localStorages map[string]*storage.LocalStorage } @@ -69,7 +71,7 @@ type Config struct { } type Dependencies struct { - Log zerolog.Logger + LogService logging.Service KubeCli kubernetes.Interface KubeExtCli apiextensionsclient.Interface CRCli versioned.Interface @@ -81,6 +83,7 @@ func NewOperator(config Config, deps Dependencies) (*Operator, error) { o := &Operator{ Config: config, Dependencies: deps, + log: deps.LogService.MustGetLogger("operator"), deployments: make(map[string]*deployment.Deployment), localStorages: make(map[string]*storage.LocalStorage), } diff --git a/pkg/operator/operator_deployment.go b/pkg/operator/operator_deployment.go index c8a405613..408538a22 100644 --- a/pkg/operator/operator_deployment.go +++ b/pkg/operator/operator_deployment.go @@ -63,9 +63,8 @@ func (o *Operator) runDeployments(stop <-chan struct{}) { // onAddArangoDeployment deployment addition callback func (o *Operator) onAddArangoDeployment(obj interface{}) { - log := o.Dependencies.Log apiObject := obj.(*api.ArangoDeployment) - log.Debug(). + o.log.Debug(). Str("name", apiObject.GetObjectMeta().GetName()). 
Msg("ArangoDeployment added") o.syncArangoDeployment(apiObject) @@ -73,9 +72,8 @@ func (o *Operator) onAddArangoDeployment(obj interface{}) { // onUpdateArangoDeployment deployment update callback func (o *Operator) onUpdateArangoDeployment(oldObj, newObj interface{}) { - log := o.Dependencies.Log apiObject := newObj.(*api.ArangoDeployment) - log.Debug(). + o.log.Debug(). Str("name", apiObject.GetObjectMeta().GetName()). Msg("ArangoDeployment updated") o.syncArangoDeployment(apiObject) @@ -83,7 +81,7 @@ func (o *Operator) onUpdateArangoDeployment(oldObj, newObj interface{}) { // onDeleteArangoDeployment deployment delete callback func (o *Operator) onDeleteArangoDeployment(obj interface{}) { - log := o.Dependencies.Log + log := o.log apiObject, ok := obj.(*api.ArangoDeployment) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) @@ -129,7 +127,7 @@ func (o *Operator) syncArangoDeployment(apiObject *api.ArangoDeployment) { //pt.start() err := o.handleDeploymentEvent(ev) if err != nil { - o.Dependencies.Log.Warn().Err(err).Msg("Failed to handle event") + o.log.Warn().Err(err).Msg("Failed to handle event") } //pt.stop() } @@ -197,8 +195,7 @@ func (o *Operator) makeDeploymentConfigAndDeps(apiObject *api.ArangoDeployment) ServiceAccount: o.Config.ServiceAccount, } deps := deployment.Dependencies{ - Log: o.Dependencies.Log.With(). - Str("component", "deployment"). + Log: o.Dependencies.LogService.MustGetLogger("deployment").With(). Str("deployment", apiObject.GetName()). 
Logger(), KubeCli: o.Dependencies.KubeCli, diff --git a/pkg/operator/operator_leader.go b/pkg/operator/operator_leader.go index 4c8eb625c..5d94c2058 100644 --- a/pkg/operator/operator_leader.go +++ b/pkg/operator/operator_leader.go @@ -32,7 +32,7 @@ import ( func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) { namespace := o.Config.Namespace kubecli := o.Dependencies.KubeCli - log := o.Dependencies.Log.With().Str("lock-name", lockName).Logger() + log := o.log.With().Str("lock-name", lockName).Logger() rl, err := resourcelock.New(resourcelock.EndpointsResourceLock, namespace, lockName, diff --git a/pkg/operator/operator_local_storage.go b/pkg/operator/operator_local_storage.go index 2672e78d7..d8e9138c1 100644 --- a/pkg/operator/operator_local_storage.go +++ b/pkg/operator/operator_local_storage.go @@ -63,9 +63,8 @@ func (o *Operator) runLocalStorages(stop <-chan struct{}) { // onAddArangoLocalStorage local storage addition callback func (o *Operator) onAddArangoLocalStorage(obj interface{}) { - log := o.Dependencies.Log apiObject := obj.(*api.ArangoLocalStorage) - log.Debug(). + o.log.Debug(). Str("name", apiObject.GetObjectMeta().GetName()). Msg("ArangoLocalStorage added") o.syncArangoLocalStorage(apiObject) @@ -73,9 +72,8 @@ func (o *Operator) onAddArangoLocalStorage(obj interface{}) { // onUpdateArangoLocalStorage local storage update callback func (o *Operator) onUpdateArangoLocalStorage(oldObj, newObj interface{}) { - log := o.Dependencies.Log apiObject := newObj.(*api.ArangoLocalStorage) - log.Debug(). + o.log.Debug(). Str("name", apiObject.GetObjectMeta().GetName()). 
Msg("ArangoLocalStorage updated") o.syncArangoLocalStorage(apiObject) @@ -83,7 +81,7 @@ func (o *Operator) onUpdateArangoLocalStorage(oldObj, newObj interface{}) { // onDeleteArangoLocalStorage local storage delete callback func (o *Operator) onDeleteArangoLocalStorage(obj interface{}) { - log := o.Dependencies.Log + log := o.log apiObject, ok := obj.(*api.ArangoLocalStorage) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) @@ -129,7 +127,7 @@ func (o *Operator) syncArangoLocalStorage(apiObject *api.ArangoLocalStorage) { //pt.start() err := o.handleLocalStorageEvent(ev) if err != nil { - o.Dependencies.Log.Warn().Err(err).Msg("Failed to handle event") + o.log.Warn().Err(err).Msg("Failed to handle event") } //pt.stop() } @@ -199,8 +197,7 @@ func (o *Operator) makeLocalStorageConfigAndDeps(apiObject *api.ArangoLocalStora ServiceAccount: o.Config.ServiceAccount, } deps := storage.Dependencies{ - Log: o.Dependencies.Log.With(). - Str("component", "storage"). + Log: o.Dependencies.LogService.MustGetLogger("storage").With(). Str("localStorage", apiObject.GetName()). Logger(), KubeCli: o.Dependencies.KubeCli, diff --git a/pkg/util/arangod/agency.go b/pkg/util/arangod/agency.go new file mode 100644 index 000000000..0e1f71148 --- /dev/null +++ b/pkg/util/arangod/agency.go @@ -0,0 +1,147 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package arangod + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + driver "github.com/arangodb/go-driver" + "github.com/pkg/errors" +) + +// Agency provides API implemented by the ArangoDB agency. +type Agency interface { + // ReadKey reads the value of a given key in the agency. + ReadKey(ctx context.Context, key []string, value interface{}) error + // Endpoint returns the endpoint of this agent connection + Endpoint() string +} + +// NewAgencyClient creates a new Agency connection from the given client +// connection. +// The number of endpoints of the client must be exactly 1. +func NewAgencyClient(c driver.Client) (Agency, error) { + if len(c.Connection().Endpoints()) > 1 { + return nil, maskAny(fmt.Errorf("Got multiple endpoints")) + } + return &agency{ + conn: c.Connection(), + }, nil +} + +type agency struct { + conn driver.Connection +} + +// ReadKey reads the value of a given key in the agency. 
+func (a *agency) ReadKey(ctx context.Context, key []string, value interface{}) error { + conn := a.conn + req, err := conn.NewRequest("POST", "_api/agency/read") + if err != nil { + return maskAny(err) + } + fullKey := createFullKey(key) + input := [][]string{{fullKey}} + req, err = req.SetBody(input) + if err != nil { + return maskAny(err) + } + //var raw []byte + //ctx = driver.WithRawResponse(ctx, &raw) + resp, err := conn.Do(ctx, req) + if err != nil { + fmt.Printf("conn.Do failed, err=%v, resp=%#v\n", err, resp) + return maskAny(err) + } + if resp.StatusCode() == 307 { + // Not leader + location := resp.Header("Location") + return NotLeaderError{Leader: location} + } + if err := resp.CheckStatus(200, 201, 202); err != nil { + return maskAny(err) + } + //fmt.Printf("Agent response: %s\n", string(raw)) + elems, err := resp.ParseArrayBody() + if err != nil { + return maskAny(err) + } + if len(elems) != 1 { + return maskAny(fmt.Errorf("Expected 1 element, got %d", len(elems))) + } + // If empty key parse directly + if len(key) == 0 { + if err := elems[0].ParseBody("", &value); err != nil { + return maskAny(err) + } + } else { + // Now remove all wrapping objects for each key element + var rawObject map[string]interface{} + if err := elems[0].ParseBody("", &rawObject); err != nil { + return maskAny(err) + } + var rawMsg interface{} + for keyIndex := 0; keyIndex < len(key); keyIndex++ { + if keyIndex > 0 { + var ok bool + rawObject, ok = rawMsg.(map[string]interface{}) + if !ok { + return maskAny(fmt.Errorf("Data is not an object at key %s", key[:keyIndex+1])) + } + } + var found bool + rawMsg, found = rawObject[key[keyIndex]] + if !found { + return errors.Wrapf(KeyNotFoundError, "Missing data at key %s", key[:keyIndex+1]) + } + } + // Encode to json ... 
+ encoded, err := json.Marshal(rawMsg) + if err != nil { + return maskAny(err) + } + // and decode back into result + if err := json.Unmarshal(encoded, &value); err != nil { + return maskAny(err) + } + } + + // fmt.Printf("result as JSON: %s\n", rawResult) + return nil +} + +// Endpoint returns the endpoint of this agent connection +func (a *agency) Endpoint() string { + ep := a.conn.Endpoints() + if len(ep) == 0 { + return "" + } + return ep[0] +} + +func createFullKey(key []string) string { + return "/" + strings.Join(key, "/") +} diff --git a/pkg/util/arangod/client.go b/pkg/util/arangod/client.go index acb8fb81f..5645d327d 100644 --- a/pkg/util/arangod/client.go +++ b/pkg/util/arangod/client.go @@ -129,8 +129,9 @@ func createArangodClientForDNSName(ctx context.Context, cli corev1.CoreV1Interfa transport = sharedHTTPSTransport } connConfig := http.ConnectionConfig{ - Endpoints: []string{scheme + "://" + net.JoinHostPort(dnsName, strconv.Itoa(k8sutil.ArangoPort))}, - Transport: transport, + Endpoints: []string{scheme + "://" + net.JoinHostPort(dnsName, strconv.Itoa(k8sutil.ArangoPort))}, + Transport: transport, + DontFollowRedirect: true, } // TODO deal with TLS with proper CA checking conn, err := http.NewConnection(connConfig) diff --git a/pkg/util/arangod/endpoint.go b/pkg/util/arangod/endpoint.go new file mode 100644 index 000000000..d5f2d0641 --- /dev/null +++ b/pkg/util/arangod/endpoint.go @@ -0,0 +1,42 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package arangod + +import "net/url" + +// IsSameEndpoint returns true when the 2 given endpoints +// refer to the same server. +func IsSameEndpoint(a, b string) bool { + if a == b { + return true + } + ua, err := url.Parse(a) + if err != nil { + return false + } + ub, err := url.Parse(b) + if err != nil { + return false + } + return ua.Hostname() == ub.Hostname() +} diff --git a/pkg/util/arangod/error.go b/pkg/util/arangod/error.go index 7ef7f5e00..b87e57ac4 100644 --- a/pkg/util/arangod/error.go +++ b/pkg/util/arangod/error.go @@ -25,5 +25,32 @@ package arangod import "github.com/pkg/errors" var ( + KeyNotFoundError = errors.New("Key not found") + maskAny = errors.WithStack ) + +// IsKeyNotFound returns true if the given error is (or is caused by) a KeyNotFoundError. +func IsKeyNotFound(err error) bool { + return errors.Cause(err) == KeyNotFoundError +} + +// NotLeaderError indicates the response of an agent when it is +// not the leader of the agency. +type NotLeaderError struct { + Leader string // Endpoint of the current leader +} + +// Error implements error. +func (e NotLeaderError) Error() string { + return "not the leader" +} + +// IsNotLeader returns true if the given error is (or is caused by) a NotLeaderError. +func IsNotLeader(err error) (string, bool) { + nlErr, ok := err.(NotLeaderError) + if ok { + return nlErr.Leader, true + } + return "", false +} diff --git a/pkg/util/k8sutil/dns.go b/pkg/util/k8sutil/dns.go index ce7d147a2..2fb4e7171 100644 --- a/pkg/util/k8sutil/dns.go +++ b/pkg/util/k8sutil/dns.go @@ -29,7 +29,7 @@ import ( // CreatePodDNSName returns the DNS of a pod with a given role & id in // a given deployment. 
func CreatePodDNSName(deployment metav1.Object, role, id string) string { - return CreatePodName(deployment.GetName(), role, id) + "." + + return CreatePodHostName(deployment.GetName(), role, id) + "." + CreateHeadlessServiceName(deployment.GetName()) + "." + deployment.GetNamespace() + ".svc" } diff --git a/pkg/util/k8sutil/names.go b/pkg/util/k8sutil/names.go index 6474a2988..d75836352 100644 --- a/pkg/util/k8sutil/names.go +++ b/pkg/util/k8sutil/names.go @@ -25,10 +25,12 @@ package k8sutil import ( "fmt" "regexp" + "strings" ) var ( - resourceNameRE = regexp.MustCompile(`^([0-9\-\.a-z])+$`) + resourceNameRE = regexp.MustCompile(`^([0-9\-\.a-z])+$`) + arangodPrefixes = []string{"CRDN-", "PRMR-", "AGNT-"} ) // ValidateOptionalResourceName validates a kubernetes resource name. @@ -55,3 +57,13 @@ func ValidateResourceName(name string) error { } return maskAny(fmt.Errorf("Name '%s' is not a valid resource name", name)) } + +// stripArangodPrefix removes well know arangod ID prefixes from the given id. +func stripArangodPrefix(id string) string { + for _, prefix := range arangodPrefixes { + if strings.HasPrefix(id, prefix) { + return id[len(prefix):] + } + } + return id +} diff --git a/pkg/util/k8sutil/pods.go b/pkg/util/k8sutil/pods.go index 79a95e9bc..295c57c75 100644 --- a/pkg/util/k8sutil/pods.go +++ b/pkg/util/k8sutil/pods.go @@ -23,12 +23,16 @@ package k8sutil import ( + "fmt" + "path/filepath" + "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" ) const ( + alpineImage = "alpine" arangodVolumeName = "arangod-data" tlsKeyfileVolumeName = "tls-keyfile" rocksdbEncryptionVolumeName = "rocksdb-encryption" @@ -96,14 +100,23 @@ func getPodCondition(status *v1.PodStatus, condType v1.PodConditionType) *v1.Pod // CreatePodName returns the name of the pod for a member with // a given id in a deployment with a given name. 
-func CreatePodName(deploymentName, role, id string) string { - return deploymentName + "-" + role + "-" + id +func CreatePodName(deploymentName, role, id, suffix string) string { + if len(suffix) > 0 && suffix[0] != '-' { + suffix = "-" + suffix + } + return CreatePodHostName(deploymentName, role, id) + suffix +} + +// CreatePodHostName returns the hostname of the pod for a member with +// a given id in a deployment with a given name. +func CreatePodHostName(deploymentName, role, id string) string { + return deploymentName + "-" + role + "-" + stripArangodPrefix(id) } // CreateTLSKeyfileSecretName returns the name of the Secret that holds the TLS keyfile for a member with // a given id in a deployment with a given name. func CreateTLSKeyfileSecretName(deploymentName, role, id string) string { - return CreatePodName(deploymentName, role, id) + "-tls-keyfile" + return CreatePodName(deploymentName, role, id, "-tls-keyfile") } // arangodVolumeMounts creates a volume mount structure for arangod. @@ -133,6 +146,23 @@ func rocksdbEncryptionVolumeMounts() []v1.VolumeMount { } } +// arangodInitContainer creates a container configured to +// initalize a UUID file. +func arangodInitContainer(name, id string) v1.Container { + uuidFile := filepath.Join(ArangodVolumeMountDir, "UUID") + c := v1.Container{ + Command: []string{ + "/bin/sh", + "-c", + fmt.Sprintf("test -f %s || echo '%s' > %s", uuidFile, id, uuidFile), + }, + Name: name, + Image: alpineImage, + VolumeMounts: arangodVolumeMounts(), + } + return c +} + // arangodContainer creates a container configured to run `arangod`. func arangodContainer(name string, image string, imagePullPolicy v1.PullPolicy, args []string, env map[string]EnvValue, livenessProbe *HTTPProbeConfig, readinessProbe *HTTPProbeConfig) v1.Container { c := v1.Container{ @@ -188,17 +218,17 @@ func arangosyncContainer(name string, image string, imagePullPolicy v1.PullPolic } // newPod creates a basic Pod for given settings. 
-func newPod(deploymentName, ns, role, id string) v1.Pod { - name := CreatePodName(deploymentName, role, id) +func newPod(deploymentName, ns, role, id, podName string) v1.Pod { + hostname := CreatePodHostName(deploymentName, role, id) p := v1.Pod{ ObjectMeta: metav1.ObjectMeta{ - Name: name, + Name: podName, Labels: LabelsForDeployment(deploymentName, role), }, Spec: v1.PodSpec{ - Hostname: name, + Hostname: hostname, Subdomain: CreateHeadlessServiceName(deploymentName), - RestartPolicy: v1.RestartPolicyOnFailure, + RestartPolicy: v1.RestartPolicyNever, }, } return p @@ -208,12 +238,12 @@ func newPod(deploymentName, ns, role, id string) v1.Pod { // If the pod already exists, nil is returned. // If another error occurs, that error is returned. func CreateArangodPod(kubecli kubernetes.Interface, developmentMode bool, deployment APIObject, - role, id, pvcName, image string, imagePullPolicy v1.PullPolicy, + role, id, podName, pvcName, image string, imagePullPolicy v1.PullPolicy, args []string, env map[string]EnvValue, livenessProbe *HTTPProbeConfig, readinessProbe *HTTPProbeConfig, tlsKeyfileSecretName, rocksdbEncryptionSecretName string) error { // Prepare basic pod - p := newPod(deployment.GetName(), deployment.GetNamespace(), role, id) + p := newPod(deployment.GetName(), deployment.GetNamespace(), role, id, podName) // Add arangod container c := arangodContainer("arangod", image, imagePullPolicy, args, env, livenessProbe, readinessProbe) @@ -225,6 +255,9 @@ func CreateArangodPod(kubecli kubernetes.Interface, developmentMode bool, deploy } p.Spec.Containers = append(p.Spec.Containers, c) + // Add UUID init container + p.Spec.InitContainers = append(p.Spec.InitContainers, arangodInitContainer("uuid", id)) + // Add volume if pvcName != "" { // Create PVC @@ -286,10 +319,10 @@ func CreateArangodPod(kubecli kubernetes.Interface, developmentMode bool, deploy // CreateArangoSyncPod creates a Pod that runs `arangosync`. // If the pod already exists, nil is returned. 
// If another error occurs, that error is returned. -func CreateArangoSyncPod(kubecli kubernetes.Interface, developmentMode bool, deployment APIObject, role, id, image string, imagePullPolicy v1.PullPolicy, +func CreateArangoSyncPod(kubecli kubernetes.Interface, developmentMode bool, deployment APIObject, role, id, podName, image string, imagePullPolicy v1.PullPolicy, args []string, env map[string]EnvValue, livenessProbe *HTTPProbeConfig, affinityWithRole string) error { // Prepare basic pod - p := newPod(deployment.GetName(), deployment.GetNamespace(), role, id) + p := newPod(deployment.GetName(), deployment.GetNamespace(), role, id, podName) // Add arangosync container c := arangosyncContainer("arangosync", image, imagePullPolicy, args, env, livenessProbe) diff --git a/pkg/util/k8sutil/pvc.go b/pkg/util/k8sutil/pvc.go index d5f038186..cfed48981 100644 --- a/pkg/util/k8sutil/pvc.go +++ b/pkg/util/k8sutil/pvc.go @@ -31,7 +31,7 @@ import ( // CreatePersistentVolumeClaimName returns the name of the persistent volume claim for a member with // a given id in a deployment with a given name. func CreatePersistentVolumeClaimName(deploymentName, role, id string) string { - return deploymentName + "-" + role + "-" + id + return deploymentName + "-" + role + "-" + stripArangodPrefix(id) } // CreatePersistentVolumeClaim creates a persistent volume claim with given name and configuration. 
diff --git a/pkg/util/k8sutil/util.go b/pkg/util/k8sutil/util.go index c8753f9b2..ac1273fab 100644 --- a/pkg/util/k8sutil/util.go +++ b/pkg/util/k8sutil/util.go @@ -27,6 +27,20 @@ import ( "k8s.io/apimachinery/pkg/labels" ) +const ( + // LabelKeyArangoDeployment is the key of the label used to store the ArangoDeployment name in + LabelKeyArangoDeployment = "arango_deployment" + // LabelKeyArangoLocalStorage is the key of the label used to store the ArangoLocalStorage name in + LabelKeyArangoLocalStorage = "arango_local_storage" + // LabelKeyApp is the key of the label used to store the application name in (fixed to AppName) + LabelKeyApp = "app" + // LabelKeyRole is the key of the label used to store the role of the resource in + LabelKeyRole = "role" + + // AppName is the fixed value for the "app" label + AppName = "arangodb" +) + // addOwnerRefToObject adds given owner reference to given object func addOwnerRefToObject(obj metav1.Object, ownerRef *metav1.OwnerReference) { if ownerRef != nil { @@ -37,11 +51,11 @@ func addOwnerRefToObject(obj metav1.Object, ownerRef *metav1.OwnerReference) { // LabelsForDeployment returns a map of labels, given to all resources for given deployment name func LabelsForDeployment(deploymentName, role string) map[string]string { l := map[string]string{ - "arango_deployment": deploymentName, - "app": "arangodb", + LabelKeyArangoDeployment: deploymentName, + LabelKeyApp: AppName, } if role != "" { - l["role"] = role + l[LabelKeyRole] = role } return l } @@ -49,11 +63,11 @@ func LabelsForDeployment(deploymentName, role string) map[string]string { // LabelsForLocalStorage returns a map of labels, given to all resources for given local storage name func LabelsForLocalStorage(localStorageName, role string) map[string]string { l := map[string]string{ - "arango_local_storage": localStorageName, - "app": "arangodb", + LabelKeyArangoLocalStorage: localStorageName, + LabelKeyApp: AppName, } if role != "" { - l["role"] = role + l[LabelKeyRole] = 
role } return l } diff --git a/tools/manifests/manifest_builder.go b/tools/manifests/manifest_builder.go index 9ab7766d5..043e7dd2e 100644 --- a/tools/manifests/manifest_builder.go +++ b/tools/manifests/manifest_builder.go @@ -177,11 +177,12 @@ func main() { } // Save output - outputPath, err := filepath.Abs(filepath.Join("manifests", "arango-"+group+options.OutputSuffix+".yaml")) + outputDir, err := filepath.Abs("manifests") if err != nil { - log.Fatalf("Failed to get absolute output path: %v\n", err) + log.Fatalf("Failed to get absolute output dir: %v\n", err) } - if err := os.MkdirAll(filepath.Base(outputPath), 0755); err != nil { + outputPath := filepath.Join(outputDir, "arango-"+group+options.OutputSuffix+".yaml") + if err := os.MkdirAll(outputDir, 0755); err != nil { log.Fatalf("Failed to create output directory: %v\n", err) } if err := ioutil.WriteFile(outputPath, output.Bytes(), 0644); err != nil {