1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Merge branch 'master' into documentation/kube-dc2dc-tutorial

This commit is contained in:
Ewout Prangsma 2018-06-19 09:26:15 +02:00
commit 931c9bbb02
No known key found for this signature in database
GPG key ID: 4DBAD380D93D0698
40 changed files with 490 additions and 218 deletions

View file

@ -39,7 +39,7 @@ metadata:
name: "replication-from-a-to-b"
spec:
source:
endpoint: ["https://163.172.149.229:31888", "https://51.15.225.110:31888", "https://51.15.229.133:31888"]
masterEndpoint: ["https://163.172.149.229:31888", "https://51.15.225.110:31888", "https://51.15.229.133:31888"]
auth:
keyfileSecretName: cluster-a-sync-auth
tls:
@ -67,7 +67,7 @@ with sync enabled.
This cluster configured as the replication source.
### `spec.source.endpoint: []string`
### `spec.source.masterEndpoint: []string`
This setting specifies zero or more master endpoint URL's of the source cluster.
@ -108,7 +108,7 @@ with sync enabled.
This cluster configured as the replication destination.
### `spec.destination.endpoint: []string`
### `spec.destination.masterEndpoint: []string`
This setting specifies zero or more master endpoint URL's of the destination cluster.

View file

@ -107,6 +107,23 @@ Possible values are:
This setting cannot be changed after the cluster has been created.
### `spec.downtimeAllowed: bool`
This setting is used to allow automatic reconciliation actions that yield
some downtime of the ArangoDB deployment.
When this setting is set to `false` (the default), no automatic action that
may result in downtime is allowed.
If the need for such an action is detected, an event is added to the `ArangoDeployment`.
Once this setting is set to `true`, the automatic action is executed.
Operations that may result in downtime are:
- Rotating TLS CA certificate
Note: It is still possible that there is some downtime when the Kubernetes
cluster is down, or in a bad state, irrespective of the value of this setting.
### `spec.rocksdb.encryption.keySecretName`
This setting specifies the name of a kubernetes `Secret` that contains

View file

@ -39,11 +39,11 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"github.com/arangodb/kube-arangodb/pkg/client"
scheme "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned/scheme"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/operator"
"github.com/arangodb/kube-arangodb/pkg/server"

View file

@ -50,6 +50,7 @@ type DeploymentSpec struct {
StorageEngine *StorageEngine `json:"storageEngine,omitempty"`
Image *string `json:"image,omitempty"`
ImagePullPolicy *v1.PullPolicy `json:"imagePullPolicy,omitempty"`
DowntimeAllowed *bool `json:"downtimeAllowed,omitempty"`
ExternalAccess ExternalAccessSpec `json:"externalAccess"`
RocksDB RocksDBSpec `json:"rocksdb"`
@ -92,6 +93,11 @@ func (s DeploymentSpec) GetImagePullPolicy() v1.PullPolicy {
return util.PullPolicyOrDefault(s.ImagePullPolicy)
}
// IsDowntimeAllowed returns the value of downtimeAllowed.
func (s DeploymentSpec) IsDowntimeAllowed() bool {
return util.BoolOrDefault(s.DowntimeAllowed)
}
// IsAuthenticated returns true when authentication is enabled
func (s DeploymentSpec) IsAuthenticated() bool {
return s.Authentication.IsAuthenticated()
@ -171,6 +177,9 @@ func (s *DeploymentSpec) SetDefaultsFrom(source DeploymentSpec) {
if s.ImagePullPolicy == nil {
s.ImagePullPolicy = util.NewPullPolicyOrNil(source.ImagePullPolicy)
}
if s.DowntimeAllowed == nil {
s.DowntimeAllowed = util.NewBoolOrNil(source.DowntimeAllowed)
}
s.ExternalAccess.SetDefaultsFrom(source.ExternalAccess)
s.RocksDB.SetDefaultsFrom(source.RocksDB)
s.Authentication.SetDefaultsFrom(source.Authentication)

View file

@ -51,6 +51,12 @@ const (
ActionTypeRenewTLSCACertificate ActionType = "RenewTLSCACertificate"
)
const (
// MemberIDPreviousAction is used for Action.MemberID when the MemberID
// should be derived from the previous action.
MemberIDPreviousAction = "@previous"
)
// Action represents a single action to be taken to update a deployment.
type Action struct {
// ID of this action (unique for every action)

View file

@ -191,9 +191,5 @@ func (s ServerGroupSpec) ResetImmutableFields(group ServerGroup, fieldPrefix str
resetFields = append(resetFields, fieldPrefix+".count")
}
}
if s.GetStorageClassName() != target.GetStorageClassName() {
target.StorageClassName = util.NewStringOrNil(s.StorageClassName)
resetFields = append(resetFields, fieldPrefix+".storageClassName")
}
return resetFields
}

View file

@ -35,7 +35,7 @@ import (
type SyncExternalAccessSpec struct {
ExternalAccessSpec
MasterEndpoint []string `json:"masterEndpoint,omitempty"`
AccessPackageSecretNames []string `json:accessPackageSecretNames,omitempty"`
AccessPackageSecretNames []string `json:"accessPackageSecretNames,omitempty"`
}
// GetMasterEndpoint returns the value of masterEndpoint.

View file

@ -253,6 +253,15 @@ func (in *DeploymentSpec) DeepCopyInto(out *DeploymentSpec) {
**out = **in
}
}
if in.DowntimeAllowed != nil {
in, out := &in.DowntimeAllowed, &out.DowntimeAllowed
if *in == nil {
*out = nil
} else {
*out = new(bool)
**out = **in
}
}
in.ExternalAccess.DeepCopyInto(&out.ExternalAccess)
in.RocksDB.DeepCopyInto(&out.RocksDB)
in.Authentication.DeepCopyInto(&out.Authentication)

View file

@ -36,8 +36,8 @@ type EndpointSpec struct {
// DeploymentName holds the name of an ArangoDeployment resource.
// If set this provides default values for masterEndpoint, auth & tls.
DeploymentName *string `json:"deploymentName,omitempty"`
// Endpoint holds a list of URLs used to reach the syncmaster(s).
Endpoint []string `json:"endpoint,omitempty"`
// MasterEndpoint holds a list of URLs used to reach the syncmaster(s).
MasterEndpoint []string `json:"masterEndpoint,omitempty"`
// Authentication holds settings needed to authentication at the syncmaster.
Authentication EndpointAuthenticationSpec `json:"auth"`
// TLS holds settings needed to verify the TLS connection to the syncmaster.
@ -60,13 +60,13 @@ func (s EndpointSpec) Validate(isSourceEndpoint bool) error {
if err := k8sutil.ValidateOptionalResourceName(s.GetDeploymentName()); err != nil {
return maskAny(err)
}
for _, ep := range s.Endpoint {
for _, ep := range s.MasterEndpoint {
if _, err := url.Parse(ep); err != nil {
return maskAny(errors.Wrapf(ValidationError, "Invalid master endpoint '%s': %s", ep, err))
}
}
hasDeploymentName := s.HasDeploymentName()
if !hasDeploymentName && len(s.Endpoint) == 0 {
if !hasDeploymentName && len(s.MasterEndpoint) == 0 {
return maskAny(errors.Wrapf(ValidationError, "Provide a deploy name or at least one master endpoint"))
}
if err := s.Authentication.Validate(isSourceEndpoint || !hasDeploymentName); err != nil {

View file

@ -240,8 +240,8 @@ func (in *EndpointSpec) DeepCopyInto(out *EndpointSpec) {
**out = **in
}
}
if in.Endpoint != nil {
in, out := &in.Endpoint, &out.Endpoint
if in.MasterEndpoint != nil {
in, out := &in.MasterEndpoint, &out.MasterEndpoint
*out = make([]string, len(*in))
copy(*out, *in)
}

View file

@ -25,11 +25,14 @@ package deployment
import (
"context"
"fmt"
"net"
"strconv"
"github.com/arangodb/arangosync/client"
"github.com/arangodb/arangosync/tasks"
driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
"github.com/rs/zerolog/log"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@ -174,7 +177,11 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
dnsName := k8sutil.CreatePodDNSName(d.apiObject, group.AsRole(), id)
// Build client
source := client.Endpoint{dnsName}
port := k8sutil.ArangoSyncMasterPort
if group == api.ServerGroupSyncWorkers {
port = k8sutil.ArangoSyncWorkerPort
}
source := client.Endpoint{"https://" + net.JoinHostPort(dnsName, strconv.Itoa(port))}
tlsAuth := tasks.TLSAuthentication{
TLSClientAuthentication: tasks.TLSClientAuthentication{
ClientToken: monitoringToken,
@ -191,23 +198,23 @@ func (d *Deployment) GetSyncServerClient(ctx context.Context, group api.ServerGr
// CreateMember adds a new member to the given group.
// If ID is non-empty, it will be used, otherwise a new ID is created.
func (d *Deployment) CreateMember(group api.ServerGroup, id string) error {
func (d *Deployment) CreateMember(group api.ServerGroup, id string) (string, error) {
log := d.deps.Log
status, lastVersion := d.GetStatus()
id, err := createMember(log, &status, group, id, d.apiObject)
if err != nil {
log.Debug().Err(err).Str("group", group.AsRole()).Msg("Failed to create member")
return maskAny(err)
return "", maskAny(err)
}
// Save added member
if err := d.UpdateStatus(status, lastVersion); err != nil {
log.Debug().Err(err).Msg("Updating CR status failed")
return maskAny(err)
return "", maskAny(err)
}
// Create event about it
d.CreateEvent(k8sutil.NewMemberAddEvent(id, group.AsRole(), d.apiObject))
return nil
return id, nil
}
// DeletePod deletes a pod with given name in the namespace
@ -304,6 +311,16 @@ func (d *Deployment) GetOwnedPVCs() ([]v1.PersistentVolumeClaim, error) {
return myPVCs, nil
}
// GetPvc gets a PVC by the given name, in the samespace of the deployment.
func (d *Deployment) GetPvc(pvcName string) (*v1.PersistentVolumeClaim, error) {
pvc, err := d.deps.KubeCli.CoreV1().PersistentVolumeClaims(d.apiObject.GetNamespace()).Get(pvcName, metav1.GetOptions{})
if err != nil {
log.Debug().Err(err).Str("pvc-name", pvcName).Msg("Failed to get PVC")
return nil, maskAny(err)
}
return pvc, nil
}
// GetTLSKeyfile returns the keyfile encoded TLS certificate+key for
// the given member.
func (d *Deployment) GetTLSKeyfile(group api.ServerGroup, member api.MemberStatus) (string, error) {

View file

@ -32,10 +32,9 @@ import (
"github.com/arangodb/arangosync/client"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment/chaos"
@ -60,6 +59,7 @@ type Dependencies struct {
Log zerolog.Logger
KubeCli kubernetes.Interface
DatabaseCRCli versioned.Interface
EventRecorder record.EventRecorder
}
// deploymentEventType strongly typed type of event
@ -96,8 +96,6 @@ type Deployment struct {
stopCh chan struct{}
stopped int32
eventsCli corev1.EventInterface
inspectTrigger trigger.Trigger
updateDeploymentTrigger trigger.Trigger
clientCache *clientCache
@ -121,7 +119,6 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
deps: deps,
eventCh: make(chan *deploymentEvent, deploymentEventQueueSize),
stopCh: make(chan struct{}),
eventsCli: deps.KubeCli.Core().Events(apiObject.GetNamespace()),
clientCache: newClientCache(deps.KubeCli, apiObject),
}
d.status.last = *(apiObject.Status.DeepCopy())
@ -337,11 +334,8 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent() error {
// CreateEvent creates a given event.
// On error, the error is logged.
func (d *Deployment) CreateEvent(evt *v1.Event) {
_, err := d.eventsCli.Create(evt)
if err != nil {
d.deps.Log.Error().Err(err).Interface("event", *evt).Msg("Failed to record event")
}
func (d *Deployment) CreateEvent(evt *k8sutil.Event) {
d.deps.EventRecorder.Event(evt.InvolvedObject, evt.Type, evt.Reason, evt.Message)
}
// Update the status of the API object from the internal status

View file

@ -28,7 +28,6 @@ import (
"github.com/dchest/uniuri"
"github.com/rs/zerolog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
@ -42,7 +41,7 @@ func (d *Deployment) createInitialMembers(apiObject *api.ArangoDeployment) error
log.Debug().Msg("creating initial members...")
// Go over all groups and create members
var events []*v1.Event
var events []*k8sutil.Event
status, lastVersion := d.GetStatus()
if err := apiObject.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, members *api.MemberStatusList) error {
for len(*members) < spec.GetCount() {

View file

@ -38,4 +38,6 @@ type Action interface {
CheckProgress(ctx context.Context) (bool, bool, error)
// Timeout returns the amount of time after which this action will timeout.
Timeout() time.Duration
// Return the MemberID used / created in this action
MemberID() string
}

View file

@ -43,19 +43,22 @@ func NewAddMemberAction(log zerolog.Logger, action api.Action, actionCtx ActionC
// actionAddMember implements an AddMemberAction.
type actionAddMember struct {
log zerolog.Logger
action api.Action
actionCtx ActionContext
log zerolog.Logger
action api.Action
actionCtx ActionContext
newMemberID string
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionAddMember) Start(ctx context.Context) (bool, error) {
if err := a.actionCtx.CreateMember(a.action.Group, a.action.MemberID); err != nil {
newID, err := a.actionCtx.CreateMember(a.action.Group, a.action.MemberID)
if err != nil {
log.Debug().Err(err).Msg("Failed to create member")
return false, maskAny(err)
}
a.newMemberID = newID
return true, nil
}
@ -70,3 +73,8 @@ func (a *actionAddMember) CheckProgress(ctx context.Context) (bool, bool, error)
func (a *actionAddMember) Timeout() time.Duration {
return addMemberTimeout
}
// Return the MemberID used / created in this action
func (a *actionAddMember) MemberID() string {
return a.newMemberID
}

View file

@ -155,3 +155,8 @@ func (a *actionCleanoutMember) CheckProgress(ctx context.Context) (bool, bool, e
func (a *actionCleanoutMember) Timeout() time.Duration {
return cleanoutMemberTimeout
}
// Return the MemberID used / created in this action
func (a *actionCleanoutMember) MemberID() string {
return a.action.MemberID
}

View file

@ -59,7 +59,7 @@ type ActionContext interface {
GetMemberStatusByID(id string) (api.MemberStatus, bool)
// CreateMember adds a new member to the given group.
// If ID is non-empty, it will be used, otherwise a new ID is created.
CreateMember(group api.ServerGroup, id string) error
CreateMember(group api.ServerGroup, id string) (string, error)
// UpdateMember updates the deployment status wrt the given member.
UpdateMember(member api.MemberStatus) error
// RemoveMemberByID removes a member with given id.
@ -157,11 +157,12 @@ func (ac *actionContext) GetMemberStatusByID(id string) (api.MemberStatus, bool)
// CreateMember adds a new member to the given group.
// If ID is non-empty, it will be used, otherwise a new ID is created.
func (ac *actionContext) CreateMember(group api.ServerGroup, id string) error {
if err := ac.context.CreateMember(group, id); err != nil {
return maskAny(err)
func (ac *actionContext) CreateMember(group api.ServerGroup, id string) (string, error) {
result, err := ac.context.CreateMember(group, id)
if err != nil {
return "", maskAny(err)
}
return nil
return result, nil
}
// UpdateMember updates the deployment status wrt the given member.

View file

@ -105,3 +105,8 @@ func (a *actionRemoveMember) CheckProgress(ctx context.Context) (bool, bool, err
func (a *actionRemoveMember) Timeout() time.Duration {
return removeMemberTimeout
}
// Return the MemberID used / created in this action
func (a *actionRemoveMember) MemberID() string {
return a.action.MemberID
}

View file

@ -69,3 +69,8 @@ func (a *renewTLSCACertificateAction) CheckProgress(ctx context.Context) (bool,
func (a *renewTLSCACertificateAction) Timeout() time.Duration {
return renewTLSCACertificateTimeout
}
// Return the MemberID used / created in this action
func (a *renewTLSCACertificateAction) MemberID() string {
return a.action.MemberID
}

View file

@ -75,3 +75,8 @@ func (a *renewTLSCertificateAction) CheckProgress(ctx context.Context) (bool, bo
func (a *renewTLSCertificateAction) Timeout() time.Duration {
return renewTLSCertificateTimeout
}
// Return the MemberID used / created in this action
func (a *renewTLSCertificateAction) MemberID() string {
return a.action.MemberID
}

View file

@ -127,3 +127,8 @@ func (a *actionRotateMember) CheckProgress(ctx context.Context) (bool, bool, err
func (a *actionRotateMember) Timeout() time.Duration {
return rotateMemberTimeout
}
// Return the MemberID used / created in this action
func (a *actionRotateMember) MemberID() string {
return a.action.MemberID
}

View file

@ -116,3 +116,8 @@ func (a *actionShutdownMember) CheckProgress(ctx context.Context) (bool, bool, e
func (a *actionShutdownMember) Timeout() time.Duration {
return shutdownMemberTimeout
}
// Return the MemberID used / created in this action
func (a *actionShutdownMember) MemberID() string {
return a.action.MemberID
}

View file

@ -133,3 +133,8 @@ func (a *actionUpgradeMember) CheckProgress(ctx context.Context) (bool, bool, er
func (a *actionUpgradeMember) Timeout() time.Duration {
return upgradeMemberTimeout
}
// Return the MemberID used / created in this action
func (a *actionUpgradeMember) MemberID() string {
return a.action.MemberID
}

View file

@ -74,7 +74,7 @@ func (a *actionWaitForMemberUp) CheckProgress(ctx context.Context) (bool, bool,
if a.action.Group == api.ServerGroupAgents {
return a.checkProgressAgent(ctx)
}
return a.checkProgressSingle(ctx)
return a.checkProgressSingleInActiveFailover(ctx)
default:
if a.action.Group == api.ServerGroupAgents {
return a.checkProgressAgent(ctx)
@ -99,6 +99,26 @@ func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool,
return true, false, nil
}
// checkProgressSingleInActiveFailover checks the progress of the action in the case
// of a single server as part of an active failover deployment.
func (a *actionWaitForMemberUp) checkProgressSingleInActiveFailover(ctx context.Context) (bool, bool, error) {
log := a.log
c, err := a.actionCtx.GetDatabaseClient(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client")
return false, false, maskAny(err)
}
if _, err := c.Version(ctx); err != nil {
log.Debug().Err(err).Msg("Failed to get version")
return false, false, maskAny(err)
}
if _, err := c.Databases(ctx); err != nil {
log.Debug().Err(err).Msg("Failed to get databases")
return false, false, maskAny(err)
}
return true, false, nil
}
// checkProgressAgent checks the progress of the action in the case
// of an agent.
func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, bool, error) {
@ -154,7 +174,7 @@ func (a *actionWaitForMemberUp) checkProgressCluster(ctx context.Context) (bool,
// of a sync master / worker.
func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bool, bool, error) {
log := a.log
c, err := a.actionCtx.GetSyncServerClient(ctx, a.action.Group, a.action.ID)
c, err := a.actionCtx.GetSyncServerClient(ctx, a.action.Group, a.action.MemberID)
if err != nil {
log.Debug().Err(err).Msg("Failed to create arangosync client")
return false, false, maskAny(err)
@ -170,3 +190,8 @@ func (a *actionWaitForMemberUp) checkProgressArangoSync(ctx context.Context) (bo
func (a *actionWaitForMemberUp) Timeout() time.Duration {
return waitForMemberUpTimeout
}
// Return the MemberID used / created in this action
func (a *actionWaitForMemberUp) MemberID() string {
return a.action.MemberID
}

View file

@ -59,10 +59,11 @@ type Context interface {
GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error)
// CreateEvent creates a given event.
// On error, the error is logged.
CreateEvent(evt *v1.Event)
CreateEvent(evt *k8sutil.Event)
// CreateMember adds a new member to the given group.
// If ID is non-empty, it will be used, otherwise a new ID is created.
CreateMember(group api.ServerGroup, id string) error
// Returns ID, error
CreateMember(group api.ServerGroup, id string) (string, error)
// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
DeletePod(podName string) error
@ -74,6 +75,8 @@ type Context interface {
RemovePodFinalizers(podName string) error
// GetOwnedPods returns a list of all pods owned by the deployment.
GetOwnedPods() ([]v1.Pod, error)
// GetPvc gets a PVC by the given name, in the samespace of the deployment.
GetPvc(pvcName string) (*v1.PersistentVolumeClaim, error)
// GetTLSKeyfile returns the keyfile encoded TLS certificate+key for
// the given member.
GetTLSKeyfile(group api.ServerGroup, member api.MemberStatus) (string, error)

View file

@ -54,7 +54,7 @@ func (d *Reconciler) CreatePlan() error {
apiObject := d.context.GetAPIObject()
spec := d.context.GetSpec()
status, lastVersion := d.context.GetStatus()
newPlan, changed := createPlan(d.log, apiObject, status.Plan, spec, status, pods, d.context.GetTLSKeyfile, d.context.GetTLSCA)
newPlan, changed := createPlan(d.log, apiObject, status.Plan, spec, status, pods, d.context.GetTLSKeyfile, d.context.GetTLSCA, d.context.GetPvc, d.context.CreateEvent)
// If not change, we're done
if !changed {
@ -76,11 +76,13 @@ func (d *Reconciler) CreatePlan() error {
// createPlan considers the given specification & status and creates a plan to get the status in line with the specification.
// If a plan already exists, the given plan is returned with false.
// Otherwise the new plan is returned with a boolean true.
func createPlan(log zerolog.Logger, apiObject metav1.Object,
func createPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
currentPlan api.Plan, spec api.DeploymentSpec,
status api.DeploymentStatus, pods []v1.Pod,
getTLSKeyfile func(group api.ServerGroup, member api.MemberStatus) (string, error),
getTLSCA func(string) (string, string, bool, error)) (api.Plan, bool) {
getTLSCA func(string) (string, string, bool, error),
getPVC func(pvcName string) (*v1.PersistentVolumeClaim, error),
createEvent func(evt *k8sutil.Event)) (api.Plan, bool) {
if len(currentPlan) > 0 {
// Plan already exists, complete that first
return currentPlan, false
@ -175,16 +177,21 @@ func createPlan(log zerolog.Logger, apiObject metav1.Object,
})
}
// Check for the need to rotate TLS CA certificate and all members
if len(plan) == 0 {
plan = createRotateTLSCAPlan(log, spec, status, getTLSCA)
}
// Check for the need to rotate TLS certificate of a members
if len(plan) == 0 {
plan = createRotateTLSServerCertificatePlan(log, spec, status, getTLSKeyfile)
}
// Check for changes storage classes or requirements
if len(plan) == 0 {
plan = createRotateServerStoragePlan(log, apiObject, spec, status, getPVC, createEvent)
}
// Check for the need to rotate TLS CA certificate and all members
if len(plan) == 0 {
plan = createRotateTLSCAPlan(log, apiObject, spec, status, getTLSCA, createEvent)
}
// Return plan
return plan, true
}

View file

@ -0,0 +1,115 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package reconcile
import (
"github.com/rs/zerolog"
"k8s.io/api/core/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// createRotateServerStoragePlan creates plan to rotate a server and its volume because of a
// different storage class or a difference in storage resource requirements.
func createRotateServerStoragePlan(log zerolog.Logger, apiObject k8sutil.APIObject, spec api.DeploymentSpec, status api.DeploymentStatus,
getPVC func(pvcName string) (*v1.PersistentVolumeClaim, error),
createEvent func(evt *k8sutil.Event)) api.Plan {
if spec.GetMode() == api.DeploymentModeSingle {
// Storage cannot be changed in single server deployments
return nil
}
var plan api.Plan
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if len(plan) > 0 {
// Only 1 change at a time
continue
}
if m.Phase != api.MemberPhaseCreated {
// Only make changes when phase is created
continue
}
if m.PersistentVolumeClaimName == "" {
// Plan is irrelevant without PVC
continue
}
groupSpec := spec.GetServerGroupSpec(group)
storageClassName := groupSpec.GetStorageClassName()
if storageClassName == "" {
// Using default storage class name
continue
}
// Load PVC
pvc, err := getPVC(m.PersistentVolumeClaimName)
if err != nil {
log.Warn().Err(err).
Str("role", group.AsRole()).
Str("id", m.ID).
Msg("Failed to get PVC")
continue
}
replacementNeeded := false
if util.StringOrDefault(pvc.Spec.StorageClassName) != storageClassName {
// Storageclass has changed
replacementNeeded = true
}
if replacementNeeded {
if group != api.ServerGroupAgents && group != api.ServerGroupDBServers {
// Only agents & dbservers are allowed to change their storage class.
createEvent(k8sutil.NewCannotChangeStorageClassEvent(apiObject, m.ID, group.AsRole(), "Not supported"))
continue
} else {
if group != api.ServerGroupAgents {
plan = append(plan,
// Scale up, so we're sure that a new member is available
api.NewAction(api.ActionTypeAddMember, group, ""),
api.NewAction(api.ActionTypeWaitForMemberUp, group, api.MemberIDPreviousAction),
)
}
if group == api.ServerGroupDBServers {
plan = append(plan,
// Cleanout
api.NewAction(api.ActionTypeCleanOutMember, group, m.ID),
)
}
plan = append(plan,
// Shutdown & remove the server
api.NewAction(api.ActionTypeShutdownMember, group, m.ID),
api.NewAction(api.ActionTypeRemoveMember, group, m.ID),
)
if group == api.ServerGroupAgents {
plan = append(plan,
// Scale up, so we're adding the removed agent (note: with the old ID)
api.NewAction(api.ActionTypeAddMember, group, m.ID),
api.NewAction(api.ActionTypeWaitForMemberUp, group, m.ID),
)
}
}
}
}
return nil
})
return plan
}

View file

@ -29,10 +29,12 @@ import (
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// TestCreatePlanSingleScale creates a `single` deployment to test the creating of scaling plan.
@ -43,6 +45,10 @@ func TestCreatePlanSingleScale(t *testing.T) {
getTLSCA := func(string) (string, string, bool, error) {
return "", "", false, maskAny(fmt.Errorf("Not implemented"))
}
getPVC := func(pvcName string) (*v1.PersistentVolumeClaim, error) {
return nil, maskAny(fmt.Errorf("Not implemented"))
}
createEvent := func(evt *k8sutil.Event) {}
log := zerolog.Nop()
spec := api.DeploymentSpec{
Mode: api.NewMode(api.DeploymentModeSingle),
@ -58,7 +64,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
// Test with empty status
var status api.DeploymentStatus
newPlan, changed := createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA)
newPlan, changed := createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA, getPVC, createEvent)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
@ -69,7 +75,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
PodName: "something",
},
}
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA, getPVC, createEvent)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
@ -84,7 +90,7 @@ func TestCreatePlanSingleScale(t *testing.T) {
PodName: "something1",
},
}
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA, getPVC, createEvent)
assert.True(t, changed)
assert.Len(t, newPlan, 0) // Single mode does not scale
}
@ -97,6 +103,10 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
getTLSCA := func(string) (string, string, bool, error) {
return "", "", false, maskAny(fmt.Errorf("Not implemented"))
}
getPVC := func(pvcName string) (*v1.PersistentVolumeClaim, error) {
return nil, maskAny(fmt.Errorf("Not implemented"))
}
createEvent := func(evt *k8sutil.Event) {}
log := zerolog.Nop()
spec := api.DeploymentSpec{
Mode: api.NewMode(api.DeploymentModeActiveFailover),
@ -113,7 +123,7 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
// Test with empty status
var status api.DeploymentStatus
newPlan, changed := createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA)
newPlan, changed := createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA, getPVC, createEvent)
assert.True(t, changed)
require.Len(t, newPlan, 2)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -126,7 +136,7 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
PodName: "something",
},
}
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA, getPVC, createEvent)
assert.True(t, changed)
require.Len(t, newPlan, 1)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -151,7 +161,7 @@ func TestCreatePlanActiveFailoverScale(t *testing.T) {
PodName: "something4",
},
}
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA, getPVC, createEvent)
assert.True(t, changed)
require.Len(t, newPlan, 2) // Note: Downscaling is only down 1 at a time
assert.Equal(t, api.ActionTypeShutdownMember, newPlan[0].Type)
@ -168,6 +178,10 @@ func TestCreatePlanClusterScale(t *testing.T) {
getTLSCA := func(string) (string, string, bool, error) {
return "", "", false, maskAny(fmt.Errorf("Not implemented"))
}
getPVC := func(pvcName string) (*v1.PersistentVolumeClaim, error) {
return nil, maskAny(fmt.Errorf("Not implemented"))
}
createEvent := func(evt *k8sutil.Event) {}
log := zerolog.Nop()
spec := api.DeploymentSpec{
Mode: api.NewMode(api.DeploymentModeCluster),
@ -183,7 +197,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
// Test with empty status
var status api.DeploymentStatus
newPlan, changed := createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA)
newPlan, changed := createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA, getPVC, createEvent)
assert.True(t, changed)
require.Len(t, newPlan, 6) // Adding 3 dbservers & 3 coordinators (note: agents do not scale now)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -216,7 +230,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
PodName: "coordinator1",
},
}
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA, getPVC, createEvent)
assert.True(t, changed)
require.Len(t, newPlan, 3)
assert.Equal(t, api.ActionTypeAddMember, newPlan[0].Type)
@ -253,7 +267,7 @@ func TestCreatePlanClusterScale(t *testing.T) {
}
spec.DBServers.Count = util.NewInt(1)
spec.Coordinators.Count = util.NewInt(1)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA)
newPlan, changed = createPlan(log, depl, nil, spec, status, nil, getTLSKeyfile, getTLSCA, getPVC, createEvent)
assert.True(t, changed)
require.Len(t, newPlan, 5) // Note: Downscaling is done 1 at a time
assert.Equal(t, api.ActionTypeCleanOutMember, newPlan[0].Type)

View file

@ -29,6 +29,7 @@ import (
"time"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/rs/zerolog"
)
@ -62,10 +63,14 @@ func createRotateTLSServerCertificatePlan(log zerolog.Logger, spec api.Deploymen
Msg("Failed to get TLS secret")
continue
}
renewalNeeded := tlsKeyfileNeedsRenewal(log, keyfile)
tlsSpec := spec.TLS
if group.IsArangosync() {
tlsSpec = spec.Sync.TLS
}
renewalNeeded, reason := tlsKeyfileNeedsRenewal(log, keyfile, tlsSpec)
if renewalNeeded {
plan = append(append(plan,
api.NewAction(api.ActionTypeRenewTLSCertificate, group, m.ID)),
api.NewAction(api.ActionTypeRenewTLSCertificate, group, m.ID, reason)),
createRotateMemberPlan(log, m, group, "TLS certificate renewal")...,
)
}
@ -76,8 +81,10 @@ func createRotateTLSServerCertificatePlan(log zerolog.Logger, spec api.Deploymen
}
// createRotateTLSCAPlan creates plan to replace a TLS CA and rotate all server.
func createRotateTLSCAPlan(log zerolog.Logger, spec api.DeploymentSpec, status api.DeploymentStatus,
getTLSCA func(string) (string, string, bool, error)) api.Plan {
func createRotateTLSCAPlan(log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
getTLSCA func(string) (string, string, bool, error),
createEvent func(evt *k8sutil.Event)) api.Plan {
if !spec.TLS.IsSecure() {
return nil
}
@ -93,78 +100,45 @@ func createRotateTLSCAPlan(log zerolog.Logger, spec api.DeploymentSpec, status a
}
var plan api.Plan
if renewalNeeded, reason := tlsCANeedsRenewal(log, cert, spec.TLS); renewalNeeded {
var planSuffix api.Plan
plan = append(plan,
api.NewAction(api.ActionTypeRenewTLSCACertificate, 0, "", reason),
)
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if m.Phase != api.MemberPhaseCreated {
// Only make changes when phase is created
continue
if spec.IsDowntimeAllowed() {
var planSuffix api.Plan
plan = append(plan,
api.NewAction(api.ActionTypeRenewTLSCACertificate, 0, "", reason),
)
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if m.Phase != api.MemberPhaseCreated {
// Only make changes when phase is created
continue
}
if !group.IsArangod() {
// Sync master/worker is not applicable here
continue
}
plan = append(plan,
api.NewAction(api.ActionTypeRenewTLSCertificate, group, m.ID),
api.NewAction(api.ActionTypeRotateMember, group, m.ID, "TLS CA certificate changed"),
)
planSuffix = append(planSuffix,
api.NewAction(api.ActionTypeWaitForMemberUp, group, m.ID, "TLS CA certificate changed"),
)
}
if !group.IsArangod() {
// Sync master/worker is not applicable here
continue
}
plan = append(plan,
api.NewAction(api.ActionTypeRenewTLSCertificate, group, m.ID),
api.NewAction(api.ActionTypeRotateMember, group, m.ID, "TLS CA certificate changed"),
)
planSuffix = append(planSuffix,
api.NewAction(api.ActionTypeWaitForMemberUp, group, m.ID, "TLS CA certificate changed"),
)
}
return nil
})
plan = append(plan, planSuffix...)
return nil
})
plan = append(plan, planSuffix...)
} else {
// Rotating the CA results in downtime.
// That is currently not allowed.
createEvent(k8sutil.NewDowntimeNotAllowedEvent(apiObject, "Rotate TLS CA"))
}
}
return plan
}
// tlsKeyfileNeedsRenewal decides if the certificate in the given keyfile
// should be renewed.
func tlsKeyfileNeedsRenewal(log zerolog.Logger, keyfile string) bool {
func tlsKeyfileNeedsRenewal(log zerolog.Logger, keyfile string, spec api.TLSSpec) (bool, string) {
raw := []byte(keyfile)
for {
var derBlock *pem.Block
derBlock, raw = pem.Decode(raw)
if derBlock == nil {
break
}
if derBlock.Type == "CERTIFICATE" {
cert, err := x509.ParseCertificate(derBlock.Bytes)
if err != nil {
// We do not understand the certificate, let's renew it
log.Warn().Err(err).Msg("Failed to parse x509 certificate. Renewing it")
return true
}
if cert.IsCA {
// Only look at the server certificate, not CA or intermediate
continue
}
// Check expiration date. Renewal at 2/3 of lifetime.
ttl := cert.NotAfter.Sub(cert.NotBefore)
expirationDate := cert.NotBefore.Add((ttl / 3) * 2)
if expirationDate.Before(time.Now()) {
// We should renew now
log.Debug().
Str("not-before", cert.NotBefore.String()).
Str("not-after", cert.NotAfter.String()).
Str("expiration-date", expirationDate.String()).
Msg("TLS certificate renewal needed")
return true
}
}
}
return false
}
// tlsCANeedsRenewal decides if the given CA certificate
// should be renewed.
// Returns: shouldRenew, reason
func tlsCANeedsRenewal(log zerolog.Logger, cert string, spec api.TLSSpec) (bool, string) {
raw := []byte(cert)
// containsAll returns true when all elements in the expected list
// are in the actual list.
containsAll := func(actual []string, expected []string) bool {
@ -202,21 +176,21 @@ func tlsCANeedsRenewal(log zerolog.Logger, cert string, spec api.TLSSpec) (bool,
log.Warn().Err(err).Msg("Failed to parse x509 certificate. Renewing it")
return true, "Cannot parse x509 certificate: " + err.Error()
}
if !cert.IsCA {
// Only look at the CA certificate
if cert.IsCA {
// Only look at the server certificate, not CA or intermediate
continue
}
// Check expiration date. Renewal at 90% of lifetime.
// Check expiration date. Renewal at 2/3 of lifetime.
ttl := cert.NotAfter.Sub(cert.NotBefore)
expirationDate := cert.NotBefore.Add((ttl / 10) * 9)
expirationDate := cert.NotBefore.Add((ttl / 3) * 2)
if expirationDate.Before(time.Now()) {
// We should renew now
log.Debug().
Str("not-before", cert.NotBefore.String()).
Str("not-after", cert.NotAfter.String()).
Str("expiration-date", expirationDate.String()).
Msg("TLS CA certificate renewal needed")
return true, "CA Certificate about to expire"
Msg("TLS certificate renewal needed")
return true, "Server certificate about to expire"
}
// Check alternate names against spec
dnsNames, ipAddresses, emailAddress, err := spec.GetParsedAltNames()
@ -235,3 +209,42 @@ func tlsCANeedsRenewal(log zerolog.Logger, cert string, spec api.TLSSpec) (bool,
}
return false, ""
}
// tlsCANeedsRenewal decides if the given CA certificate
// should be renewed.
// Returns: shouldRenew, reason
func tlsCANeedsRenewal(log zerolog.Logger, cert string, spec api.TLSSpec) (bool, string) {
raw := []byte(cert)
for {
var derBlock *pem.Block
derBlock, raw = pem.Decode(raw)
if derBlock == nil {
break
}
if derBlock.Type == "CERTIFICATE" {
cert, err := x509.ParseCertificate(derBlock.Bytes)
if err != nil {
// We do not understand the certificate, let's renew it
log.Warn().Err(err).Msg("Failed to parse x509 certificate. Renewing it")
return true, "Cannot parse x509 certificate: " + err.Error()
}
if !cert.IsCA {
// Only look at the CA certificate
continue
}
// Check expiration date. Renewal at 90% of lifetime.
ttl := cert.NotAfter.Sub(cert.NotBefore)
expirationDate := cert.NotBefore.Add((ttl / 10) * 9)
if expirationDate.Before(time.Now()) {
// We should renew now
log.Debug().
Str("not-before", cert.NotBefore.String()).
Str("not-after", cert.NotAfter.String()).
Str("expiration-date", expirationDate.String()).
Msg("TLS CA certificate renewal needed")
return true, "CA Certificate about to expire"
}
}
}
return false, ""
}

View file

@ -76,6 +76,10 @@ func (d *Reconciler) ExecutePlan(ctx context.Context) (bool, error) {
if ready {
// Remove action from list
status.Plan = status.Plan[1:]
if len(status.Plan) > 0 && status.Plan[0].MemberID == api.MemberIDPreviousAction {
// Fill in MemberID from previous action
status.Plan[0].MemberID = action.MemberID()
}
} else {
// Mark start time
now := metav1.Now()
@ -105,6 +109,10 @@ func (d *Reconciler) ExecutePlan(ctx context.Context) (bool, error) {
status, lastVersion := d.context.GetStatus()
// Remove action from list
status.Plan = status.Plan[1:]
if len(status.Plan) > 0 && status.Plan[0].MemberID == api.MemberIDPreviousAction {
// Fill in MemberID from previous action
status.Plan[0].MemberID = action.MemberID()
}
// Save plan update
if err := d.context.UpdateStatus(status, lastVersion); err != nil {
log.Debug().Err(err).Msg("Failed to update CR status")

View file

@ -45,20 +45,13 @@ const (
// specified in the given spec.
func createTLSCACertificate(log zerolog.Logger, cli v1.CoreV1Interface, spec api.TLSSpec, deploymentName, namespace string, ownerRef *metav1.OwnerReference) error {
log = log.With().Str("secret", spec.GetCASecretName()).Logger()
dnsNames, ipAddresses, emailAddress, err := spec.GetParsedAltNames()
if err != nil {
log.Debug().Err(err).Msg("Failed to get alternate names")
return maskAny(err)
}
options := certificates.CreateCertificateOptions{
CommonName: fmt.Sprintf("%s Root Certificate", deploymentName),
Hosts: append(dnsNames, ipAddresses...),
EmailAddresses: emailAddress,
ValidFrom: time.Now(),
ValidFor: caTTL,
IsCA: true,
ECDSACurve: tlsECDSACurve,
CommonName: fmt.Sprintf("%s Root Certificate", deploymentName),
ValidFrom: time.Now(),
ValidFor: caTTL,
IsCA: true,
ECDSACurve: tlsECDSACurve,
}
cert, priv, err := certificates.CreateCertificate(options, nil)
if err != nil {

View file

@ -64,7 +64,7 @@ type Context interface {
GetNamespace() string
// CreateEvent creates a given event.
// On error, the error is logged.
CreateEvent(evt *v1.Event)
CreateEvent(evt *k8sutil.Event)
// GetOwnedPods returns a list of all pods owned by the deployment.
GetOwnedPods() ([]v1.Pod, error)
// GetOwnedPVCs returns a list of all PVCs owned by the deployment.

View file

@ -27,7 +27,6 @@ import (
"fmt"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
@ -47,7 +46,7 @@ const (
// the member status of the deployment accordingly.
func (r *Resources) InspectPods(ctx context.Context) error {
log := r.log
var events []*v1.Event
var events []*k8sutil.Event
pods, err := r.context.GetOwnedPods()
if err != nil {

View file

@ -212,6 +212,7 @@ func (o *Operator) makeDeploymentConfigAndDeps(apiObject *api.ArangoDeployment)
Logger(),
KubeCli: o.Dependencies.KubeCli,
DatabaseCRCli: o.Dependencies.CRCli,
EventRecorder: o.Dependencies.EventRecorder,
}
return cfg, deps
}

View file

@ -208,8 +208,9 @@ func (o *Operator) makeDeploymentReplicationConfigAndDeps(apiObject *api.ArangoD
Log: o.Dependencies.LogService.MustGetLogger("deployment-replication").With().
Str("deployment-replication", apiObject.GetName()).
Logger(),
KubeCli: o.Dependencies.KubeCli,
CRCli: o.Dependencies.CRCli,
KubeCli: o.Dependencies.KubeCli,
CRCli: o.Dependencies.CRCli,
EventRecorder: o.Dependencies.EventRecorder,
}
return cfg, deps
}

View file

@ -210,8 +210,9 @@ func (o *Operator) makeLocalStorageConfigAndDeps(apiObject *api.ArangoLocalStora
Log: o.Dependencies.LogService.MustGetLogger("storage").With().
Str("localStorage", apiObject.GetName()).
Logger(),
KubeCli: o.Dependencies.KubeCli,
StorageCRCli: o.Dependencies.CRCli,
KubeCli: o.Dependencies.KubeCli,
StorageCRCli: o.Dependencies.CRCli,
EventRecorder: o.Dependencies.EventRecorder,
}
return cfg, deps
}

View file

@ -29,10 +29,9 @@ import (
"time"
"github.com/rs/zerolog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"github.com/arangodb/arangosync/client"
api "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1alpha"
@ -49,9 +48,10 @@ type Config struct {
// Dependencies holds dependent services for a DeploymentReplication
type Dependencies struct {
Log zerolog.Logger
KubeCli kubernetes.Interface
CRCli versioned.Interface
Log zerolog.Logger
KubeCli kubernetes.Interface
CRCli versioned.Interface
EventRecorder record.EventRecorder
}
// deploymentReplicationEvent strongly typed type of event
@ -84,8 +84,6 @@ type DeploymentReplication struct {
stopCh chan struct{}
stopped int32
eventsCli corev1.EventInterface
inspectTrigger trigger.Trigger
recentInspectionErrors int
clientCache client.ClientCache
@ -103,7 +101,6 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeploymentReplic
deps: deps,
eventCh: make(chan *deploymentReplicationEvent, deploymentReplicationEventQueueSize),
stopCh: make(chan struct{}),
eventsCli: deps.KubeCli.Core().Events(apiObject.GetNamespace()),
}
go dr.run()
@ -241,10 +238,8 @@ func (dr *DeploymentReplication) handleArangoDeploymentReplicationUpdatedEvent(e
// createEvent creates a given event.
// On error, the error is logged.
func (dr *DeploymentReplication) createEvent(evt *v1.Event) {
if _, err := dr.eventsCli.Create(evt); err != nil {
dr.deps.Log.Error().Err(err).Interface("event", *evt).Msg("Failed to record event")
}
func (dr *DeploymentReplication) createEvent(evt *k8sutil.Event) {
dr.deps.EventRecorder.Event(evt.InvolvedObject, evt.Type, evt.Reason, evt.Message)
}
// Update the status of the API object from the internal status

View file

@ -113,7 +113,7 @@ func (dr *DeploymentReplication) createArangoSyncEndpoint(epSpec api.EndpointSpe
dnsName := k8sutil.CreateSyncMasterClientServiceDNSName(depl)
return client.Endpoint{"https://" + net.JoinHostPort(dnsName, strconv.Itoa(k8sutil.ArangoSyncMasterPort))}, nil
}
return client.Endpoint(epSpec.Endpoint), nil
return client.Endpoint(epSpec.MasterEndpoint), nil
}
// createArangoSyncTLSAuthentication creates the authentication needed to authenticate

View file

@ -35,6 +35,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
@ -52,9 +53,10 @@ type Config struct {
// Dependencies holds dependent services for a LocalStorage
type Dependencies struct {
Log zerolog.Logger
KubeCli kubernetes.Interface
StorageCRCli versioned.Interface
Log zerolog.Logger
KubeCli kubernetes.Interface
StorageCRCli versioned.Interface
EventRecorder record.EventRecorder
}
// localStorageEvent strongly typed type of event
@ -112,7 +114,6 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoLocalStorage) (*
deps: deps,
eventCh: make(chan *localStorageEvent, localStorageEventQueueSize),
stopCh: make(chan struct{}),
eventsCli: deps.KubeCli.Core().Events(apiObject.GetNamespace()),
}
ls.pvCleaner = newPVCleaner(deps.Log, deps.KubeCli, ls.GetClientByNodeName)
@ -324,11 +325,8 @@ func (ls *LocalStorage) handleArangoLocalStorageUpdatedEvent(event *localStorage
// createEvent creates a given event.
// On error, the error is logged.
func (ls *LocalStorage) createEvent(evt *v1.Event) {
_, err := ls.eventsCli.Create(evt)
if err != nil {
ls.deps.Log.Error().Err(err).Interface("event", *evt).Msg("Failed to record event")
}
func (ls *LocalStorage) createEvent(evt *k8sutil.Event) {
ls.deps.EventRecorder.Event(evt.InvolvedObject, evt.Type, evt.Reason, evt.Message)
}
// Update the status of the API object from the internal status

View file

@ -24,25 +24,31 @@ package k8sutil
import (
"fmt"
"os"
"strings"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
"k8s.io/apimachinery/pkg/runtime"
)
// Event is used to create events using an EventRecorder.
type Event struct {
InvolvedObject runtime.Object
Type string
Reason string
Message string
}
// APIObject helps to abstract an object from our custom API.
type APIObject interface {
runtime.Object
metav1.Object
// AsOwner creates an OwnerReference for the given deployment
AsOwner() metav1.OwnerReference
}
// NewMemberAddEvent creates an event indicating that a member was added.
func NewMemberAddEvent(memberName, role string, apiObject APIObject) *v1.Event {
func NewMemberAddEvent(memberName, role string, apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = fmt.Sprintf("New %s Added", strings.Title(role))
@ -51,7 +57,7 @@ func NewMemberAddEvent(memberName, role string, apiObject APIObject) *v1.Event {
}
// NewMemberRemoveEvent creates an event indicating that an existing member was removed.
func NewMemberRemoveEvent(memberName, role string, apiObject APIObject) *v1.Event {
func NewMemberRemoveEvent(memberName, role string, apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = fmt.Sprintf("%s Removed", strings.Title(role))
@ -60,7 +66,7 @@ func NewMemberRemoveEvent(memberName, role string, apiObject APIObject) *v1.Even
}
// NewPodCreatedEvent creates an event indicating that a pod has been created
func NewPodCreatedEvent(podName, role string, apiObject APIObject) *v1.Event {
func NewPodCreatedEvent(podName, role string, apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = fmt.Sprintf("Pod Of %s Created", strings.Title(role))
@ -69,7 +75,7 @@ func NewPodCreatedEvent(podName, role string, apiObject APIObject) *v1.Event {
}
// NewPodGoneEvent creates an event indicating that a pod is missing
func NewPodGoneEvent(podName, role string, apiObject APIObject) *v1.Event {
func NewPodGoneEvent(podName, role string, apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = fmt.Sprintf("Pod Of %s Gone", strings.Title(role))
@ -79,7 +85,7 @@ func NewPodGoneEvent(podName, role string, apiObject APIObject) *v1.Event {
// NewImmutableFieldEvent creates an event indicating that an attempt was made to change a field
// that is immutable.
func NewImmutableFieldEvent(fieldName string, apiObject APIObject) *v1.Event {
func NewImmutableFieldEvent(fieldName string, apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Immutable Field Change"
@ -88,7 +94,7 @@ func NewImmutableFieldEvent(fieldName string, apiObject APIObject) *v1.Event {
}
// NewPodsSchedulingFailureEvent creates an event indicating that one of more cannot be scheduled.
func NewPodsSchedulingFailureEvent(unscheduledPodNames []string, apiObject APIObject) *v1.Event {
func NewPodsSchedulingFailureEvent(unscheduledPodNames []string, apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Pods Scheduling Failure"
@ -98,7 +104,7 @@ func NewPodsSchedulingFailureEvent(unscheduledPodNames []string, apiObject APIOb
// NewPodsSchedulingResolvedEvent creates an event indicating that an earlier problem with
// pod scheduling has been resolved.
func NewPodsSchedulingResolvedEvent(apiObject APIObject) *v1.Event {
func NewPodsSchedulingResolvedEvent(apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Pods Scheduling Resolved"
@ -107,7 +113,7 @@ func NewPodsSchedulingResolvedEvent(apiObject APIObject) *v1.Event {
}
// NewSecretsChangedEvent creates an event indicating that one of more secrets have changed.
func NewSecretsChangedEvent(changedSecretNames []string, apiObject APIObject) *v1.Event {
func NewSecretsChangedEvent(changedSecretNames []string, apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Secrets changed"
@ -117,7 +123,7 @@ func NewSecretsChangedEvent(changedSecretNames []string, apiObject APIObject) *v
// NewSecretsRestoredEvent creates an event indicating that all secrets have been restored
// to their original values.
func NewSecretsRestoredEvent(apiObject APIObject) *v1.Event {
func NewSecretsRestoredEvent(apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Secrets restored"
@ -127,7 +133,7 @@ func NewSecretsRestoredEvent(apiObject APIObject) *v1.Event {
// NewAccessPackageCreatedEvent creates an event indicating that a secret containing an access package
// has been created.
func NewAccessPackageCreatedEvent(apiObject APIObject, apSecretName string) *v1.Event {
func NewAccessPackageCreatedEvent(apiObject APIObject, apSecretName string) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Access package created"
@ -137,7 +143,7 @@ func NewAccessPackageCreatedEvent(apiObject APIObject, apSecretName string) *v1.
// NewAccessPackageDeletedEvent creates an event indicating that a secret containing an access package
// has been deleted.
func NewAccessPackageDeletedEvent(apiObject APIObject, apSecretName string) *v1.Event {
func NewAccessPackageDeletedEvent(apiObject APIObject, apSecretName string) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Access package deleted"
@ -147,7 +153,7 @@ func NewAccessPackageDeletedEvent(apiObject APIObject, apSecretName string) *v1.
// NewPlanTimeoutEvent creates an event indicating that an item on a reconciliation plan did not
// finish before its deadline.
func NewPlanTimeoutEvent(apiObject APIObject, itemType, memberID, role string) *v1.Event {
func NewPlanTimeoutEvent(apiObject APIObject, itemType, memberID, role string) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Reconciliation Plan Timeout"
@ -157,7 +163,7 @@ func NewPlanTimeoutEvent(apiObject APIObject, itemType, memberID, role string) *
// NewPlanAbortedEvent creates an event indicating that an item on a reconciliation plan wants to abort
// the entire plan.
func NewPlanAbortedEvent(apiObject APIObject, itemType, memberID, role string) *v1.Event {
func NewPlanAbortedEvent(apiObject APIObject, itemType, memberID, role string) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Reconciliation Plan Aborted"
@ -165,8 +171,28 @@ func NewPlanAbortedEvent(apiObject APIObject, itemType, memberID, role string) *
return event
}
// NewCannotChangeStorageClassEvent creates an event indicating that an item would need to use a different StorageClass,
// but this is not possible for the given reason.
func NewCannotChangeStorageClassEvent(apiObject APIObject, memberID, role, subReason string) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = fmt.Sprintf("%s Member StorageClass Cannot Change", strings.Title(role))
event.Message = fmt.Sprintf("Member %s with role %s should use a different StorageClass, but is cannot because: %s", memberID, role, subReason)
return event
}
// NewDowntimeNotAllowedEvent creates an event indicating that an operation cannot be executed because downtime
// is currently not allowed.
func NewDowntimeNotAllowedEvent(apiObject APIObject, operation string) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeNormal
event.Reason = "Downtime Operation Postponed"
event.Message = fmt.Sprintf("The '%s' operation is postponed because downtime it not allowed. Set `spec.downtimeAllowed` to true to execute this operation", operation)
return event
}
// NewErrorEvent creates an even of type error.
func NewErrorEvent(reason string, err error, apiObject APIObject) *v1.Event {
func NewErrorEvent(reason string, err error, apiObject APIObject) *Event {
event := newDeploymentEvent(apiObject)
event.Type = v1.EventTypeWarning
event.Reason = strings.Title(reason)
@ -175,28 +201,8 @@ func NewErrorEvent(reason string, err error, apiObject APIObject) *v1.Event {
}
// newDeploymentEvent creates a new event for the given api object & owner.
func newDeploymentEvent(apiObject APIObject) *v1.Event {
t := time.Now()
owner := apiObject.AsOwner()
return &v1.Event{
ObjectMeta: metav1.ObjectMeta{
GenerateName: apiObject.GetName() + "-",
Namespace: apiObject.GetNamespace(),
},
InvolvedObject: v1.ObjectReference{
APIVersion: owner.APIVersion,
Kind: owner.Kind,
Name: owner.Name,
Namespace: apiObject.GetNamespace(),
UID: owner.UID,
ResourceVersion: apiObject.GetResourceVersion(),
},
Source: v1.EventSource{
Component: os.Getenv(constants.EnvOperatorPodName),
},
// Each deployment event is unique so it should not be collapsed with other events
FirstTimestamp: metav1.Time{Time: t},
LastTimestamp: metav1.Time{Time: t},
Count: int32(1),
func newDeploymentEvent(apiObject runtime.Object) *Event {
return &Event{
InvolvedObject: apiObject,
}
}