1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Merged in master

This commit is contained in:
Ewout Prangsma 2018-03-23 09:36:20 +01:00
commit 6599663f08
No known key found for this signature in database
GPG key ID: 4DBAD380D93D0698
49 changed files with 2100 additions and 330 deletions

View file

@ -67,6 +67,9 @@ type ConnectionConfig struct {
// directly after use, resulting in a large number of connections in `TIME_WAIT` state.
// When this value is not set, the driver will set it to 64 automatically.
Transport http.RoundTripper
// DontFollowRedirect; if set, redirect will not be followed, response from the initial request will be returned without an error
// DontFollowRedirect takes precendance over FailOnRedirect.
DontFollowRedirect bool
// FailOnRedirect; if set, redirect will not be followed, instead the status code is returned as error
FailOnRedirect bool
// Cluster configuration settings
@ -137,7 +140,11 @@ func newHTTPConnection(endpoint string, config ConnectionConfig) (driver.Connect
httpClient := &http.Client{
Transport: config.Transport,
}
if config.FailOnRedirect {
if config.DontFollowRedirect {
httpClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse // Do not wrap, standard library will not understand
}
} else if config.FailOnRedirect {
httpClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return driver.ArangoError{
HasError: true,

View file

@ -7,6 +7,7 @@ spec:
app: arangodb
role: coordinator
type: NodePort
publishNotReadyAddresses: true
ports:
- protocol: TCP
port: 8529

View file

@ -4,3 +4,9 @@ metadata:
name: "example-simple-cluster"
spec:
mode: cluster
image: arangodb/arangodb:3.3.4
tls:
altNames: ["kube-01", "kube-02", "kube-03"]
coordinators:
args:
- --log.level=true

View file

@ -186,7 +186,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper
CreateCRD: operatorOptions.createCRD,
}
deps := operator.Dependencies{
Log: logService.MustGetLogger("operator"),
LogService: logService,
KubeCli: kubecli,
KubeExtCli: kubeExtCli,
CRCli: crCli,

View file

@ -35,6 +35,8 @@ const (
ConditionTypeReady ConditionType = "Ready"
// ConditionTypeTerminated indicates that the member has terminated and will not restart.
ConditionTypeTerminated ConditionType = "Terminated"
// ConditionTypeAutoUpgrade indicates that the member has to be started with `--database.auto-upgrade` once.
ConditionTypeAutoUpgrade ConditionType = "AutoUpgrade"
)
// Condition represents one current condition of a deployment or deployment member.

View file

@ -99,6 +99,27 @@ func (s DeploymentSpec) IsSecure() bool {
return s.TLS.IsSecure()
}
// GetServerGroupSpec returns the server group spec (from this
// deployment spec) for the given group.
func (s DeploymentSpec) GetServerGroupSpec(group ServerGroup) ServerGroupSpec {
switch group {
case ServerGroupSingle:
return s.Single
case ServerGroupAgents:
return s.Agents
case ServerGroupDBServers:
return s.DBServers
case ServerGroupCoordinators:
return s.Coordinators
case ServerGroupSyncMasters:
return s.SyncMasters
case ServerGroupSyncWorkers:
return s.SyncWorkers
default:
return ServerGroupSpec{}
}
}
// SetDefaults fills in default values when a field is not specified.
func (s *DeploymentSpec) SetDefaults(deploymentName string) {
if s.GetMode() == "" {

View file

@ -34,4 +34,8 @@ const (
MemberStateCleanOut MemberState = "CleanOut"
// MemberStateShuttingDown indicates that a member is shutting down
MemberStateShuttingDown MemberState = "ShuttingDown"
// MemberStateRotating indicates that a member is being rotated
MemberStateRotating MemberState = "Rotating"
// MemberStateUpgrading indicates that a member is in the process of upgrading its database data format
MemberStateUpgrading MemberState = "Upgrading"
)

View file

@ -22,7 +22,10 @@
package v1alpha
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
"github.com/dchest/uniuri"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// ActionType is a strongly typed name for a plan action item
type ActionType string
@ -36,18 +39,45 @@ const (
ActionTypeCleanOutMember ActionType = "CleanOutMember"
// ActionTypeShutdownMember causes a member to be shutdown and removed from the cluster.
ActionTypeShutdownMember ActionType = "ShutdownMember"
// ActionTypeRotateMember causes a member to be shutdown and have it's pod removed.
ActionTypeRotateMember ActionType = "RotateMember"
// ActionTypeUpgradeMember causes a member to be shutdown and have it's pod removed, restarted with AutoUpgrade option, waited until termination and the restarted again.
ActionTypeUpgradeMember ActionType = "UpgradeMember"
// ActionTypeWaitForMemberUp causes the plan to wait until the member is considered "up".
ActionTypeWaitForMemberUp ActionType = "WaitForMemberUp"
)
// Action represents a single action to be taken to update a deployment.
type Action struct {
// ID of this action (unique for every action)
ID string `json:"id"`
// Type of action.
Type ActionType `json:"type"`
// ID reference of the member involved in this action (if any)
MemberID string `json:"memberID,omitempty"`
// Group involved in this action
Group ServerGroup `json:"group,omitempty"`
// CreationTime is set the when the action is created.
CreationTime metav1.Time `json:"creationTime"`
// StartTime is set the when the action has been started, but needs to wait to be finished.
StartTime *metav1.Time `json:"startTime,omitempty"`
// Reason for this action
Reason string `json:"reason,omitempty"`
}
// NewAction instantiates a new Action.
func NewAction(actionType ActionType, group ServerGroup, memberID string, reason ...string) Action {
a := Action{
ID: uniuri.New(),
Type: actionType,
MemberID: memberID,
Group: group,
CreationTime: metav1.Now(),
}
if len(reason) != 0 {
a.Reason = reason[0]
}
return a
}
// Plan is a list of actions that will be taken to update a deployment.

View file

@ -65,6 +65,26 @@ func (g ServerGroup) AsRole() string {
}
}
// AsRoleAbbreviated returns the abbreviation of the "role" value for the given group.
func (g ServerGroup) AsRoleAbbreviated() string {
switch g {
case ServerGroupSingle:
return "sngl"
case ServerGroupAgents:
return "agnt"
case ServerGroupDBServers:
return "prmr"
case ServerGroupCoordinators:
return "crdn"
case ServerGroupSyncMasters:
return "syma"
case ServerGroupSyncWorkers:
return "sywo"
default:
return "?"
}
}
// IsArangod returns true when the groups runs servers of type `arangod`.
func (g ServerGroup) IsArangod() bool {
switch g {

View file

@ -37,6 +37,15 @@ func TestServerGroupAsRole(t *testing.T) {
assert.Equal(t, "syncworker", ServerGroupSyncWorkers.AsRole())
}
func TestServerGroupAsRoleAbbreviated(t *testing.T) {
assert.Equal(t, "sngl", ServerGroupSingle.AsRoleAbbreviated())
assert.Equal(t, "agnt", ServerGroupAgents.AsRoleAbbreviated())
assert.Equal(t, "prmr", ServerGroupDBServers.AsRoleAbbreviated())
assert.Equal(t, "crdn", ServerGroupCoordinators.AsRoleAbbreviated())
assert.Equal(t, "syma", ServerGroupSyncMasters.AsRoleAbbreviated())
assert.Equal(t, "sywo", ServerGroupSyncWorkers.AsRoleAbbreviated())
}
func TestServerGroupIsArangod(t *testing.T) {
assert.True(t, ServerGroupSingle.IsArangod())
assert.True(t, ServerGroupAgents.IsArangod())

View file

@ -35,6 +35,7 @@ import (
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Action) DeepCopyInto(out *Action) {
*out = *in
in.CreationTime.DeepCopyInto(&out.CreationTime)
if in.StartTime != nil {
in, out := &in.StartTime, &out.StartTime
if *in == nil {

38
pkg/deployment/action.go Normal file
View file

@ -0,0 +1,38 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
)
// Action executes a single Plan item.
type Action interface {
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
Start(ctx context.Context) (bool, error)
// CheckProgress checks the progress of the action.
// Returns true if the action is completely finished, false otherwise.
CheckProgress(ctx context.Context) (bool, error)
}

View file

@ -0,0 +1,66 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// NewAddMemberAction creates a new Action that implements the given
// planned AddMember action.
func NewAddMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
return &actionAddMember{
log: log,
action: action,
actionCtx: actionCtx,
}
}
// actionAddMember implements an AddMemberAction.
type actionAddMember struct {
log zerolog.Logger
action api.Action
actionCtx ActionContext
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionAddMember) Start(ctx context.Context) (bool, error) {
if err := a.actionCtx.CreateMember(a.action.Group); err != nil {
log.Debug().Err(err).Msg("Failed to create member")
return false, maskAny(err)
}
return true, nil
}
// CheckProgress checks the progress of the action.
// Returns true if the action is completely finished, false otherwise.
func (a *actionAddMember) CheckProgress(ctx context.Context) (bool, error) {
// Nothing todo
return true, nil
}

View file

@ -0,0 +1,105 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/rs/zerolog"
)
// NewCleanOutMemberAction creates a new Action that implements the given
// planned CleanOutMember action.
func NewCleanOutMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
return &actionCleanoutMember{
log: log,
action: action,
actionCtx: actionCtx,
}
}
// actionCleanoutMember implements an CleanOutMemberAction.
type actionCleanoutMember struct {
log zerolog.Logger
action api.Action
actionCtx ActionContext
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionCleanoutMember) Start(ctx context.Context) (bool, error) {
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
// We wanted to remove and it is already gone. All ok
return true, nil
}
log := a.log
c, err := a.actionCtx.GetDatabaseClient(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create member client")
return false, maskAny(err)
}
cluster, err := c.Cluster(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to access cluster")
return false, maskAny(err)
}
if err := cluster.CleanOutServer(ctx, a.action.MemberID); err != nil {
log.Debug().Err(err).Msg("Failed to cleanout member")
return false, maskAny(err)
}
// Update status
m.State = api.MemberStateCleanOut
if a.actionCtx.UpdateMember(m); err != nil {
return false, maskAny(err)
}
return false, nil
}
// CheckProgress checks the progress of the action.
// Returns true if the action is completely finished, false otherwise.
func (a *actionCleanoutMember) CheckProgress(ctx context.Context) (bool, error) {
log := a.log
c, err := a.actionCtx.GetDatabaseClient(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create member client")
return false, maskAny(err)
}
cluster, err := c.Cluster(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to access cluster")
return false, maskAny(err)
}
cleanedOut, err := cluster.IsCleanedOut(ctx, a.action.MemberID)
if err != nil {
return false, maskAny(err)
}
if !cleanedOut {
// We're not done yet
return false, nil
}
// Cleanout completed
return true, nil
}

View file

@ -0,0 +1,205 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
"fmt"
driver "github.com/arangodb/go-driver"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// ActionContext provides methods to the Action implementations
// to control their context.
type ActionContext interface {
// Gets the specified mode of deployment
GetMode() api.DeploymentMode
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
GetDatabaseClient(ctx context.Context) (driver.Client, error)
// GetServerClient returns a cached client for a specific server.
GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
// GetAgencyClients returns a client connection for every agency member.
GetAgencyClients(ctx context.Context) ([]arangod.Agency, error)
// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
// when no such member is found.
GetMemberStatusByID(id string) (api.MemberStatus, bool)
// CreateMember adds a new member to the given group.
CreateMember(group api.ServerGroup) error
// UpdateMember updates the deployment status wrt the given member.
UpdateMember(member api.MemberStatus) error
// RemoveMemberByID removes a member with given id.
RemoveMemberByID(id string) error
// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
DeletePod(podName string) error
// DeletePvc deletes a persistent volume claim with given name in the namespace
// of the deployment. If the pvc does not exist, the error is ignored.
DeletePvc(pvcName string) error
}
// NewActionContext creates a new ActionContext implementation.
func NewActionContext(log zerolog.Logger, deployment *Deployment) ActionContext {
return &actionContext{
log: log,
deployment: deployment,
}
}
// actionContext implements ActionContext
type actionContext struct {
log zerolog.Logger
deployment *Deployment
}
// Gets the specified mode of deployment
func (ac *actionContext) GetMode() api.DeploymentMode {
return ac.deployment.apiObject.Spec.GetMode()
}
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
func (ac *actionContext) GetDatabaseClient(ctx context.Context) (driver.Client, error) {
c, err := ac.deployment.clientCache.GetDatabase(ctx)
if err != nil {
return nil, maskAny(err)
}
return c, nil
}
// GetServerClient returns a cached client for a specific server.
func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) {
c, err := ac.deployment.clientCache.Get(ctx, group, id)
if err != nil {
return nil, maskAny(err)
}
return c, nil
}
// GetAgencyClients returns a client connection for every agency member.
func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) {
agencyMembers := ac.deployment.status.Members.Agents
result := make([]arangod.Agency, 0, len(agencyMembers))
for _, m := range agencyMembers {
client, err := ac.GetServerClient(ctx, api.ServerGroupAgents, m.ID)
if err != nil {
return nil, maskAny(err)
}
aClient, err := arangod.NewAgencyClient(client)
if err != nil {
return nil, maskAny(err)
}
result = append(result, aClient)
}
return result, nil
}
// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
// when no such member is found.
func (ac *actionContext) GetMemberStatusByID(id string) (api.MemberStatus, bool) {
m, _, ok := ac.deployment.status.Members.ElementByID(id)
return m, ok
}
// CreateMember adds a new member to the given group.
func (ac *actionContext) CreateMember(group api.ServerGroup) error {
d := ac.deployment
if err := d.createMember(group, d.apiObject); err != nil {
ac.log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member")
return maskAny(err)
}
// Save added member
if err := d.updateCRStatus(); err != nil {
log.Debug().Err(err).Msg("Updating CR status failed")
return maskAny(err)
}
return nil
}
// UpdateMember updates the deployment status wrt the given member.
func (ac *actionContext) UpdateMember(member api.MemberStatus) error {
d := ac.deployment
_, group, found := ac.deployment.status.Members.ElementByID(member.ID)
if !found {
return maskAny(fmt.Errorf("Member %s not found", member.ID))
}
d.status.Members.UpdateMemberStatus(member, group)
if err := d.updateCRStatus(); err != nil {
log.Debug().Err(err).Msg("Updating CR status failed")
return maskAny(err)
}
return nil
}
// RemoveMemberByID removes a member with given id.
func (ac *actionContext) RemoveMemberByID(id string) error {
d := ac.deployment
_, group, found := d.status.Members.ElementByID(id)
if !found {
return nil
}
if err := d.status.Members.RemoveByID(id, group); err != nil {
log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to remove member")
return maskAny(err)
}
// Save removed member
if err := d.updateCRStatus(); err != nil {
return maskAny(err)
}
return nil
}
// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
func (ac *actionContext) DeletePod(podName string) error {
d := ac.deployment
ns := d.apiObject.GetNamespace()
if err := d.deps.KubeCli.Core().Pods(ns).Delete(podName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) {
log.Debug().Err(err).Str("pod", podName).Msg("Failed to remove pod")
return maskAny(err)
}
return nil
}
// DeletePvc deletes a persistent volume claim with given name in the namespace
// of the deployment. If the pvc does not exist, the error is ignored.
func (ac *actionContext) DeletePvc(pvcName string) error {
d := ac.deployment
ns := d.apiObject.GetNamespace()
if err := d.deps.KubeCli.Core().PersistentVolumeClaims(ns).Delete(pvcName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) {
log.Debug().Err(err).Str("pvc", pvcName).Msg("Failed to remove pvc")
return maskAny(err)
}
return nil
}

View file

@ -0,0 +1,80 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/rs/zerolog"
)
// NewRemoveMemberAction creates a new Action that implements the given
// planned RemoveMember action.
func NewRemoveMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
return &actionRemoveMember{
log: log,
action: action,
actionCtx: actionCtx,
}
}
// actionRemoveMember implements an RemoveMemberAction.
type actionRemoveMember struct {
log zerolog.Logger
action api.Action
actionCtx ActionContext
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionRemoveMember) Start(ctx context.Context) (bool, error) {
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
// We wanted to remove and it is already gone. All ok
return true, nil
}
// Remove the pod (if any)
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, maskAny(err)
}
// Remove the pvc (if any)
if m.PersistentVolumeClaimName != "" {
if err := a.actionCtx.DeletePvc(m.PersistentVolumeClaimName); err != nil {
return false, maskAny(err)
}
}
// Remove member
if err := a.actionCtx.RemoveMemberByID(a.action.MemberID); err != nil {
return false, maskAny(err)
}
return true, nil
}
// CheckProgress checks the progress of the action.
// Returns true if the action is completely finished, false otherwise.
func (a *actionRemoveMember) CheckProgress(ctx context.Context) (bool, error) {
// Nothing todo
return true, nil
}

View file

@ -0,0 +1,117 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/rs/zerolog"
)
// NewRotateMemberAction creates a new Action that implements the given
// planned RotateMember action.
func NewRotateMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
return &actionRotateMember{
log: log,
action: action,
actionCtx: actionCtx,
}
}
// actionRotateMember implements an RotateMember.
type actionRotateMember struct {
log zerolog.Logger
action api.Action
actionCtx ActionContext
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionRotateMember) Start(ctx context.Context) (bool, error) {
log := a.log
group := a.action.Group
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
log.Error().Msg("No such member")
}
if group.IsArangod() {
// Invoke shutdown endpoint
c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID)
if err != nil {
log.Debug().Err(err).Msg("Failed to create member client")
return false, maskAny(err)
}
removeFromCluster := false
log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member")
ctx, cancel := context.WithTimeout(ctx, shutdownTimeout)
defer cancel()
if err := c.Shutdown(ctx, removeFromCluster); err != nil {
// Shutdown failed. Let's check if we're already done
if ready, err := a.CheckProgress(ctx); err == nil && ready {
// We're done
return true, nil
}
log.Debug().Err(err).Msg("Failed to shutdown member")
return false, maskAny(err)
}
} else if group.IsArangosync() {
// Terminate pod
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, maskAny(err)
}
}
// Update status
m.State = api.MemberStateRotating
if err := a.actionCtx.UpdateMember(m); err != nil {
return false, maskAny(err)
}
return false, nil
}
// CheckProgress checks the progress of the action.
// Returns true if the action is completely finished, false otherwise.
func (a *actionRotateMember) CheckProgress(ctx context.Context) (bool, error) {
// Check that pod is removed
log := a.log
m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !found {
log.Error().Msg("No such member")
return true, nil
}
if !m.Conditions.IsTrue(api.ConditionTypeTerminated) {
// Pod is not yet terminated
return false, nil
}
// Pod is terminated, we can now remove it
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, maskAny(err)
}
// Pod is now gone, update the member status
m.State = api.MemberStateNone
if err := a.actionCtx.UpdateMember(m); err != nil {
return false, maskAny(err)
}
return true, nil
}

View file

@ -0,0 +1,113 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
"time"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/rs/zerolog"
)
const (
shutdownTimeout = time.Second * 15
)
// NewShutdownMemberAction creates a new Action that implements the given
// planned ShutdownMember action.
func NewShutdownMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
return &actionShutdownMember{
log: log,
action: action,
actionCtx: actionCtx,
}
}
// actionShutdownMember implements an ShutdownMemberAction.
type actionShutdownMember struct {
log zerolog.Logger
action api.Action
actionCtx ActionContext
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionShutdownMember) Start(ctx context.Context) (bool, error) {
log := a.log
group := a.action.Group
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
log.Error().Msg("No such member")
return true, nil
}
if group.IsArangod() {
// Invoke shutdown endpoint
c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID)
if err != nil {
log.Debug().Err(err).Msg("Failed to create member client")
return false, maskAny(err)
}
removeFromCluster := true
log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member")
ctx, cancel := context.WithTimeout(ctx, shutdownTimeout)
defer cancel()
if err := c.Shutdown(ctx, removeFromCluster); err != nil {
// Shutdown failed. Let's check if we're already done
if ready, err := a.CheckProgress(ctx); err == nil && ready {
// We're done
return true, nil
}
log.Debug().Err(err).Msg("Failed to shutdown member")
return false, maskAny(err)
}
} else if group.IsArangosync() {
// Terminate pod
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, maskAny(err)
}
}
// Update status
m.State = api.MemberStateShuttingDown
if err := a.actionCtx.UpdateMember(m); err != nil {
return false, maskAny(err)
}
return false, nil
}
// CheckProgress checks the progress of the action.
// Returns true if the action is completely finished, false otherwise.
func (a *actionShutdownMember) CheckProgress(ctx context.Context) (bool, error) {
m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !found {
// Member not long exists
return true, nil
}
if m.Conditions.IsTrue(api.ConditionTypeTerminated) {
// Shutdown completed
return true, nil
}
// Member still not shutdown, retry soon
return false, nil
}

View file

@ -0,0 +1,127 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/rs/zerolog"
)
// NewUpgradeMemberAction creates a new Action that implements the given
// planned UpgradeMember action.
func NewUpgradeMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
return &actionUpgradeMember{
log: log,
action: action,
actionCtx: actionCtx,
}
}
// actionUpgradeMember implements an UpgradeMember.
type actionUpgradeMember struct {
log zerolog.Logger
action api.Action
actionCtx ActionContext
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionUpgradeMember) Start(ctx context.Context) (bool, error) {
log := a.log
group := a.action.Group
m, ok := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !ok {
log.Error().Msg("No such member")
}
// Set AutoUpgrade condition
m.Conditions.Update(api.ConditionTypeAutoUpgrade, true, "Upgrading", "AutoUpgrade on first restart")
if err := a.actionCtx.UpdateMember(m); err != nil {
return false, maskAny(err)
}
if group.IsArangod() {
// Invoke shutdown endpoint
c, err := a.actionCtx.GetServerClient(ctx, group, a.action.MemberID)
if err != nil {
log.Debug().Err(err).Msg("Failed to create member client")
return false, maskAny(err)
}
removeFromCluster := false
log.Debug().Bool("removeFromCluster", removeFromCluster).Msg("Shutting down member")
ctx, cancel := context.WithTimeout(ctx, shutdownTimeout)
defer cancel()
if err := c.Shutdown(ctx, removeFromCluster); err != nil {
// Shutdown failed. Let's check if we're already done
if ready, err := a.CheckProgress(ctx); err == nil && ready {
// We're done
return true, nil
}
log.Debug().Err(err).Msg("Failed to shutdown member")
return false, maskAny(err)
}
} else if group.IsArangosync() {
// Terminate pod
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, maskAny(err)
}
}
// Update status
m.State = api.MemberStateRotating // We keep the rotation state here, since only when a new pod is created, it will get the Upgrading state.
if err := a.actionCtx.UpdateMember(m); err != nil {
return false, maskAny(err)
}
return false, nil
}
// CheckProgress checks the progress of the action.
// Returns true if the action is completely finished, false otherwise.
func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, error) {
// Check that pod is removed
log := a.log
m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !found {
log.Error().Msg("No such member")
return true, nil
}
isUpgrading := m.State == api.MemberStateUpgrading
log = log.With().
Str("pod-name", m.PodName).
Bool("is-upgrading", isUpgrading).Logger()
if !m.Conditions.IsTrue(api.ConditionTypeTerminated) {
// Pod is not yet terminated
return false, nil
}
// Pod is terminated, we can now remove it
log.Debug().Msg("Deleting pod")
if err := a.actionCtx.DeletePod(m.PodName); err != nil {
return false, maskAny(err)
}
// Pod is now gone, update the member status
m.State = api.MemberStateNone
if err := a.actionCtx.UpdateMember(m); err != nil {
return false, maskAny(err)
}
return isUpgrading, nil
}

View file

@ -0,0 +1,217 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
"sync"
"time"
driver "github.com/arangodb/go-driver"
"github.com/rs/zerolog"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
)
// NewWaitForMemberUpAction creates a new Action that implements the given
// planned WaitForMemberUp action.
func NewWaitForMemberUpAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
return &actionWaitForMemberUp{
log: log,
action: action,
actionCtx: actionCtx,
}
}
const (
maxAgentResponseTime = time.Second * 10
)
// actionWaitForMemberUp implements an WaitForMemberUp.
type actionWaitForMemberUp struct {
log zerolog.Logger
action api.Action
actionCtx ActionContext
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionWaitForMemberUp) Start(ctx context.Context) (bool, error) {
ready, err := a.CheckProgress(ctx)
if err != nil {
return false, maskAny(err)
}
return ready, nil
}
// CheckProgress checks the progress of the action.
// Returns true if the action is completely finished, false otherwise.
func (a *actionWaitForMemberUp) CheckProgress(ctx context.Context) (bool, error) {
if a.action.Group.IsArangosync() {
return a.checkProgressArangoSync(ctx)
}
switch a.actionCtx.GetMode() {
case api.DeploymentModeSingle:
return a.checkProgressSingle(ctx)
default:
if a.action.Group == api.ServerGroupAgents {
return a.checkProgressAgent(ctx)
}
return a.checkProgressCluster(ctx)
}
}
// checkProgressSingle checks the progress of the action in the case
// of a single server.
func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool, error) {
log := a.log
c, err := a.actionCtx.GetDatabaseClient(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client")
return false, maskAny(err)
}
if _, err := c.Version(ctx); err != nil {
log.Debug().Err(err).Msg("Failed to get version")
return false, maskAny(err)
}
return true, nil
}
type agentStatus struct {
IsLeader bool
LeaderEndpoint string
IsResponding bool
}
// checkProgressAgent checks the progress of the action in the case
// of an agent.
func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, error) {
log := a.log
clients, err := a.actionCtx.GetAgencyClients(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create agency clients")
return false, maskAny(err)
}
wg := sync.WaitGroup{}
invalidKey := []string{"does-not-exists-149e97e8-4b81-5664-a8a8-9ba93881d64c"}
statuses := make([]agentStatus, len(clients))
for i, c := range clients {
wg.Add(1)
go func(i int, c arangod.Agency) {
defer wg.Done()
var trash interface{}
lctx, cancel := context.WithTimeout(ctx, maxAgentResponseTime)
defer cancel()
if err := c.ReadKey(lctx, invalidKey, &trash); err == nil || arangod.IsKeyNotFound(err) {
// We got a valid read from the leader
statuses[i].IsLeader = true
statuses[i].LeaderEndpoint = c.Endpoint()
statuses[i].IsResponding = true
} else {
if location, ok := arangod.IsNotLeader(err); ok {
// Valid response from a follower
statuses[i].IsLeader = false
statuses[i].LeaderEndpoint = location
statuses[i].IsResponding = true
} else {
// Unexpected / invalid response
log.Debug().Err(err).Str("endpoint", c.Endpoint()).Msg("Agent is not responding")
statuses[i].IsResponding = false
}
}
}(i, c)
}
wg.Wait()
// Check the results
noLeaders := 0
for i, status := range statuses {
if !status.IsResponding {
log.Debug().Msg("Not all agents are responding")
return false, nil
}
if status.IsLeader {
noLeaders++
}
if i > 0 {
// Compare leader endpoint with previous
prev := statuses[i-1].LeaderEndpoint
if !arangod.IsSameEndpoint(prev, status.LeaderEndpoint) {
log.Debug().Msg("Not all agents report the same leader endpoint")
return false, nil
}
}
}
if noLeaders != 1 {
log.Debug().Int("leaders", noLeaders).Msg("Unexpected number of agency leaders")
return false, nil
}
log.Debug().
Int("leaders", noLeaders).
Int("followers", len(statuses)-noLeaders).
Msg("Agency is happy")
return true, nil
}
// checkProgressCluster checks the progress of the action in the case
// of a cluster deployment (coordinator/dbserver).
func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool, error) {
log := a.log
c, err := a.actionCtx.GetDatabaseClient(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client")
return false, maskAny(err)
}
cluster, err := c.Cluster(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to access cluster")
return false, maskAny(err)
}
h, err := cluster.Health(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to get cluster health")
return false, maskAny(err)
}
sh, found := h.Health[driver.ServerID(a.action.MemberID)]
if !found {
log.Debug().Msg("Member not yet found in cluster health")
return false, nil
}
if sh.Status != driver.ServerStatusGood {
log.Debug().Str("status", string(sh.Status)).Msg("Member set status not yet good")
return false, nil
}
return true, nil
}
// checkProgressArangoSync checks the progress of the action in the case
// of a sync master / worker.
func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bool, error) {
// TODO
return true, nil
}

View file

@ -23,7 +23,6 @@
package deployment
import (
"context"
"fmt"
"reflect"
"time"
@ -89,8 +88,9 @@ type Deployment struct {
eventsCli corev1.EventInterface
inspectTrigger trigger.Trigger
clientCache *clientCache
inspectTrigger trigger.Trigger
recentInspectionErrors int
clientCache *clientCache
}
// New creates a new Deployment from the given API object.
@ -195,7 +195,6 @@ func (d *Deployment) run() {
}
inspectionInterval := maxInspectionInterval
recentInspectionErrors := 0
for {
select {
case <-d.stopCh:
@ -218,49 +217,7 @@ func (d *Deployment) run() {
}
case <-d.inspectTrigger.Done():
hasError := false
ctx := context.Background()
// Ensure we have image info
if retrySoon, err := d.ensureImages(d.apiObject); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Image detection failed", err, d.apiObject))
} else if retrySoon {
inspectionInterval = minInspectionInterval
}
// Inspection of generated resources needed
if err := d.inspectPods(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
}
// Create scale/update plan
if err := d.createPlan(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
}
// Execute current step of scale/update plan
if retrySoon, err := d.executePlan(ctx); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
} else if retrySoon {
inspectionInterval = minInspectionInterval
}
// Ensure all resources are created
if err := d.ensurePVCs(d.apiObject); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject))
}
if err := d.ensurePods(d.apiObject); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject))
}
if hasError {
if recentInspectionErrors == 0 {
inspectionInterval = minInspectionInterval
recentInspectionErrors++
}
} else {
recentInspectionErrors = 0
}
inspectionInterval = d.inspectDeployment(inspectionInterval)
case <-time.After(inspectionInterval):
// Trigger inspection
@ -341,10 +298,13 @@ func (d *Deployment) createEvent(evt *v1.Event) {
}
// Update the status of the API object from the internal status
func (d *Deployment) updateCRStatus() error {
if reflect.DeepEqual(d.apiObject.Status, d.status) {
// Nothing has changed
return nil
func (d *Deployment) updateCRStatus(force ...bool) error {
// TODO Remove force....
if len(force) == 0 || !force[0] {
if reflect.DeepEqual(d.apiObject.Status, d.status) {
// Nothing has changed
return nil
}
}
// Send update to API server

View file

@ -0,0 +1,99 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// inspectDeployment inspects the entire deployment, creates
// a plan to update if needed and inspects underlying resources.
// This function should be called when:
// - the deployment has changed
// - any of the underlying resources has changed
// - once in a while
// Returns the delay until this function should be called again.
func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration {
// log := d.deps.Log
nextInterval := lastInterval
hasError := false
ctx := context.Background()
// Ensure we have image info
if retrySoon, err := d.ensureImages(d.apiObject); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Image detection failed", err, d.apiObject))
} else if retrySoon {
nextInterval = minInspectionInterval
}
// Inspection of generated resources needed
if err := d.inspectPods(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
}
// Create scale/update plan
if err := d.createPlan(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
}
// Execute current step of scale/update plan
retrySoon, err := d.executePlan(ctx)
if err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
}
if retrySoon {
nextInterval = minInspectionInterval
}
// Ensure all resources are created
if err := d.ensurePVCs(d.apiObject); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject))
}
if err := d.ensurePods(d.apiObject); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject))
}
// Update next interval (on errors)
if hasError {
if d.recentInspectionErrors == 0 {
nextInterval = minInspectionInterval
d.recentInspectionErrors++
}
} else {
d.recentInspectionErrors = 0
}
if nextInterval > maxInspectionInterval {
nextInterval = maxInspectionInterval
}
return nextInterval
}

View file

@ -29,6 +29,7 @@ import (
"strings"
"github.com/rs/zerolog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@ -39,6 +40,7 @@ import (
const (
dockerPullableImageIDPrefix = "docker-pullable://"
imageIDAndVersionRole = "id" // Role use by identification pods
)
type imagesBuilder struct {
@ -97,9 +99,9 @@ func (ib *imagesBuilder) Run(ctx context.Context) (bool, error) {
// When no pod exists, it is created, otherwise the ID is fetched & version detected.
// Returns: retrySoon, error
func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, image string) (bool, error) {
role := "id"
role := imageIDAndVersionRole
id := fmt.Sprintf("%0x", sha1.Sum([]byte(image)))[:6]
podName := k8sutil.CreatePodName(ib.APIObject.GetName(), role, id)
podName := k8sutil.CreatePodName(ib.APIObject.GetName(), role, id, "")
ns := ib.APIObject.GetNamespace()
log := ib.Log.With().
Str("pod", podName).
@ -166,10 +168,16 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, ima
"--server.authentication=false",
fmt.Sprintf("--server.endpoint=tcp://[::]:%d", k8sutil.ArangoPort),
}
if err := k8sutil.CreateArangodPod(ib.KubeCli, true, ib.APIObject, role, id, "", image, ib.Spec.GetImagePullPolicy(), args, nil, nil, nil, "", ""); err != nil {
if err := k8sutil.CreateArangodPod(ib.KubeCli, true, ib.APIObject, role, id, podName, "", image, ib.Spec.GetImagePullPolicy(), args, nil, nil, nil, "", ""); err != nil {
log.Debug().Err(err).Msg("Failed to create image ID pod")
return true, maskAny(err)
}
// Come back soon to inspect the pod
return true, nil
}
// isArangoDBImageIDAndVersionPod returns true if the given pod is used for fetching image ID and ArangoDB version of an image
func isArangoDBImageIDAndVersionPod(p v1.Pod) bool {
role, found := p.GetLabels()[k8sutil.LabelKeyRole]
return found && role == imageIDAndVersionRole
}

View file

@ -65,8 +65,9 @@ func (d *Deployment) createInitialMembers(apiObject *api.ArangoDeployment) error
func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDeployment) error {
log := d.deps.Log
var id string
idPrefix := getArangodIDPrefix(group)
for {
id = strings.ToLower(uniuri.NewLen(8)) // K8s accepts only lowercase, so we use it here as well
id = idPrefix + strings.ToLower(uniuri.NewLen(8)) // K8s accepts only lowercase, so we use it here as well
if !d.status.Members.ContainsID(id) {
break
}
@ -82,7 +83,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe
ID: id,
State: api.MemberStateNone,
PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
PodName: k8sutil.CreatePodName(deploymentName, role, id),
PodName: "",
}); err != nil {
return maskAny(err)
}
@ -92,7 +93,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe
ID: id,
State: api.MemberStateNone,
PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
PodName: k8sutil.CreatePodName(deploymentName, role, id),
PodName: "",
}); err != nil {
return maskAny(err)
}
@ -102,7 +103,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe
ID: id,
State: api.MemberStateNone,
PersistentVolumeClaimName: k8sutil.CreatePersistentVolumeClaimName(deploymentName, role, id),
PodName: k8sutil.CreatePodName(deploymentName, role, id),
PodName: "",
}); err != nil {
return maskAny(err)
}
@ -112,7 +113,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe
ID: id,
State: api.MemberStateNone,
PersistentVolumeClaimName: "",
PodName: k8sutil.CreatePodName(deploymentName, role, id),
PodName: "",
}); err != nil {
return maskAny(err)
}
@ -122,7 +123,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe
ID: id,
State: api.MemberStateNone,
PersistentVolumeClaimName: "",
PodName: k8sutil.CreatePodName(deploymentName, role, id),
PodName: "",
}); err != nil {
return maskAny(err)
}
@ -132,7 +133,7 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe
ID: id,
State: api.MemberStateNone,
PersistentVolumeClaimName: "",
PodName: k8sutil.CreatePodName(deploymentName, role, id),
PodName: "",
}); err != nil {
return maskAny(err)
}
@ -142,3 +143,18 @@ func (d *Deployment) createMember(group api.ServerGroup, apiObject *api.ArangoDe
return nil
}
// getArangodIDPrefix returns the prefix required ID's of arangod servers
// in the given group.
func getArangodIDPrefix(group api.ServerGroup) string {
switch group {
case api.ServerGroupCoordinators:
return "CRDN-"
case api.ServerGroupDBServers:
return "PRMR-"
case api.ServerGroupAgents:
return "AGNT-"
default:
return ""
}
}

View file

@ -24,15 +24,39 @@ package deployment
import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// upgradeDecision is the result of an upgrade check.
type upgradeDecision struct {
UpgradeNeeded bool // If set, the image version has changed
UpgradeAllowed bool // If set, it is an allowed version change
AutoUpgradeNeeded bool // If set, the database must be started with `--database.auto-upgrade` once
}
// createPlan considers the current specification & status of the deployment creates a plan to
// get the status in line with the specification.
// If a plan already exists, nothing is done.
func (d *Deployment) createPlan() error {
// Get all current pods
pods, err := d.deps.KubeCli.CoreV1().Pods(d.apiObject.GetNamespace()).List(k8sutil.DeploymentListOpt(d.apiObject.GetName()))
if err != nil {
log.Debug().Err(err).Msg("Failed to list pods")
return maskAny(err)
}
myPods := make([]v1.Pod, 0, len(pods.Items))
for _, p := range pods.Items {
if d.isOwnerOf(&p) {
myPods = append(myPods, p)
}
}
// Create plan
newPlan, changed := createPlan(d.deps.Log, d.status.Plan, d.apiObject.Spec, d.status)
newPlan, changed := createPlan(d.deps.Log, d.apiObject, d.status.Plan, d.apiObject.Spec, d.status, myPods)
// If not change, we're done
if !changed {
@ -54,7 +78,9 @@ func (d *Deployment) createPlan() error {
// createPlan considers the given specification & status and creates a plan to get the status in line with the specification.
// If a plan already exists, the given plan is returned with false.
// Otherwise the new plan is returned with a boolean true.
func createPlan(log zerolog.Logger, currentPlan api.Plan, spec api.DeploymentSpec, status api.DeploymentStatus) (api.Plan, bool) {
func createPlan(log zerolog.Logger, apiObject metav1.Object,
currentPlan api.Plan, spec api.DeploymentSpec,
status api.DeploymentStatus, pods []v1.Pod) (api.Plan, bool) {
if len(currentPlan) > 0 {
// Plan already exists, complete that first
return currentPlan, false
@ -78,10 +104,118 @@ func createPlan(log zerolog.Logger, currentPlan api.Plan, spec api.DeploymentSpe
plan = append(plan, createScalePlan(log, status.Members.SyncWorkers, api.ServerGroupSyncWorkers, spec.SyncWorkers.GetCount())...)
}
// Check for the need to rotate one or more members
getPod := func(podName string) *v1.Pod {
for _, p := range pods {
if p.GetName() == podName {
return &p
}
}
return nil
}
status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error {
for _, m := range *members {
if len(plan) > 0 {
// Only 1 change at a time
continue
}
if m.State != api.MemberStateCreated {
// Only rotate when state is created
continue
}
if podName := m.PodName; podName != "" {
if p := getPod(podName); p != nil {
// Got pod, compare it with what it should be
decision := podNeedsUpgrading(*p, spec, status.Images)
if decision.UpgradeNeeded && decision.UpgradeAllowed {
plan = append(plan, createUpgradeMemberPlan(log, m, group, "Version upgrade")...)
} else {
rotNeeded, reason := podNeedsRotation(*p, apiObject, spec, group, status.Members.Agents, m.ID)
if rotNeeded {
plan = append(plan, createRotateMemberPlan(log, m, group, reason)...)
}
}
}
}
}
return nil
})
// Return plan
return plan, true
}
// podNeedsUpgrading decides if an upgrade of the pod is needed (to comply with
// the given spec) and if that is allowed.
func podNeedsUpgrading(p v1.Pod, spec api.DeploymentSpec, images api.ImageInfoList) upgradeDecision {
if len(p.Spec.Containers) == 1 {
c := p.Spec.Containers[0]
specImageInfo, found := images.GetByImage(spec.GetImage())
if !found {
return upgradeDecision{UpgradeNeeded: false}
}
podImageInfo, found := images.GetByImageID(c.Image)
if !found {
return upgradeDecision{UpgradeNeeded: false}
}
if specImageInfo.ImageID == podImageInfo.ImageID {
// No change
return upgradeDecision{UpgradeNeeded: false}
}
// Image changed, check if change is allowed
specVersion := specImageInfo.ArangoDBVersion
podVersion := podImageInfo.ArangoDBVersion
if specVersion.Major() != podVersion.Major() {
// E.g. 3.x -> 4.x, we cannot allow automatically
return upgradeDecision{UpgradeNeeded: true, UpgradeAllowed: false}
}
if specVersion.Minor() != podVersion.Minor() {
// Is allowed, with `--database.auto-upgrade`
return upgradeDecision{
UpgradeNeeded: true,
UpgradeAllowed: true,
AutoUpgradeNeeded: true,
}
}
// Patch version change, rotate only
return upgradeDecision{
UpgradeNeeded: true,
UpgradeAllowed: true,
AutoUpgradeNeeded: false,
}
}
return upgradeDecision{UpgradeNeeded: false}
}
// podNeedsRotation returns true when the specification of the
// given pod differs from what it should be according to the
// given deployment spec.
// When true is returned, a reason for the rotation is already returned.
func podNeedsRotation(p v1.Pod, apiObject metav1.Object, spec api.DeploymentSpec,
group api.ServerGroup, agents api.MemberStatusList, id string) (bool, string) {
// Check number of containers
if len(p.Spec.Containers) != 1 {
return true, "Number of containers changed"
}
// Check image pull policy
c := p.Spec.Containers[0]
if c.ImagePullPolicy != spec.GetImagePullPolicy() {
return true, "Image pull policy changed"
}
// Check arguments
/*expectedArgs := createArangodArgs(apiObject, spec, group, agents, id)
if len(expectedArgs) != len(c.Args) {
return true, "Arguments changed"
}
for i, a := range expectedArgs {
if c.Args[i] != a {
return true, "Arguments changed"
}
}*/
return false, ""
}
// createScalePlan creates a scaling plan for a single server group
func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api.ServerGroup, count int) api.Plan {
var plan api.Plan
@ -89,7 +223,7 @@ func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api
// Scale up
toAdd := count - len(members)
for i := 0; i < toAdd; i++ {
plan = append(plan, api.Action{Type: api.ActionTypeAddMember, Group: group})
plan = append(plan, api.NewAction(api.ActionTypeAddMember, group, ""))
}
log.Debug().
Int("delta", toAdd).
@ -100,12 +234,12 @@ func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api
if m, err := members.SelectMemberToRemove(); err == nil {
if group == api.ServerGroupDBServers {
plan = append(plan,
api.Action{Type: api.ActionTypeCleanOutMember, Group: group, MemberID: m.ID},
api.NewAction(api.ActionTypeCleanOutMember, group, m.ID),
)
}
plan = append(plan,
api.Action{Type: api.ActionTypeShutdownMember, Group: group, MemberID: m.ID},
api.Action{Type: api.ActionTypeRemoveMember, Group: group, MemberID: m.ID},
api.NewAction(api.ActionTypeShutdownMember, group, m.ID),
api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
)
log.Debug().
Str("role", group.AsRole()).
@ -114,3 +248,33 @@ func createScalePlan(log zerolog.Logger, members api.MemberStatusList, group api
}
return plan
}
// createRotateMemberPlan creates a plan to rotate (stop-recreate-start) an existing
// member.
func createRotateMemberPlan(log zerolog.Logger, member api.MemberStatus,
group api.ServerGroup, reason string) api.Plan {
log.Debug().
Str("id", member.ID).
Str("role", group.AsRole()).
Msg("Creating rotation plan")
plan := api.Plan{
api.NewAction(api.ActionTypeRotateMember, group, member.ID, reason),
api.NewAction(api.ActionTypeWaitForMemberUp, group, member.ID),
}
return plan
}
// createUpgradeMemberPlan creates a plan to upgrade (stop-recreateWithAutoUpgrade-stop-start) an existing
// member.
func createUpgradeMemberPlan(log zerolog.Logger, member api.MemberStatus,
group api.ServerGroup, reason string) api.Plan {
log.Debug().
Str("id", member.ID).
Str("role", group.AsRole()).
Msg("Creating upgrade plan")
plan := api.Plan{
api.NewAction(api.ActionTypeUpgradeMember, group, member.ID, reason),
api.NewAction(api.ActionTypeWaitForMemberUp, group, member.ID),
}
return plan
}

View file

@ -28,6 +28,7 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util"
@ -40,10 +41,17 @@ func TestCreatePlanSingleScale(t *testing.T) {
XMode: api.NewMode(api.DeploymentModeSingle),
}
spec.SetDefaults("test")
depl := &api.ArangoDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test_depl",
Namespace: "test",
},
Spec: spec,
}
// Test with empty status
var status api.DeploymentStatus
newPlan, changed := createPlan(log, nil, spec, status)
newPlan, changed := createPlan(log, depl, nil, spec, status, nil)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
@ -54,7 +62,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
PodName: "something",
},
}
newPlan, changed = createPlan(log, nil, spec, status)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
@ -69,7 +77,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
PodName: "something1",
},
}
newPlan, changed = createPlan(log, nil, spec, status)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
}
@ -82,10 +90,17 @@ func TestCreatePlanResilientSingleScale(t *testing.T) {
}
spec.SetDefaults("test")
spec.Single.XCount = util.NewInt(2)
depl := &api.ArangoDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test_depl",
Namespace: "test",
},
Spec: spec,
}
// Test with empty status
var status api.DeploymentStatus
newPlan, changed := createPlan(log, nil, spec, status)
newPlan, changed := createPlan(log, depl, nil, spec, status, nil)
assert.True(t, changed)
require.Len(t, newPlan, 2)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -98,7 +113,7 @@ func TestCreatePlanResilientSingleScale(t *testing.T) {
PodName: "something",
},
}
newPlan, changed = createPlan(log, nil, spec, status)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil)
assert.True(t, changed)
require.Len(t, newPlan, 1)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -123,7 +138,7 @@ func TestCreatePlanResilientSingleScale(t *testing.T) {
PodName: "something4",
},
}
newPlan, changed = createPlan(log, nil, spec, status)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil)
assert.True(t, changed)
require.Len(t, newPlan, 2) // Note: Downscaling is only down 1 at a time
assert.Equal(t, api.ActionTypeShutdownMember, newPlan[0].Type)
@ -139,10 +154,17 @@ func TestCreatePlanClusterScale(t *testing.T) {
XMode: api.NewMode(api.DeploymentModeCluster),
}
spec.SetDefaults("test")
depl := &api.ArangoDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "test_depl",
Namespace: "test",
},
Spec: spec,
}
// Test with empty status
var status api.DeploymentStatus
newPlan, changed := createPlan(log, nil, spec, status)
newPlan, changed := createPlan(log, depl, nil, spec, status, nil)
assert.True(t, changed)
require.Len(t, newPlan, 6) // Adding 3 dbservers & 3 coordinators (note: agents do not scale now)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -175,7 +197,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
PodName: "coordinator1",
},
}
newPlan, changed = createPlan(log, nil, spec, status)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil)
assert.True(t, changed)
require.Len(t, newPlan, 3)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -212,7 +234,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
}
spec.DBServers.XCount = util.NewInt(1)
spec.Coordinators.XCount = util.NewInt(1)
newPlan, changed = createPlan(log, nil, spec, status)
newPlan, changed = createPlan(log, nil, spec, status, nil)
assert.True(t, changed)
require.Len(t, newPlan, 5) // Note: Downscaling is done 1 at a time
assert.Equal(t, api.ActionTypeCleanOutMember, newPlan[0].Type)

View file

@ -29,7 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/rs/zerolog"
)
// executePlan tries to execute the plan as far as possible.
@ -37,21 +37,29 @@ import (
// False otherwise.
func (d *Deployment) executePlan(ctx context.Context) (bool, error) {
log := d.deps.Log
initialPlanLen := len(d.status.Plan)
for {
if len(d.status.Plan) == 0 {
// No plan exists, nothing to be done
return false, nil
return initialPlanLen > 0, nil
}
// Take first action
action := d.status.Plan[0]
if action.StartTime.IsZero() {
planAction := d.status.Plan[0]
log := log.With().
Int("plan-len", len(d.status.Plan)).
Str("action-id", planAction.ID).
Str("action-type", string(planAction.Type)).
Str("group", planAction.Group.AsRole()).
Str("member-id", planAction.MemberID).
Logger()
action := d.createAction(ctx, log, planAction)
if planAction.StartTime.IsZero() {
// Not started yet
ready, err := d.startAction(ctx, action)
ready, err := action.Start(ctx)
if err != nil {
log.Debug().Err(err).
Str("action-type", string(action.Type)).
Msg("Failed to start action")
return false, maskAny(err)
}
@ -64,9 +72,11 @@ func (d *Deployment) executePlan(ctx context.Context) (bool, error) {
d.status.Plan[0].StartTime = &now
}
// Save plan update
if err := d.updateCRStatus(); err != nil {
if err := d.updateCRStatus(true); err != nil {
log.Debug().Err(err).Msg("Failed to update CR status")
return false, maskAny(err)
}
log.Debug().Bool("ready", ready).Msg("Action Start completed")
if !ready {
// We need to check back soon
return true, nil
@ -74,23 +84,25 @@ func (d *Deployment) executePlan(ctx context.Context) (bool, error) {
// Continue with next action
} else {
// First action of plan has been started, check its progress
ready, err := d.checkActionProgress(ctx, action)
ready, err := action.CheckProgress(ctx)
if err != nil {
log.Debug().Err(err).
Str("action-type", string(action.Type)).
Msg("Failed to check action progress")
log.Debug().Err(err).Msg("Failed to check action progress")
return false, maskAny(err)
}
if ready {
// Remove action from list
d.status.Plan = d.status.Plan[1:]
// Save plan update
if err := d.updateCRStatus(); err != nil {
log.Debug().Err(err).Msg("Failed to update CR status")
return false, maskAny(err)
}
}
log.Debug().Bool("ready", ready).Msg("Action CheckProgress completed")
if !ready {
// Not ready check, come back soon
return true, nil
}
// Remove action from list
d.status.Plan = d.status.Plan[1:]
// Save plan update
if err := d.updateCRStatus(); err != nil {
return false, maskAny(err)
}
// Continue with next action
}
}
@ -99,152 +111,24 @@ func (d *Deployment) executePlan(ctx context.Context) (bool, error) {
// startAction performs the start of the given action
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (d *Deployment) startAction(ctx context.Context, action api.Action) (bool, error) {
log := d.deps.Log
ns := d.apiObject.GetNamespace()
func (d *Deployment) createAction(ctx context.Context, log zerolog.Logger, action api.Action) Action {
actionCtx := NewActionContext(log, d)
switch action.Type {
case api.ActionTypeAddMember:
if err := d.createMember(action.Group, d.apiObject); err != nil {
log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to create member")
return false, maskAny(err)
}
// Save added member
if err := d.updateCRStatus(); err != nil {
return false, maskAny(err)
}
return true, nil
return NewAddMemberAction(log, action, actionCtx)
case api.ActionTypeRemoveMember:
m, _, ok := d.status.Members.ElementByID(action.MemberID)
if !ok {
// We wanted to remove and it is already gone. All ok
return true, nil
}
// Remove the pod (if any)
if err := d.deps.KubeCli.Core().Pods(ns).Delete(m.PodName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) {
log.Debug().Err(err).Str("pod", m.PodName).Msg("Failed to remove pod")
return false, maskAny(err)
}
// Remove the pvc (if any)
if m.PersistentVolumeClaimName != "" {
if err := d.deps.KubeCli.Core().PersistentVolumeClaims(ns).Delete(m.PersistentVolumeClaimName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) {
log.Debug().Err(err).Str("pod", m.PodName).Msg("Failed to remove pvc")
return false, maskAny(err)
}
}
// Remove member
if err := d.status.Members.RemoveByID(action.MemberID, action.Group); err != nil {
log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to remove member")
return false, maskAny(err)
}
// Save removed member
if err := d.updateCRStatus(); err != nil {
return false, maskAny(err)
}
return true, nil
return NewRemoveMemberAction(log, action, actionCtx)
case api.ActionTypeCleanOutMember:
m, ok := d.status.Members.DBServers.ElementByID(action.MemberID)
if !ok {
log.Error().Str("group", action.Group.AsRole()).Str("id", action.MemberID).Msg("No such member")
return true, nil
}
c, err := d.clientCache.GetDatabase(ctx)
if err != nil {
log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to create member client")
return false, maskAny(err)
}
cluster, err := c.Cluster(ctx)
if err != nil {
log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to access cluster")
return false, maskAny(err)
}
if err := cluster.CleanOutServer(ctx, action.MemberID); err != nil {
log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to cleanout member")
return false, maskAny(err)
}
// Update status
m.State = api.MemberStateCleanOut
if err := d.updateCRStatus(); err != nil {
return false, maskAny(err)
}
return true, nil
return NewCleanOutMemberAction(log, action, actionCtx)
case api.ActionTypeShutdownMember:
m, _, ok := d.status.Members.ElementByID(action.MemberID)
if !ok {
log.Error().Str("group", action.Group.AsRole()).Str("id", action.MemberID).Msg("No such member")
return true, nil
}
if action.Group.IsArangod() {
// Invoke shutdown endpoint
c, err := d.clientCache.Get(ctx, action.Group, action.MemberID)
if err != nil {
log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to create member client")
return false, maskAny(err)
}
if err := c.Shutdown(ctx, true); err != nil {
log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to shutdown member")
return false, maskAny(err)
}
} else if action.Group.IsArangosync() {
// Terminate pod
if err := d.deps.KubeCli.Core().Pods(ns).Delete(m.PodName, &metav1.DeleteOptions{}); err != nil && !k8sutil.IsNotFound(err) {
log.Debug().Err(err).Str("pod", m.PodName).Msg("Failed to remove pod")
return false, maskAny(err)
}
}
// Update status
m.State = api.MemberStateShuttingDown
if err := d.updateCRStatus(); err != nil {
return false, maskAny(err)
}
return true, nil
return NewShutdownMemberAction(log, action, actionCtx)
case api.ActionTypeRotateMember:
return NewRotateMemberAction(log, action, actionCtx)
case api.ActionTypeUpgradeMember:
return NewUpgradeMemberAction(log, action, actionCtx)
case api.ActionTypeWaitForMemberUp:
return NewWaitForMemberUpAction(log, action, actionCtx)
default:
return false, maskAny(fmt.Errorf("Unknown action type"))
}
}
// checkActionProgress checks the progress of the given action.
// Returns true if the action is completely finished, false otherwise.
func (d *Deployment) checkActionProgress(ctx context.Context, action api.Action) (bool, error) {
switch action.Type {
case api.ActionTypeAddMember:
// Nothing todo
return true, nil
case api.ActionTypeRemoveMember:
// Nothing todo
return true, nil
case api.ActionTypeCleanOutMember:
c, err := d.clientCache.GetDatabase(ctx)
if err != nil {
return false, maskAny(err)
}
cluster, err := c.Cluster(ctx)
if err != nil {
return false, maskAny(err)
}
cleanedOut, err := cluster.IsCleanedOut(ctx, action.MemberID)
if err != nil {
return false, maskAny(err)
}
if !cleanedOut {
// We're not done yet
return false, nil
}
// Cleanout completed
return true, nil
case api.ActionTypeShutdownMember:
m, _, ok := d.status.Members.ElementByID(action.MemberID)
if !ok {
// Member not long exists
return true, nil
}
if m.Conditions.IsTrue(api.ConditionTypeTerminated) {
// Shutdown completed
return true, nil
}
// Member still not shutdown, retry soon
return false, nil
default:
return false, maskAny(fmt.Errorf("Unknown action type"))
panic(fmt.Sprintf("Unknown action type '%s'", action.Type))
}
}

View file

@ -23,6 +23,8 @@
package deployment
import (
"crypto/sha1"
"encoding/json"
"fmt"
"net"
"path/filepath"
@ -55,8 +57,10 @@ func (o optionPair) CompareTo(other optionPair) int {
}
// createArangodArgs creates command line arguments for an arangod server in the given group.
func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup, svrSpec api.ServerGroupSpec, agents api.MemberStatusList, id string) []string {
func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, group api.ServerGroup,
agents api.MemberStatusList, id string, autoUpgrade bool) []string {
options := make([]optionPair, 0, 64)
svrSpec := deplSpec.GetServerGroupSpec(group)
// Endpoint
listenAddr := "[::]"
@ -123,6 +127,14 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro
optionPair{"--database.directory", k8sutil.ArangodVolumeMountDir},
optionPair{"--log.output", "+"},
)
// Auto upgrade?
if autoUpgrade {
options = append(options,
optionPair{"--database.auto-upgrade", "true"},
)
}
/* if config.ServerThreads != 0 {
options = append(options,
optionPair{"--server.threads", strconv.Itoa(config.ServerThreads)})
@ -136,7 +148,7 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro
switch group {
case api.ServerGroupAgents:
options = append(options,
optionPair{"--cluster.my-id", id},
optionPair{"--agency.disaster-recovery-id", id},
optionPair{"--agency.activate", "true"},
optionPair{"--agency.my-address", myTCPURL},
optionPair{"--agency.size", strconv.Itoa(deplSpec.Agents.GetCount())},
@ -152,15 +164,9 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro
)
}
}
/*if agentRecoveryID != "" {
options = append(options,
optionPair{"--agency.disaster-recovery-id", agentRecoveryID},
)
}*/
case api.ServerGroupDBServers:
addAgentEndpoints = true
options = append(options,
optionPair{"--cluster.my-id", id},
optionPair{"--cluster.my-address", myTCPURL},
optionPair{"--cluster.my-role", "PRIMARY"},
optionPair{"--foxx.queues", "false"},
@ -169,7 +175,6 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro
case api.ServerGroupCoordinators:
addAgentEndpoints = true
options = append(options,
optionPair{"--cluster.my-id", id},
optionPair{"--cluster.my-address", myTCPURL},
optionPair{"--cluster.my-role", "COORDINATOR"},
optionPair{"--foxx.queues", "true"},
@ -184,7 +189,6 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro
addAgentEndpoints = true
options = append(options,
optionPair{"--replication.automatic-failover", "true"},
optionPair{"--cluster.my-id", id},
optionPair{"--cluster.my-address", myTCPURL},
optionPair{"--cluster.my-role", "SINGLE"},
)
@ -310,8 +314,13 @@ func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error {
if m.State != api.MemberStateNone {
continue
}
// Create pod
// Update pod name
role := group.AsRole()
roleAbbr := group.AsRoleAbbreviated()
podSuffix := createPodSuffix(apiObject.Spec)
m.PodName = k8sutil.CreatePodName(apiObject.GetName(), roleAbbr, m.ID, podSuffix)
newState := api.MemberStateCreated
// Create pod
if group.IsArangod() {
// Find image ID
info, found := apiObject.Status.Images.GetByImage(apiObject.Spec.GetImage())
@ -320,7 +329,11 @@ func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error {
return nil
}
// Prepare arguments
args := createArangodArgs(apiObject, apiObject.Spec, group, spec, d.status.Members.Agents, m.ID)
autoUpgrade := m.Conditions.IsTrue(api.ConditionTypeAutoUpgrade)
if autoUpgrade {
newState = api.MemberStateUpgrading
}
args := createArangodArgs(apiObject, apiObject.Spec, group, d.status.Members.Agents, m.ID, autoUpgrade)
env := make(map[string]k8sutil.EnvValue)
livenessProbe, err := d.createLivenessProbe(apiObject, group)
if err != nil {
@ -355,7 +368,7 @@ func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error {
SecretKey: constants.SecretKeyJWT,
}
}
if err := k8sutil.CreateArangodPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, m.PersistentVolumeClaimName, info.ImageID, apiObject.Spec.GetImagePullPolicy(), args, env, livenessProbe, readinessProbe, tlsKeyfileSecretName, rocksdbEncryptionSecretName); err != nil {
if err := k8sutil.CreateArangodPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, m.PodName, m.PersistentVolumeClaimName, info.ImageID, apiObject.Spec.GetImagePullPolicy(), args, env, livenessProbe, readinessProbe, tlsKeyfileSecretName, rocksdbEncryptionSecretName); err != nil {
return maskAny(err)
}
} else if group.IsArangosync() {
@ -376,12 +389,15 @@ func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error {
if group == api.ServerGroupSyncWorkers {
affinityWithRole = api.ServerGroupDBServers.AsRole()
}
if err := k8sutil.CreateArangoSyncPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, info.ImageID, apiObject.Spec.Sync.GetImagePullPolicy(), args, env, livenessProbe, affinityWithRole); err != nil {
if err := k8sutil.CreateArangoSyncPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, m.PodName, info.ImageID, apiObject.Spec.Sync.GetImagePullPolicy(), args, env, livenessProbe, affinityWithRole); err != nil {
return maskAny(err)
}
}
// Record new member state
m.State = api.MemberStateCreated
m.State = newState
m.Conditions.Remove(api.ConditionTypeReady)
m.Conditions.Remove(api.ConditionTypeTerminated)
m.Conditions.Remove(api.ConditionTypeAutoUpgrade)
if err := status.Update(m); err != nil {
return maskAny(err)
}
@ -397,3 +413,9 @@ func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error {
}
return nil
}
func createPodSuffix(spec api.DeploymentSpec) string {
raw, _ := json.Marshal(spec)
hash := sha1.Sum(raw)
return fmt.Sprintf("%0x", hash)[:6]
}

View file

@ -51,16 +51,60 @@ func TestCreateArangodArgsAgent(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, apiObject.Spec.Agents, agents, "a1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", false)
assert.Equal(t,
[]string{
"--agency.activate=true",
"--agency.disaster-recovery-id=a1",
"--agency.endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--agency.endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--agency.my-address=ssl://name-agent-a1.name-int.ns.svc:8529",
"--agency.size=3",
"--agency.supervision=true",
"--cluster.my-id=a1",
"--database.directory=/data",
"--foxx.queues=false",
"--log.level=INFO",
"--log.output=+",
"--server.authentication=true",
"--server.endpoint=ssl://[::]:8529",
"--server.jwt-secret=$(ARANGOD_JWT_SECRET)",
"--server.statistics=false",
"--server.storage-engine=rocksdb",
"--ssl.ecdh-curve=",
"--ssl.keyfile=/secrets/tls/tls.keyfile",
},
cmdline,
)
}
// Default+AutoUpgrade deployment
{
apiObject := &api.ArangoDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "ns",
},
Spec: api.DeploymentSpec{
Mode: api.DeploymentModeCluster,
},
}
apiObject.Spec.SetDefaults("test")
agents := api.MemberStatusList{
api.MemberStatus{ID: "a1"},
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", true)
assert.Equal(t,
[]string{
"--agency.activate=true",
"--agency.disaster-recovery-id=a1",
"--agency.endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--agency.endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--agency.my-address=ssl://name-agent-a1.name-int.ns.svc:8529",
"--agency.size=3",
"--agency.supervision=true",
"--database.auto-upgrade=true",
"--database.directory=/data",
"--foxx.queues=false",
"--log.level=INFO",
@ -97,16 +141,16 @@ func TestCreateArangodArgsAgent(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, apiObject.Spec.Agents, agents, "a1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", false)
assert.Equal(t,
[]string{
"--agency.activate=true",
"--agency.disaster-recovery-id=a1",
"--agency.endpoint=tcp://name-agent-a2.name-int.ns.svc:8529",
"--agency.endpoint=tcp://name-agent-a3.name-int.ns.svc:8529",
"--agency.my-address=tcp://name-agent-a1.name-int.ns.svc:8529",
"--agency.size=3",
"--agency.supervision=true",
"--cluster.my-id=a1",
"--database.directory=/data",
"--foxx.queues=false",
"--log.level=INFO",
@ -140,16 +184,16 @@ func TestCreateArangodArgsAgent(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, apiObject.Spec.Agents, agents, "a1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", false)
assert.Equal(t,
[]string{
"--agency.activate=true",
"--agency.disaster-recovery-id=a1",
"--agency.endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--agency.endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--agency.my-address=ssl://name-agent-a1.name-int.ns.svc:8529",
"--agency.size=3",
"--agency.supervision=true",
"--cluster.my-id=a1",
"--database.directory=/data",
"--foxx.queues=false",
"--log.level=INFO",
@ -183,16 +227,16 @@ func TestCreateArangodArgsAgent(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, apiObject.Spec.Agents, agents, "a1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupAgents, agents, "a1", false)
assert.Equal(t,
[]string{
"--agency.activate=true",
"--agency.disaster-recovery-id=a1",
"--agency.endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--agency.endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--agency.my-address=ssl://name-agent-a1.name-int.ns.svc:8529",
"--agency.size=3",
"--agency.supervision=true",
"--cluster.my-id=a1",
"--database.directory=/data",
"--foxx.queues=false",
"--log.level=INFO",

View file

@ -51,14 +51,13 @@ func TestCreateArangodArgsCoordinator(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, apiObject.Spec.Coordinators, agents, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", false)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=ssl://name-coordinator-id1.name-int.ns.svc:8529",
"--cluster.my-id=id1",
"--cluster.my-role=COORDINATOR",
"--database.directory=/data",
"--foxx.queues=true",
@ -76,6 +75,48 @@ func TestCreateArangodArgsCoordinator(t *testing.T) {
)
}
// Default+AutoUpgrade deployment
{
apiObject := &api.ArangoDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "ns",
},
Spec: api.DeploymentSpec{
Mode: api.DeploymentModeCluster,
},
}
apiObject.Spec.SetDefaults("test")
agents := api.MemberStatusList{
api.MemberStatus{ID: "a1"},
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", true)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=ssl://name-coordinator-id1.name-int.ns.svc:8529",
"--cluster.my-role=COORDINATOR",
"--database.auto-upgrade=true",
"--database.directory=/data",
"--foxx.queues=true",
"--log.level=INFO",
"--log.output=+",
"--server.authentication=true",
"--server.endpoint=ssl://[::]:8529",
"--server.jwt-secret=$(ARANGOD_JWT_SECRET)",
"--server.statistics=true",
"--server.storage-engine=rocksdb",
"--ssl.ecdh-curve=",
"--ssl.keyfile=/secrets/tls/tls.keyfile",
},
cmdline,
)
}
// Default+TLS disabled deployment
{
apiObject := &api.ArangoDeployment{
@ -96,14 +137,13 @@ func TestCreateArangodArgsCoordinator(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, apiObject.Spec.Coordinators, agents, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", false)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=tcp://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=tcp://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=tcp://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=tcp://name-coordinator-id1.name-int.ns.svc:8529",
"--cluster.my-id=id1",
"--cluster.my-role=COORDINATOR",
"--database.directory=/data",
"--foxx.queues=true",
@ -137,14 +177,13 @@ func TestCreateArangodArgsCoordinator(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, apiObject.Spec.Coordinators, agents, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", false)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=ssl://name-coordinator-id1.name-int.ns.svc:8529",
"--cluster.my-id=id1",
"--cluster.my-role=COORDINATOR",
"--database.directory=/data",
"--foxx.queues=true",
@ -180,14 +219,13 @@ func TestCreateArangodArgsCoordinator(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, apiObject.Spec.Coordinators, agents, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupCoordinators, agents, "id1", false)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=ssl://name-coordinator-id1.name-int.ns.svc:8529",
"--cluster.my-id=id1",
"--cluster.my-role=COORDINATOR",
"--database.directory=/data",
"--foxx.queues=true",

View file

@ -51,14 +51,13 @@ func TestCreateArangodArgsDBServer(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, apiObject.Spec.DBServers, agents, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", false)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=ssl://name-dbserver-id1.name-int.ns.svc:8529",
"--cluster.my-id=id1",
"--cluster.my-role=PRIMARY",
"--database.directory=/data",
"--foxx.queues=false",
@ -76,6 +75,48 @@ func TestCreateArangodArgsDBServer(t *testing.T) {
)
}
// Default+AutoUpgrade deployment
{
apiObject := &api.ArangoDeployment{
ObjectMeta: metav1.ObjectMeta{
Name: "name",
Namespace: "ns",
},
Spec: api.DeploymentSpec{
Mode: api.DeploymentModeCluster,
},
}
apiObject.Spec.SetDefaults("test")
agents := api.MemberStatusList{
api.MemberStatus{ID: "a1"},
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", true)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=ssl://name-dbserver-id1.name-int.ns.svc:8529",
"--cluster.my-role=PRIMARY",
"--database.auto-upgrade=true",
"--database.directory=/data",
"--foxx.queues=false",
"--log.level=INFO",
"--log.output=+",
"--server.authentication=true",
"--server.endpoint=ssl://[::]:8529",
"--server.jwt-secret=$(ARANGOD_JWT_SECRET)",
"--server.statistics=true",
"--server.storage-engine=rocksdb",
"--ssl.ecdh-curve=",
"--ssl.keyfile=/secrets/tls/tls.keyfile",
},
cmdline,
)
}
// Default+TLS disabled deployment
{
apiObject := &api.ArangoDeployment{
@ -96,14 +137,13 @@ func TestCreateArangodArgsDBServer(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, apiObject.Spec.DBServers, agents, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", false)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=tcp://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=tcp://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=tcp://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=tcp://name-dbserver-id1.name-int.ns.svc:8529",
"--cluster.my-id=id1",
"--cluster.my-role=PRIMARY",
"--database.directory=/data",
"--foxx.queues=false",
@ -137,14 +177,13 @@ func TestCreateArangodArgsDBServer(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, apiObject.Spec.DBServers, agents, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", false)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=ssl://name-dbserver-id1.name-int.ns.svc:8529",
"--cluster.my-id=id1",
"--cluster.my-role=PRIMARY",
"--database.directory=/data",
"--foxx.queues=false",
@ -180,14 +219,13 @@ func TestCreateArangodArgsDBServer(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, apiObject.Spec.DBServers, agents, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupDBServers, agents, "id1", false)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=ssl://name-dbserver-id1.name-int.ns.svc:8529",
"--cluster.my-id=id1",
"--cluster.my-role=PRIMARY",
"--database.directory=/data",
"--foxx.queues=false",

View file

@ -42,7 +42,7 @@ func TestCreateArangodArgsSingle(t *testing.T) {
},
}
apiObject.Spec.SetDefaults("test")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false)
assert.Equal(t,
[]string{
"--database.directory=/data",
@ -61,6 +61,34 @@ func TestCreateArangodArgsSingle(t *testing.T) {
)
}
// Default+AutoUpgrade deployment
{
apiObject := &api.ArangoDeployment{
Spec: api.DeploymentSpec{
Mode: api.DeploymentModeSingle,
},
}
apiObject.Spec.SetDefaults("test")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", true)
assert.Equal(t,
[]string{
"--database.auto-upgrade=true",
"--database.directory=/data",
"--foxx.queues=true",
"--log.level=INFO",
"--log.output=+",
"--server.authentication=true",
"--server.endpoint=ssl://[::]:8529",
"--server.jwt-secret=$(ARANGOD_JWT_SECRET)",
"--server.statistics=true",
"--server.storage-engine=rocksdb",
"--ssl.ecdh-curve=",
"--ssl.keyfile=/secrets/tls/tls.keyfile",
},
cmdline,
)
}
// Default+TLS disabled deployment
{
apiObject := &api.ArangoDeployment{
@ -72,7 +100,7 @@ func TestCreateArangodArgsSingle(t *testing.T) {
},
}
apiObject.Spec.SetDefaults("test")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false)
assert.Equal(t,
[]string{
"--database.directory=/data",
@ -98,7 +126,7 @@ func TestCreateArangodArgsSingle(t *testing.T) {
},
}
apiObject.Spec.SetDefaults("test")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false)
assert.Equal(t,
[]string{
"--database.directory=/data",
@ -126,7 +154,7 @@ func TestCreateArangodArgsSingle(t *testing.T) {
}
apiObject.Spec.Authentication.XJWTSecretName = util.NewString("None")
apiObject.Spec.SetDefaults("test")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false)
assert.Equal(t,
[]string{
"--database.directory=/data",
@ -153,7 +181,7 @@ func TestCreateArangodArgsSingle(t *testing.T) {
}
apiObject.Spec.Single.Args = []string{"--foo1", "--foo2"}
apiObject.Spec.SetDefaults("test")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, nil, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, nil, "id1", false)
assert.Equal(t,
[]string{
"--database.directory=/data",
@ -191,14 +219,13 @@ func TestCreateArangodArgsSingle(t *testing.T) {
api.MemberStatus{ID: "a2"},
api.MemberStatus{ID: "a3"},
}
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, apiObject.Spec.Single, agents, "id1")
cmdline := createArangodArgs(apiObject, apiObject.Spec, api.ServerGroupSingle, agents, "id1", false)
assert.Equal(t,
[]string{
"--cluster.agency-endpoint=ssl://name-agent-a1.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a2.name-int.ns.svc:8529",
"--cluster.agency-endpoint=ssl://name-agent-a3.name-int.ns.svc:8529",
"--cluster.my-address=ssl://name-single-id1.name-int.ns.svc:8529",
"--cluster.my-id=id1",
"--cluster.my-role=SINGLE",
"--database.directory=/data",
"--foxx.queues=true",

View file

@ -53,6 +53,10 @@ func (d *Deployment) inspectPods() error {
log.Debug().Str("pod", p.GetName()).Msg("pod not owned by this deployment")
continue
}
if isArangoDBImageIDAndVersionPod(p) {
// Image ID pods are not relevant to inspect here
continue
}
// Pod belongs to this deployment, update metric
inspectedPodCounter.Inc()
@ -113,7 +117,7 @@ func (d *Deployment) inspectPods() error {
switch m.State {
case api.MemberStateNone:
// Do nothing
case api.MemberStateShuttingDown:
case api.MemberStateShuttingDown, api.MemberStateRotating, api.MemberStateUpgrading:
// Shutdown was intended, so not need to do anything here.
// Just mark terminated
if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Terminated", "") {

View file

@ -35,7 +35,8 @@ var (
// The defaultLevels list is used during development to increase the
// default level for components that we care a little less about.
defaultLevels = map[string]string{
//"something.status": "info",
"operator": "info",
//"something.status": "info",
}
)

View file

@ -31,7 +31,7 @@ import (
// waitForCRD waits for the CustomResourceDefinition (created externally)
// to be ready.
func (o *Operator) waitForCRD(enableDeployment, enableStorage bool) error {
log := o.Dependencies.Log
log := o.log
if enableDeployment {
log.Debug().Msg("Waiting for ArangoDeployment CRD to be ready")

View file

@ -37,6 +37,7 @@ import (
lsapi "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/storage"
)
@ -54,6 +55,7 @@ type Operator struct {
Config
Dependencies
log zerolog.Logger
deployments map[string]*deployment.Deployment
localStorages map[string]*storage.LocalStorage
}
@ -69,7 +71,7 @@ type Config struct {
}
type Dependencies struct {
Log zerolog.Logger
LogService logging.Service
KubeCli kubernetes.Interface
KubeExtCli apiextensionsclient.Interface
CRCli versioned.Interface
@ -81,6 +83,7 @@ func NewOperator(config Config, deps Dependencies) (*Operator, error) {
o := &Operator{
Config: config,
Dependencies: deps,
log: deps.LogService.MustGetLogger("operator"),
deployments: make(map[string]*deployment.Deployment),
localStorages: make(map[string]*storage.LocalStorage),
}

View file

@ -63,9 +63,8 @@ func (o *Operator) runDeployments(stop <-chan struct{}) {
// onAddArangoDeployment deployment addition callback
func (o *Operator) onAddArangoDeployment(obj interface{}) {
log := o.Dependencies.Log
apiObject := obj.(*api.ArangoDeployment)
log.Debug().
o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()).
Msg("ArangoDeployment added")
o.syncArangoDeployment(apiObject)
@ -73,9 +72,8 @@ func (o *Operator) onAddArangoDeployment(obj interface{}) {
// onUpdateArangoDeployment deployment update callback
func (o *Operator) onUpdateArangoDeployment(oldObj, newObj interface{}) {
log := o.Dependencies.Log
apiObject := newObj.(*api.ArangoDeployment)
log.Debug().
o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()).
Msg("ArangoDeployment updated")
o.syncArangoDeployment(apiObject)
@ -83,7 +81,7 @@ func (o *Operator) onUpdateArangoDeployment(oldObj, newObj interface{}) {
// onDeleteArangoDeployment deployment delete callback
func (o *Operator) onDeleteArangoDeployment(obj interface{}) {
log := o.Dependencies.Log
log := o.log
apiObject, ok := obj.(*api.ArangoDeployment)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@ -129,7 +127,7 @@ func (o *Operator) syncArangoDeployment(apiObject *api.ArangoDeployment) {
//pt.start()
err := o.handleDeploymentEvent(ev)
if err != nil {
o.Dependencies.Log.Warn().Err(err).Msg("Failed to handle event")
o.log.Warn().Err(err).Msg("Failed to handle event")
}
//pt.stop()
}
@ -197,8 +195,7 @@ func (o *Operator) makeDeploymentConfigAndDeps(apiObject *api.ArangoDeployment)
ServiceAccount: o.Config.ServiceAccount,
}
deps := deployment.Dependencies{
Log: o.Dependencies.Log.With().
Str("component", "deployment").
Log: o.Dependencies.LogService.MustGetLogger("deployment").With().
Str("deployment", apiObject.GetName()).
Logger(),
KubeCli: o.Dependencies.KubeCli,

View file

@ -32,7 +32,7 @@ import (
func (o *Operator) runLeaderElection(lockName string, onStart func(stop <-chan struct{})) {
namespace := o.Config.Namespace
kubecli := o.Dependencies.KubeCli
log := o.Dependencies.Log.With().Str("lock-name", lockName).Logger()
log := o.log.With().Str("lock-name", lockName).Logger()
rl, err := resourcelock.New(resourcelock.EndpointsResourceLock,
namespace,
lockName,

View file

@ -63,9 +63,8 @@ func (o *Operator) runLocalStorages(stop <-chan struct{}) {
// onAddArangoLocalStorage local storage addition callback
func (o *Operator) onAddArangoLocalStorage(obj interface{}) {
log := o.Dependencies.Log
apiObject := obj.(*api.ArangoLocalStorage)
log.Debug().
o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()).
Msg("ArangoLocalStorage added")
o.syncArangoLocalStorage(apiObject)
@ -73,9 +72,8 @@ func (o *Operator) onAddArangoLocalStorage(obj interface{}) {
// onUpdateArangoLocalStorage local storage update callback
func (o *Operator) onUpdateArangoLocalStorage(oldObj, newObj interface{}) {
log := o.Dependencies.Log
apiObject := newObj.(*api.ArangoLocalStorage)
log.Debug().
o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()).
Msg("ArangoLocalStorage updated")
o.syncArangoLocalStorage(apiObject)
@ -83,7 +81,7 @@ func (o *Operator) onUpdateArangoLocalStorage(oldObj, newObj interface{}) {
// onDeleteArangoLocalStorage local storage delete callback
func (o *Operator) onDeleteArangoLocalStorage(obj interface{}) {
log := o.Dependencies.Log
log := o.log
apiObject, ok := obj.(*api.ArangoLocalStorage)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
@ -129,7 +127,7 @@ func (o *Operator) syncArangoLocalStorage(apiObject *api.ArangoLocalStorage) {
//pt.start()
err := o.handleLocalStorageEvent(ev)
if err != nil {
o.Dependencies.Log.Warn().Err(err).Msg("Failed to handle event")
o.log.Warn().Err(err).Msg("Failed to handle event")
}
//pt.stop()
}
@ -199,8 +197,7 @@ func (o *Operator) makeLocalStorageConfigAndDeps(apiObject *api.ArangoLocalStora
ServiceAccount: o.Config.ServiceAccount,
}
deps := storage.Dependencies{
Log: o.Dependencies.Log.With().
Str("component", "storage").
Log: o.Dependencies.LogService.MustGetLogger("storage").With().
Str("localStorage", apiObject.GetName()).
Logger(),
KubeCli: o.Dependencies.KubeCli,

147
pkg/util/arangod/agency.go Normal file
View file

@ -0,0 +1,147 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package arangod
import (
"context"
"encoding/json"
"fmt"
"strings"
driver "github.com/arangodb/go-driver"
"github.com/pkg/errors"
)
// Agency provides API implemented by the ArangoDB agency.
type Agency interface {
// ReadKey reads the value of a given key in the agency.
ReadKey(ctx context.Context, key []string, value interface{}) error
// Endpoint returns the endpoint of this agent connection
Endpoint() string
}
// NewAgencyClient creates a new Agency connection from the given client
// connection.
// The number of endpoints of the client must be exactly 1.
func NewAgencyClient(c driver.Client) (Agency, error) {
if len(c.Connection().Endpoints()) > 1 {
return nil, maskAny(fmt.Errorf("Got multiple endpoints"))
}
return &agency{
conn: c.Connection(),
}, nil
}
type agency struct {
conn driver.Connection
}
// ReadKey reads the value of a given key in the agency.
func (a *agency) ReadKey(ctx context.Context, key []string, value interface{}) error {
conn := a.conn
req, err := conn.NewRequest("POST", "_api/agency/read")
if err != nil {
return maskAny(err)
}
fullKey := createFullKey(key)
input := [][]string{{fullKey}}
req, err = req.SetBody(input)
if err != nil {
return maskAny(err)
}
//var raw []byte
//ctx = driver.WithRawResponse(ctx, &raw)
resp, err := conn.Do(ctx, req)
if err != nil {
fmt.Printf("conn.Do failed, err=%v, resp=%#v\n", err, resp)
return maskAny(err)
}
if resp.StatusCode() == 307 {
// Not leader
location := resp.Header("Location")
return NotLeaderError{Leader: location}
}
if err := resp.CheckStatus(200, 201, 202); err != nil {
return maskAny(err)
}
//fmt.Printf("Agent response: %s\n", string(raw))
elems, err := resp.ParseArrayBody()
if err != nil {
return maskAny(err)
}
if len(elems) != 1 {
return maskAny(fmt.Errorf("Expected 1 element, got %d", len(elems)))
}
// If empty key parse directly
if len(key) == 0 {
if err := elems[0].ParseBody("", &value); err != nil {
return maskAny(err)
}
} else {
// Now remove all wrapping objects for each key element
var rawObject map[string]interface{}
if err := elems[0].ParseBody("", &rawObject); err != nil {
return maskAny(err)
}
var rawMsg interface{}
for keyIndex := 0; keyIndex < len(key); keyIndex++ {
if keyIndex > 0 {
var ok bool
rawObject, ok = rawMsg.(map[string]interface{})
if !ok {
return maskAny(fmt.Errorf("Data is not an object at key %s", key[:keyIndex+1]))
}
}
var found bool
rawMsg, found = rawObject[key[keyIndex]]
if !found {
return errors.Wrapf(KeyNotFoundError, "Missing data at key %s", key[:keyIndex+1])
}
}
// Encode to json ...
encoded, err := json.Marshal(rawMsg)
if err != nil {
return maskAny(err)
}
// and decode back into result
if err := json.Unmarshal(encoded, &value); err != nil {
return maskAny(err)
}
}
// fmt.Printf("result as JSON: %s\n", rawResult)
return nil
}
// Endpoint returns the endpoint of this agent connection
func (a *agency) Endpoint() string {
ep := a.conn.Endpoints()
if len(ep) == 0 {
return ""
}
return ep[0]
}
func createFullKey(key []string) string {
return "/" + strings.Join(key, "/")
}

View file

@ -129,8 +129,9 @@ func createArangodClientForDNSName(ctx context.Context, cli corev1.CoreV1Interfa
transport = sharedHTTPSTransport
}
connConfig := http.ConnectionConfig{
Endpoints: []string{scheme + "://" + net.JoinHostPort(dnsName, strconv.Itoa(k8sutil.ArangoPort))},
Transport: transport,
Endpoints: []string{scheme + "://" + net.JoinHostPort(dnsName, strconv.Itoa(k8sutil.ArangoPort))},
Transport: transport,
DontFollowRedirect: true,
}
// TODO deal with TLS with proper CA checking
conn, err := http.NewConnection(connConfig)

View file

@ -0,0 +1,42 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package arangod
import "net/url"
// IsSameEndpoint returns true when the 2 given endpoints
// refer to the same server.
func IsSameEndpoint(a, b string) bool {
if a == b {
return true
}
ua, err := url.Parse(a)
if err != nil {
return false
}
ub, err := url.Parse(b)
if err != nil {
return false
}
return ua.Hostname() == ub.Hostname()
}

View file

@ -25,5 +25,32 @@ package arangod
import "github.com/pkg/errors"
var (
KeyNotFoundError = errors.New("Key not found")
maskAny = errors.WithStack
)
// IsKeyNotFound returns true if the given error is (or is caused by) a KeyNotFoundError.
func IsKeyNotFound(err error) bool {
return errors.Cause(err) == KeyNotFoundError
}
// NotLeaderError indicates the response of an agent when it is
// not the leader of the agency.
type NotLeaderError struct {
Leader string // Endpoint of the current leader
}
// Error implements error.
func (e NotLeaderError) Error() string {
return "not the leader"
}
// IsNotLeader returns true if the given error is (or is caused by) a NotLeaderError.
func IsNotLeader(err error) (string, bool) {
nlErr, ok := err.(NotLeaderError)
if ok {
return nlErr.Leader, true
}
return "", false
}

View file

@ -29,7 +29,7 @@ import (
// CreatePodDNSName returns the DNS of a pod with a given role & id in
// a given deployment.
func CreatePodDNSName(deployment metav1.Object, role, id string) string {
return CreatePodName(deployment.GetName(), role, id) + "." +
return CreatePodHostName(deployment.GetName(), role, id) + "." +
CreateHeadlessServiceName(deployment.GetName()) + "." +
deployment.GetNamespace() + ".svc"
}

View file

@ -25,10 +25,12 @@ package k8sutil
import (
"fmt"
"regexp"
"strings"
)
var (
resourceNameRE = regexp.MustCompile(`^([0-9\-\.a-z])+$`)
resourceNameRE = regexp.MustCompile(`^([0-9\-\.a-z])+$`)
arangodPrefixes = []string{"CRDN-", "PRMR-", "AGNT-"}
)
// ValidateOptionalResourceName validates a kubernetes resource name.
@ -55,3 +57,13 @@ func ValidateResourceName(name string) error {
}
return maskAny(fmt.Errorf("Name '%s' is not a valid resource name", name))
}
// stripArangodPrefix removes well know arangod ID prefixes from the given id.
func stripArangodPrefix(id string) string {
for _, prefix := range arangodPrefixes {
if strings.HasPrefix(id, prefix) {
return id[len(prefix):]
}
}
return id
}

View file

@ -23,12 +23,16 @@
package k8sutil
import (
"fmt"
"path/filepath"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
const (
alpineImage = "alpine"
arangodVolumeName = "arangod-data"
tlsKeyfileVolumeName = "tls-keyfile"
rocksdbEncryptionVolumeName = "rocksdb-encryption"
@ -96,14 +100,23 @@ func getPodCondition(status *v1.PodStatus, condType v1.PodConditionType) *v1.Pod
// CreatePodName returns the name of the pod for a member with
// a given id in a deployment with a given name.
func CreatePodName(deploymentName, role, id string) string {
return deploymentName + "-" + role + "-" + id
func CreatePodName(deploymentName, role, id, suffix string) string {
if len(suffix) > 0 && suffix[0] != '-' {
suffix = "-" + suffix
}
return CreatePodHostName(deploymentName, role, id) + suffix
}
// CreatePodHostName returns the hostname of the pod for a member with
// a given id in a deployment with a given name.
func CreatePodHostName(deploymentName, role, id string) string {
return deploymentName + "-" + role + "-" + stripArangodPrefix(id)
}
// CreateTLSKeyfileSecretName returns the name of the Secret that holds the TLS keyfile for a member with
// a given id in a deployment with a given name.
func CreateTLSKeyfileSecretName(deploymentName, role, id string) string {
return CreatePodName(deploymentName, role, id) + "-tls-keyfile"
return CreatePodName(deploymentName, role, id, "-tls-keyfile")
}
// arangodVolumeMounts creates a volume mount structure for arangod.
@ -133,6 +146,23 @@ func rocksdbEncryptionVolumeMounts() []v1.VolumeMount {
}
}
// arangodInitContainer creates a container configured to
// initalize a UUID file.
func arangodInitContainer(name, id string) v1.Container {
uuidFile := filepath.Join(ArangodVolumeMountDir, "UUID")
c := v1.Container{
Command: []string{
"/bin/sh",
"-c",
fmt.Sprintf("test -f %s || echo '%s' > %s", uuidFile, id, uuidFile),
},
Name: name,
Image: alpineImage,
VolumeMounts: arangodVolumeMounts(),
}
return c
}
// arangodContainer creates a container configured to run `arangod`.
func arangodContainer(name string, image string, imagePullPolicy v1.PullPolicy, args []string, env map[string]EnvValue, livenessProbe *HTTPProbeConfig, readinessProbe *HTTPProbeConfig) v1.Container {
c := v1.Container{
@ -188,17 +218,17 @@ func arangosyncContainer(name string, image string, imagePullPolicy v1.PullPolic
}
// newPod creates a basic Pod for given settings.
func newPod(deploymentName, ns, role, id string) v1.Pod {
name := CreatePodName(deploymentName, role, id)
func newPod(deploymentName, ns, role, id, podName string) v1.Pod {
hostname := CreatePodHostName(deploymentName, role, id)
p := v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Name: podName,
Labels: LabelsForDeployment(deploymentName, role),
},
Spec: v1.PodSpec{
Hostname: name,
Hostname: hostname,
Subdomain: CreateHeadlessServiceName(deploymentName),
RestartPolicy: v1.RestartPolicyOnFailure,
RestartPolicy: v1.RestartPolicyNever,
},
}
return p
@ -208,12 +238,12 @@ func newPod(deploymentName, ns, role, id string) v1.Pod {
// If the pod already exists, nil is returned.
// If another error occurs, that error is returned.
func CreateArangodPod(kubecli kubernetes.Interface, developmentMode bool, deployment APIObject,
role, id, pvcName, image string, imagePullPolicy v1.PullPolicy,
role, id, podName, pvcName, image string, imagePullPolicy v1.PullPolicy,
args []string, env map[string]EnvValue,
livenessProbe *HTTPProbeConfig, readinessProbe *HTTPProbeConfig,
tlsKeyfileSecretName, rocksdbEncryptionSecretName string) error {
// Prepare basic pod
p := newPod(deployment.GetName(), deployment.GetNamespace(), role, id)
p := newPod(deployment.GetName(), deployment.GetNamespace(), role, id, podName)
// Add arangod container
c := arangodContainer("arangod", image, imagePullPolicy, args, env, livenessProbe, readinessProbe)
@ -225,6 +255,9 @@ func CreateArangodPod(kubecli kubernetes.Interface, developmentMode bool, deploy
}
p.Spec.Containers = append(p.Spec.Containers, c)
// Add UUID init container
p.Spec.InitContainers = append(p.Spec.InitContainers, arangodInitContainer("uuid", id))
// Add volume
if pvcName != "" {
// Create PVC
@ -286,10 +319,10 @@ func CreateArangodPod(kubecli kubernetes.Interface, developmentMode bool, deploy
// CreateArangoSyncPod creates a Pod that runs `arangosync`.
// If the pod already exists, nil is returned.
// If another error occurs, that error is returned.
func CreateArangoSyncPod(kubecli kubernetes.Interface, developmentMode bool, deployment APIObject, role, id, image string, imagePullPolicy v1.PullPolicy,
func CreateArangoSyncPod(kubecli kubernetes.Interface, developmentMode bool, deployment APIObject, role, id, podName, image string, imagePullPolicy v1.PullPolicy,
args []string, env map[string]EnvValue, livenessProbe *HTTPProbeConfig, affinityWithRole string) error {
// Prepare basic pod
p := newPod(deployment.GetName(), deployment.GetNamespace(), role, id)
p := newPod(deployment.GetName(), deployment.GetNamespace(), role, id, podName)
// Add arangosync container
c := arangosyncContainer("arangosync", image, imagePullPolicy, args, env, livenessProbe)

View file

@ -31,7 +31,7 @@ import (
// CreatePersistentVolumeClaimName returns the name of the persistent volume claim for a member with
// a given id in a deployment with a given name.
func CreatePersistentVolumeClaimName(deploymentName, role, id string) string {
return deploymentName + "-" + role + "-" + id
return deploymentName + "-" + role + "-" + stripArangodPrefix(id)
}
// CreatePersistentVolumeClaim creates a persistent volume claim with given name and configuration.

View file

@ -27,6 +27,20 @@ import (
"k8s.io/apimachinery/pkg/labels"
)
const (
// LabelKeyArangoDeployment is the key of the label used to store the ArangoDeployment name in
LabelKeyArangoDeployment = "arango_deployment"
// LabelKeyArangoLocalStorage is the key of the label used to store the ArangoLocalStorage name in
LabelKeyArangoLocalStorage = "arango_local_storage"
// LabelKeyApp is the key of the label used to store the application name in (fixed to AppName)
LabelKeyApp = "app"
// LabelKeyRole is the key of the label used to store the role of the resource in
LabelKeyRole = "role"
// AppName is the fixed value for the "app" label
AppName = "arangodb"
)
// addOwnerRefToObject adds given owner reference to given object
func addOwnerRefToObject(obj metav1.Object, ownerRef *metav1.OwnerReference) {
if ownerRef != nil {
@ -37,11 +51,11 @@ func addOwnerRefToObject(obj metav1.Object, ownerRef *metav1.OwnerReference) {
// LabelsForDeployment returns a map of labels, given to all resources for given deployment name
func LabelsForDeployment(deploymentName, role string) map[string]string {
l := map[string]string{
"arango_deployment": deploymentName,
"app": "arangodb",
LabelKeyArangoDeployment: deploymentName,
LabelKeyApp: AppName,
}
if role != "" {
l["role"] = role
l[LabelKeyRole] = role
}
return l
}
@ -49,11 +63,11 @@ func LabelsForDeployment(deploymentName, role string) map[string]string {
// LabelsForLocalStorage returns a map of labels, given to all resources for given local storage name
func LabelsForLocalStorage(localStorageName, role string) map[string]string {
l := map[string]string{
"arango_local_storage": localStorageName,
"app": "arangodb",
LabelKeyArangoLocalStorage: localStorageName,
LabelKeyApp: AppName,
}
if role != "" {
l["role"] = role
l[LabelKeyRole] = role
}
return l
}

View file

@ -177,11 +177,12 @@ func main() {
}
// Save output
outputPath, err := filepath.Abs(filepath.Join("manifests", "arango-"+group+options.OutputSuffix+".yaml"))
outputDir, err := filepath.Abs("manifests")
if err != nil {
log.Fatalf("Failed to get absolute output path: %v\n", err)
log.Fatalf("Failed to get absolute output dir: %v\n", err)
}
if err := os.MkdirAll(filepath.Base(outputPath), 0755); err != nil {
outputPath := filepath.Join(outputDir, "arango-"+group+options.OutputSuffix+".yaml")
if err := os.MkdirAll(outputDir, 0755); err != nil {
log.Fatalf("Failed to create output directory: %v\n", err)
}
if err := ioutil.WriteFile(outputPath, output.Bytes(), 0644); err != nil {