1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Merge pull request #71 from arangodb/resources

Moved low level resource (pod,pvc,secret,service) creation & inspection to resources sub-package.
This commit is contained in:
Ewout Prangsma 2018-03-26 08:29:30 +02:00 committed by GitHub
commit 2e7a95f846
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 423 additions and 234 deletions

View file

@ -28,17 +28,35 @@ import (
driver "github.com/arangodb/go-driver"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// GetAPIObject returns the deployment as k8s object.
func (d *Deployment) GetAPIObject() metav1.Object {
func (d *Deployment) GetAPIObject() k8sutil.APIObject {
return d.apiObject
}
// GetServerGroupIterator returns the deployment as ServerGroupIterator.
func (d *Deployment) GetServerGroupIterator() resources.ServerGroupIterator {
return d.apiObject
}
// GetKubeCli returns the kubernetes client
func (d *Deployment) GetKubeCli() kubernetes.Interface {
return d.deps.KubeCli
}
// GetNamespace returns the kubernetes namespace that contains
// this deployment.
func (d *Deployment) GetNamespace() string {
return d.apiObject.GetNamespace()
}
// GetSpec returns the current specification
func (d *Deployment) GetSpec() api.DeploymentSpec {
return d.apiObject.Spec

View file

@ -36,6 +36,7 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment/reconcile"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
@ -94,6 +95,7 @@ type Deployment struct {
recentInspectionErrors int
clusterScalingIntegration *clusterScalingIntegration
reconciler *reconcile.Reconciler
resources *resources.Resources
}
// New creates a new Deployment from the given API object.
@ -112,6 +114,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
clientCache: newClientCache(deps.KubeCli, apiObject),
}
d.reconciler = reconcile.NewReconciler(deps.Log, d)
d.resources = resources.NewResources(deps.Log, d)
if d.status.AcceptedSpec == nil {
// We've validated the spec, so let's use it from now.
d.status.AcceptedSpec = apiObject.Spec.DeepCopy()
@ -167,13 +170,13 @@ func (d *Deployment) run() {
if d.status.State == api.DeploymentStateNone {
// Create secrets
if err := d.createSecrets(d.apiObject); err != nil {
if err := d.resources.EnsureSecrets(); err != nil {
d.failOnError(err, "Failed to create secrets")
return
}
// Create services
if err := d.createServices(d.apiObject); err != nil {
if err := d.resources.EnsureServices(); err != nil {
d.failOnError(err, "Failed to create services")
return
}
@ -185,13 +188,13 @@ func (d *Deployment) run() {
}
// Create PVCs
if err := d.ensurePVCs(d.apiObject); err != nil {
if err := d.resources.EnsurePVCs(); err != nil {
d.failOnError(err, "Failed to create persistent volume claims")
return
}
// Create pods
if err := d.ensurePods(d.apiObject); err != nil {
if err := d.resources.EnsurePods(); err != nil {
d.failOnError(err, "Failed to create pods")
return
}
@ -266,18 +269,18 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent)
log.Debug().Strs("fields", resetFields).Msg("Found modified immutable fields")
}
if err := newAPIObject.Spec.Validate(); err != nil {
d.createEvent(k8sutil.NewErrorEvent("Validation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Validation failed", err, d.apiObject))
// Try to reset object
if err := d.updateCRSpec(d.apiObject.Spec); err != nil {
log.Error().Err(err).Msg("Restore original spec failed")
d.createEvent(k8sutil.NewErrorEvent("Restore original failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Restore original failed", err, d.apiObject))
}
return nil
}
if len(resetFields) > 0 {
for _, fieldName := range resetFields {
log.Debug().Str("field", fieldName).Msg("Reset immutable field")
d.createEvent(k8sutil.NewImmutableFieldEvent(fieldName, d.apiObject))
d.CreateEvent(k8sutil.NewImmutableFieldEvent(fieldName, d.apiObject))
}
}
@ -302,9 +305,9 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent)
return nil
}
// createEvent creates a given event.
// CreateEvent creates a given event.
// On error, the error is logged.
func (d *Deployment) createEvent(evt *v1.Event) {
func (d *Deployment) CreateEvent(evt *v1.Event) {
_, err := d.eventsCli.Create(evt)
if err != nil {
d.deps.Log.Error().Err(err).Interface("event", *evt).Msg("Failed to record event")

View file

@ -46,41 +46,45 @@ func (d *Deployment) inspectDeployment(lastInterval time.Duration) time.Duration
// Ensure we have image info
if retrySoon, err := d.ensureImages(d.apiObject); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Image detection failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Image detection failed", err, d.apiObject))
} else if retrySoon {
nextInterval = minInspectionInterval
}
// Inspection of generated resources needed
if err := d.inspectPods(); err != nil {
if err := d.resources.InspectPods(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Pod inspection failed", err, d.apiObject))
}
// Create scale/update plan
if err := d.reconciler.CreatePlan(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Plan creation failed", err, d.apiObject))
}
// Execute current step of scale/update plan
retrySoon, err := d.reconciler.ExecutePlan(ctx)
if err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Plan execution failed", err, d.apiObject))
}
if retrySoon {
nextInterval = minInspectionInterval
}
// Ensure all resources are created
if err := d.ensurePVCs(d.apiObject); err != nil {
if err := d.resources.EnsureServices(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("Service creation failed", err, d.apiObject))
}
if err := d.ensurePods(d.apiObject); err != nil {
if err := d.resources.EnsurePVCs(); err != nil {
hasError = true
d.createEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject))
d.CreateEvent(k8sutil.NewErrorEvent("PVC creation failed", err, d.apiObject))
}
if err := d.resources.EnsurePods(); err != nil {
hasError = true
d.CreateEvent(k8sutil.NewErrorEvent("Pod creation failed", err, d.apiObject))
}
// Update next interval (on errors)

View file

@ -29,7 +29,6 @@ import (
"strings"
"github.com/rs/zerolog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@ -40,7 +39,6 @@ import (
const (
dockerPullableImageIDPrefix = "docker-pullable://"
imageIDAndVersionRole = "id" // Role use by identification pods
)
type imagesBuilder struct {
@ -99,7 +97,7 @@ func (ib *imagesBuilder) Run(ctx context.Context) (bool, error) {
// When no pod exists, it is created, otherwise the ID is fetched & version detected.
// Returns: retrySoon, error
func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, image string) (bool, error) {
role := imageIDAndVersionRole
role := k8sutil.ImageIDAndVersionRole
id := fmt.Sprintf("%0x", sha1.Sum([]byte(image)))[:6]
podName := k8sutil.CreatePodName(ib.APIObject.GetName(), role, id, "")
ns := ib.APIObject.GetNamespace()
@ -175,9 +173,3 @@ func (ib *imagesBuilder) fetchArangoDBImageIDAndVersion(ctx context.Context, ima
// Come back soon to inspect the pod
return true, nil
}
// isArangoDBImageIDAndVersionPod returns true if the given pod is used for fetching image ID and ArangoDB version of an image
func isArangoDBImageIDAndVersionPod(p v1.Pod) bool {
role, found := p.GetLabels()[k8sutil.LabelKeyRole]
return found && role == imageIDAndVersionRole
}

View file

@ -65,8 +65,8 @@ type ActionContext interface {
DeletePvc(pvcName string) error
}
// NewActionContext creates a new ActionContext implementation.
func NewActionContext(log zerolog.Logger, context ReconcileContext) ActionContext {
// newActionContext creates a new ActionContext implementation.
func newActionContext(log zerolog.Logger, context Context) ActionContext {
return &actionContext{
log: log,
context: context,
@ -76,7 +76,7 @@ func NewActionContext(log zerolog.Logger, context ReconcileContext) ActionContex
// actionContext implements ActionContext
type actionContext struct {
log zerolog.Logger
context ReconcileContext
context Context
}
// Gets the specified mode of deployment

View file

@ -27,16 +27,16 @@ import (
driver "github.com/arangodb/go-driver"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// ReconcileContext provides methods to the reconcile package.
type ReconcileContext interface {
// Context provides methods to the reconcile package.
type Context interface {
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() metav1.Object
GetAPIObject() k8sutil.APIObject
// GetSpec returns the current specification of the deployment
GetSpec() api.DeploymentSpec
// GetStatus returns the current status of the deployment

View file

@ -113,7 +113,7 @@ func (d *Reconciler) ExecutePlan(ctx context.Context) (bool, error) {
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (d *Reconciler) createAction(ctx context.Context, log zerolog.Logger, action api.Action) Action {
actionCtx := NewActionContext(log, d.context)
actionCtx := newActionContext(log, d.context)
switch action.Type {
case api.ActionTypeAddMember:
return NewAddMemberAction(log, action, actionCtx)

View file

@ -28,11 +28,11 @@ import "github.com/rs/zerolog"
// in line with its (changed) specification.
type Reconciler struct {
log zerolog.Logger
context ReconcileContext
context Context
}
// NewReconciler creates a new reconciler with given context.
func NewReconciler(log zerolog.Logger, context ReconcileContext) *Reconciler {
func NewReconciler(log zerolog.Logger, context Context) *Reconciler {
return &Reconciler{
log: log,
context: context,

View file

@ -0,0 +1,65 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package resources
import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
// ServerGroupIterator provides a helper to callback on every server
// group of the deployment.
type ServerGroupIterator interface {
// ForeachServerGroup calls the given callback for all server groups.
// If the callback returns an error, this error is returned and no other server
// groups are processed.
// Groups are processed in this order: agents, single, dbservers, coordinators, syncmasters, syncworkers
ForeachServerGroup(cb func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error, status *api.DeploymentStatus) error
}
// Context provides all functions needed by the Resources service
// to perform its service.
type Context interface {
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
// GetServerGroupIterator returns the deployment as ServerGroupIterator.
GetServerGroupIterator() ServerGroupIterator
// GetSpec returns the current specification of the deployment
GetSpec() api.DeploymentSpec
// GetStatus returns the current status of the deployment
GetStatus() api.DeploymentStatus
// UpdateStatus replaces the status of the deployment with the given status and
// updates the resources in k8s.
UpdateStatus(status api.DeploymentStatus, force ...bool) error
// GetKubeCli returns the kubernetes client
GetKubeCli() kubernetes.Interface
// GetNamespace returns the namespace that contains the deployment
GetNamespace() string
// createEvent creates a given event.
// On error, the error is logged.
CreateEvent(evt *v1.Event)
// GetOwnedPods returns a list of all pods owned by the deployment.
GetOwnedPods() ([]v1.Pod, error)
}

View file

@ -0,0 +1,29 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package resources
import "github.com/pkg/errors"
var (
maskAny = errors.WithStack
)

View file

@ -20,7 +20,7 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
"crypto/sha1"
@ -217,18 +217,18 @@ func createArangodArgs(apiObject metav1.Object, deplSpec api.DeploymentSpec, gro
}
// createArangoSyncArgs creates command line arguments for an arangosync server in the given group.
func createArangoSyncArgs(apiObject *api.ArangoDeployment, group api.ServerGroup, spec api.ServerGroupSpec, agents api.MemberStatusList, id string) []string {
func createArangoSyncArgs(spec api.DeploymentSpec, group api.ServerGroup, groupSpec api.ServerGroupSpec, agents api.MemberStatusList, id string) []string {
// TODO
return nil
}
// createLivenessProbe creates configuration for a liveness probe of a server in the given group.
func (d *Deployment) createLivenessProbe(apiObject *api.ArangoDeployment, group api.ServerGroup) (*k8sutil.HTTPProbeConfig, error) {
func (r *Resources) createLivenessProbe(spec api.DeploymentSpec, group api.ServerGroup) (*k8sutil.HTTPProbeConfig, error) {
switch group {
case api.ServerGroupSingle, api.ServerGroupAgents, api.ServerGroupDBServers:
authorization := ""
if apiObject.Spec.IsAuthenticated() {
secretData, err := d.getJWTSecret(apiObject)
if spec.IsAuthenticated() {
secretData, err := r.getJWTSecret(spec)
if err != nil {
return nil, maskAny(err)
}
@ -239,16 +239,16 @@ func (d *Deployment) createLivenessProbe(apiObject *api.ArangoDeployment, group
}
return &k8sutil.HTTPProbeConfig{
LocalPath: "/_api/version",
Secure: apiObject.Spec.IsSecure(),
Secure: spec.IsSecure(),
Authorization: authorization,
}, nil
case api.ServerGroupCoordinators:
return nil, nil
case api.ServerGroupSyncMasters, api.ServerGroupSyncWorkers:
authorization := ""
if apiObject.Spec.Sync.Monitoring.GetTokenSecretName() != "" {
if spec.Sync.Monitoring.GetTokenSecretName() != "" {
// Use monitoring token
token, err := d.getSyncMonitoringToken(apiObject)
token, err := r.getSyncMonitoringToken(spec)
if err != nil {
return nil, maskAny(err)
}
@ -258,7 +258,7 @@ func (d *Deployment) createLivenessProbe(apiObject *api.ArangoDeployment, group
}
} else if group == api.ServerGroupSyncMasters {
// Fall back to JWT secret
secretData, err := d.getSyncJWTSecret(apiObject)
secretData, err := r.getSyncJWTSecret(spec)
if err != nil {
return nil, maskAny(err)
}
@ -272,7 +272,7 @@ func (d *Deployment) createLivenessProbe(apiObject *api.ArangoDeployment, group
}
return &k8sutil.HTTPProbeConfig{
LocalPath: "/_api/version",
Secure: apiObject.Spec.IsSecure(),
Secure: spec.IsSecure(),
Authorization: authorization,
}, nil
default:
@ -281,13 +281,13 @@ func (d *Deployment) createLivenessProbe(apiObject *api.ArangoDeployment, group
}
// createReadinessProbe creates configuration for a readiness probe of a server in the given group.
func (d *Deployment) createReadinessProbe(apiObject *api.ArangoDeployment, group api.ServerGroup) (*k8sutil.HTTPProbeConfig, error) {
func (r *Resources) createReadinessProbe(spec api.DeploymentSpec, group api.ServerGroup) (*k8sutil.HTTPProbeConfig, error) {
if group != api.ServerGroupCoordinators {
return nil, nil
}
authorization := ""
if apiObject.Spec.IsAuthenticated() {
secretData, err := d.getJWTSecret(apiObject)
if spec.IsAuthenticated() {
secretData, err := r.getJWTSecret(spec)
if err != nil {
return nil, maskAny(err)
}
@ -298,117 +298,132 @@ func (d *Deployment) createReadinessProbe(apiObject *api.ArangoDeployment, group
}
return &k8sutil.HTTPProbeConfig{
LocalPath: "/_api/version",
Secure: apiObject.Spec.IsSecure(),
Secure: spec.IsSecure(),
Authorization: authorization,
}, nil
}
// ensurePods creates all Pods listed in member status
func (d *Deployment) ensurePods(apiObject *api.ArangoDeployment) error {
kubecli := d.deps.KubeCli
log := d.deps.Log
ns := apiObject.GetNamespace()
// createPodForMember creates all Pods listed in member status
func (r *Resources) createPodForMember(spec api.DeploymentSpec, group api.ServerGroup,
groupSpec api.ServerGroupSpec, m api.MemberStatus, memberStatusList *api.MemberStatusList) error {
kubecli := r.context.GetKubeCli()
log := r.log
apiObject := r.context.GetAPIObject()
ns := r.context.GetNamespace()
status := r.context.GetStatus()
if err := apiObject.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error {
// Update pod name
role := group.AsRole()
roleAbbr := group.AsRoleAbbreviated()
podSuffix := createPodSuffix(spec)
m.PodName = k8sutil.CreatePodName(apiObject.GetName(), roleAbbr, m.ID, podSuffix)
newState := api.MemberStateCreated
// Create pod
if group.IsArangod() {
// Find image ID
info, found := status.Images.GetByImage(spec.GetImage())
if !found {
log.Debug().Str("image", spec.GetImage()).Msg("Image ID is not known yet for image")
return nil
}
// Prepare arguments
autoUpgrade := m.Conditions.IsTrue(api.ConditionTypeAutoUpgrade)
if autoUpgrade {
newState = api.MemberStateUpgrading
}
args := createArangodArgs(apiObject, spec, group, status.Members.Agents, m.ID, autoUpgrade)
env := make(map[string]k8sutil.EnvValue)
livenessProbe, err := r.createLivenessProbe(spec, group)
if err != nil {
return maskAny(err)
}
readinessProbe, err := r.createReadinessProbe(spec, group)
if err != nil {
return maskAny(err)
}
tlsKeyfileSecretName := ""
if spec.IsSecure() {
tlsKeyfileSecretName = k8sutil.CreateTLSKeyfileSecretName(apiObject.GetName(), role, m.ID)
serverNames := []string{
k8sutil.CreateDatabaseClientServiceDNSName(apiObject),
k8sutil.CreatePodDNSName(apiObject, role, m.ID),
}
owner := apiObject.AsOwner()
if err := createServerCertificate(log, kubecli.CoreV1(), serverNames, spec.TLS, tlsKeyfileSecretName, ns, &owner); err != nil && !k8sutil.IsAlreadyExists(err) {
return maskAny(errors.Wrapf(err, "Failed to create TLS keyfile secret"))
}
}
rocksdbEncryptionSecretName := ""
if spec.RocksDB.IsEncrypted() {
rocksdbEncryptionSecretName = spec.RocksDB.Encryption.GetKeySecretName()
if err := k8sutil.ValidateEncryptionKeySecret(kubecli.CoreV1(), rocksdbEncryptionSecretName, ns); err != nil {
return maskAny(errors.Wrapf(err, "RocksDB encryption key secret validation failed"))
}
}
if spec.IsAuthenticated() {
env[constants.EnvArangodJWTSecret] = k8sutil.EnvValue{
SecretName: spec.Authentication.GetJWTSecretName(),
SecretKey: constants.SecretKeyJWT,
}
}
if err := k8sutil.CreateArangodPod(kubecli, spec.IsDevelopment(), apiObject, role, m.ID, m.PodName, m.PersistentVolumeClaimName, info.ImageID, spec.GetImagePullPolicy(), args, env, livenessProbe, readinessProbe, tlsKeyfileSecretName, rocksdbEncryptionSecretName); err != nil {
return maskAny(err)
}
} else if group.IsArangosync() {
// Find image ID
info, found := status.Images.GetByImage(spec.Sync.GetImage())
if !found {
log.Debug().Str("image", spec.Sync.GetImage()).Msg("Image ID is not known yet for image")
return nil
}
// Prepare arguments
args := createArangoSyncArgs(spec, group, groupSpec, status.Members.Agents, m.ID)
env := make(map[string]k8sutil.EnvValue)
livenessProbe, err := r.createLivenessProbe(spec, group)
if err != nil {
return maskAny(err)
}
affinityWithRole := ""
if group == api.ServerGroupSyncWorkers {
affinityWithRole = api.ServerGroupDBServers.AsRole()
}
if err := k8sutil.CreateArangoSyncPod(kubecli, spec.IsDevelopment(), apiObject, role, m.ID, m.PodName, info.ImageID, spec.Sync.GetImagePullPolicy(), args, env, livenessProbe, affinityWithRole); err != nil {
return maskAny(err)
}
}
// Record new member state
m.State = newState
m.Conditions.Remove(api.ConditionTypeReady)
m.Conditions.Remove(api.ConditionTypeTerminated)
m.Conditions.Remove(api.ConditionTypeAutoUpgrade)
if err := memberStatusList.Update(m); err != nil {
return maskAny(err)
}
if err := r.context.UpdateStatus(status); err != nil {
return maskAny(err)
}
// Create event
r.context.CreateEvent(k8sutil.NewMemberAddEvent(m.PodName, role, apiObject))
return nil
}
// EnsurePods creates all Pods listed in member status
func (r *Resources) EnsurePods() error {
iterator := r.context.GetServerGroupIterator()
status := r.context.GetStatus()
if err := iterator.ForeachServerGroup(func(group api.ServerGroup, groupSpec api.ServerGroupSpec, status *api.MemberStatusList) error {
for _, m := range *status {
if m.State != api.MemberStateNone {
continue
}
// Update pod name
role := group.AsRole()
roleAbbr := group.AsRoleAbbreviated()
podSuffix := createPodSuffix(apiObject.Spec)
m.PodName = k8sutil.CreatePodName(apiObject.GetName(), roleAbbr, m.ID, podSuffix)
newState := api.MemberStateCreated
// Create pod
if group.IsArangod() {
// Find image ID
info, found := apiObject.Status.Images.GetByImage(apiObject.Spec.GetImage())
if !found {
log.Debug().Str("image", apiObject.Spec.GetImage()).Msg("Image ID is not known yet for image")
return nil
}
// Prepare arguments
autoUpgrade := m.Conditions.IsTrue(api.ConditionTypeAutoUpgrade)
if autoUpgrade {
newState = api.MemberStateUpgrading
}
args := createArangodArgs(apiObject, apiObject.Spec, group, d.status.Members.Agents, m.ID, autoUpgrade)
env := make(map[string]k8sutil.EnvValue)
livenessProbe, err := d.createLivenessProbe(apiObject, group)
if err != nil {
return maskAny(err)
}
readinessProbe, err := d.createReadinessProbe(apiObject, group)
if err != nil {
return maskAny(err)
}
tlsKeyfileSecretName := ""
if apiObject.Spec.IsSecure() {
tlsKeyfileSecretName = k8sutil.CreateTLSKeyfileSecretName(apiObject.GetName(), role, m.ID)
serverNames := []string{
k8sutil.CreateDatabaseClientServiceDNSName(apiObject),
k8sutil.CreatePodDNSName(apiObject, role, m.ID),
}
owner := apiObject.AsOwner()
if err := createServerCertificate(log, kubecli.CoreV1(), serverNames, apiObject.Spec.TLS, tlsKeyfileSecretName, ns, &owner); err != nil && !k8sutil.IsAlreadyExists(err) {
return maskAny(errors.Wrapf(err, "Failed to create TLS keyfile secret"))
}
}
rocksdbEncryptionSecretName := ""
if apiObject.Spec.RocksDB.IsEncrypted() {
rocksdbEncryptionSecretName = apiObject.Spec.RocksDB.Encryption.GetKeySecretName()
if err := k8sutil.ValidateEncryptionKeySecret(kubecli.CoreV1(), rocksdbEncryptionSecretName, ns); err != nil {
return maskAny(errors.Wrapf(err, "RocksDB encryption key secret validation failed"))
}
}
if apiObject.Spec.IsAuthenticated() {
env[constants.EnvArangodJWTSecret] = k8sutil.EnvValue{
SecretName: apiObject.Spec.Authentication.GetJWTSecretName(),
SecretKey: constants.SecretKeyJWT,
}
}
if err := k8sutil.CreateArangodPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, m.PodName, m.PersistentVolumeClaimName, info.ImageID, apiObject.Spec.GetImagePullPolicy(), args, env, livenessProbe, readinessProbe, tlsKeyfileSecretName, rocksdbEncryptionSecretName); err != nil {
return maskAny(err)
}
} else if group.IsArangosync() {
// Find image ID
info, found := apiObject.Status.Images.GetByImage(apiObject.Spec.Sync.GetImage())
if !found {
log.Debug().Str("image", apiObject.Spec.Sync.GetImage()).Msg("Image ID is not known yet for image")
return nil
}
// Prepare arguments
args := createArangoSyncArgs(apiObject, group, spec, d.status.Members.Agents, m.ID)
env := make(map[string]k8sutil.EnvValue)
livenessProbe, err := d.createLivenessProbe(apiObject, group)
if err != nil {
return maskAny(err)
}
affinityWithRole := ""
if group == api.ServerGroupSyncWorkers {
affinityWithRole = api.ServerGroupDBServers.AsRole()
}
if err := k8sutil.CreateArangoSyncPod(kubecli, apiObject.Spec.IsDevelopment(), apiObject, role, m.ID, m.PodName, info.ImageID, apiObject.Spec.Sync.GetImagePullPolicy(), args, env, livenessProbe, affinityWithRole); err != nil {
return maskAny(err)
}
}
// Record new member state
m.State = newState
m.Conditions.Remove(api.ConditionTypeReady)
m.Conditions.Remove(api.ConditionTypeTerminated)
m.Conditions.Remove(api.ConditionTypeAutoUpgrade)
if err := status.Update(m); err != nil {
spec := r.context.GetSpec()
if err := r.createPodForMember(spec, group, groupSpec, m, status); err != nil {
return maskAny(err)
}
if err := d.updateCRStatus(); err != nil {
return maskAny(err)
}
// Create event
d.createEvent(k8sutil.NewMemberAddEvent(m.PodName, role, apiObject))
}
return nil
}, &d.status); err != nil {
}, &status); err != nil {
return maskAny(err)
}
return nil

View file

@ -20,7 +20,7 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
"testing"

View file

@ -20,7 +20,7 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
"testing"

View file

@ -20,7 +20,7 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
"testing"

View file

@ -20,7 +20,7 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
"testing"

View file

@ -20,7 +20,7 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
@ -34,26 +34,23 @@ var (
inspectedPodCounter = metrics.MustRegisterCounter("deployment", "inspected_pods", "Number of pod inspections")
)
// inspectPods lists all pods that belong to the given deployment and updates
// InspectPods lists all pods that belong to the given deployment and updates
// the member status of the deployment accordingly.
func (d *Deployment) inspectPods() error {
log := d.deps.Log
func (r *Resources) InspectPods() error {
log := r.log
var events []*v1.Event
pods, err := d.GetOwnedPods()
pods, err := r.context.GetOwnedPods()
if err != nil {
log.Debug().Err(err).Msg("Failed to get owned pods")
return maskAny(err)
}
// Update member status from all pods found
status := r.context.GetStatus()
apiObject := r.context.GetAPIObject()
for _, p := range pods {
// Check ownership
if !d.isOwnerOf(&p) {
log.Debug().Str("pod", p.GetName()).Msg("pod not owned by this deployment")
continue
}
if isArangoDBImageIDAndVersionPod(p) {
if k8sutil.IsArangoDBImageIDAndVersionPod(p) {
// Image ID pods are not relevant to inspect here
continue
}
@ -62,7 +59,7 @@ func (d *Deployment) inspectPods() error {
inspectedPodCounter.Inc()
// Find member status
memberStatus, group, found := d.status.Members.MemberStatusByPodName(p.GetName())
memberStatus, group, found := status.Members.MemberStatusByPodName(p.GetName())
if !found {
log.Debug().Str("pod", p.GetName()).Msg("no memberstatus found for pod")
continue
@ -94,7 +91,7 @@ func (d *Deployment) inspectPods() error {
}
if updateMemberStatusNeeded {
log.Debug().Str("pod-name", p.GetName()).Msg("Updated member status member for pod")
if err := d.status.Members.UpdateMemberStatus(memberStatus, group); err != nil {
if err := status.Members.UpdateMemberStatus(memberStatus, group); err != nil {
return maskAny(err)
}
}
@ -102,7 +99,7 @@ func (d *Deployment) inspectPods() error {
podExists := func(podName string) bool {
for _, p := range pods {
if p.GetName() == podName && d.isOwnerOf(&p) {
if p.GetName() == podName {
return true
}
}
@ -110,7 +107,7 @@ func (d *Deployment) inspectPods() error {
}
// Go over all members, check for missing pods
d.status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error {
status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error {
for _, m := range *members {
if podName := m.PodName; podName != "" {
if !podExists(podName) {
@ -121,16 +118,16 @@ func (d *Deployment) inspectPods() error {
// Shutdown was intended, so not need to do anything here.
// Just mark terminated
if m.Conditions.Update(api.ConditionTypeTerminated, true, "Pod Terminated", "") {
if err := d.status.Members.UpdateMemberStatus(m, group); err != nil {
if err := status.Members.UpdateMemberStatus(m, group); err != nil {
return maskAny(err)
}
}
default:
m.State = api.MemberStateNone // This is trigger a recreate of the pod.
// Create event
events = append(events, k8sutil.NewPodGoneEvent(podName, group.AsRole(), d.apiObject))
events = append(events, k8sutil.NewPodGoneEvent(podName, group.AsRole(), apiObject))
if m.Conditions.Update(api.ConditionTypeReady, false, "Pod Does Not Exist", "") {
if err := d.status.Members.UpdateMemberStatus(m, group); err != nil {
if err := status.Members.UpdateMemberStatus(m, group); err != nil {
return maskAny(err)
}
}
@ -142,22 +139,22 @@ func (d *Deployment) inspectPods() error {
})
// Check overall status update
switch d.status.State {
switch status.State {
case api.DeploymentStateCreating:
if d.status.Members.AllMembersReady() {
d.status.State = api.DeploymentStateRunning
if status.Members.AllMembersReady() {
status.State = api.DeploymentStateRunning
}
// TODO handle other State values
}
// Save status
if err := d.updateCRStatus(); err != nil {
if err := r.context.UpdateStatus(status); err != nil {
return maskAny(err)
}
// Create events
for _, evt := range events {
d.createEvent(evt)
r.context.CreateEvent(evt)
}
return nil
}

View file

@ -20,7 +20,7 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
@ -28,14 +28,17 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// ensurePVCs creates all PVC's listed in member status
func (d *Deployment) ensurePVCs(apiObject *api.ArangoDeployment) error {
kubecli := d.deps.KubeCli
// EnsurePVCs creates all PVC's listed in member status
func (r *Resources) EnsurePVCs() error {
kubecli := r.context.GetKubeCli()
apiObject := r.context.GetAPIObject()
deploymentName := apiObject.GetName()
ns := apiObject.GetNamespace()
owner := apiObject.AsOwner()
iterator := r.context.GetServerGroupIterator()
status := r.context.GetStatus()
if err := apiObject.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error {
if err := iterator.ForeachServerGroup(func(group api.ServerGroup, spec api.ServerGroupSpec, status *api.MemberStatusList) error {
for _, m := range *status {
if m.PersistentVolumeClaimName != "" {
storageClassName := spec.GetStorageClassName()
@ -47,7 +50,7 @@ func (d *Deployment) ensurePVCs(apiObject *api.ArangoDeployment) error {
}
}
return nil
}, &d.status); err != nil {
}, &status); err != nil {
return maskAny(err)
}
return nil

View file

@ -0,0 +1,41 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package resources
import "github.com/rs/zerolog"
// Resources is a service that creates low level resources for members
// and inspects low level resources, put the inspection result in members.
type Resources struct {
log zerolog.Logger
context Context
}
// NewResources creates a new Resources service, used to
// create and inspect low level resources such as pods and services.
func NewResources(log zerolog.Logger, context Context) *Resources {
return &Resources{
log: log,
context: context,
}
}

View file

@ -20,7 +20,7 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
"crypto/rand"
@ -33,20 +33,21 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// createSecrets creates all secrets needed to run the given deployment
func (d *Deployment) createSecrets(apiObject *api.ArangoDeployment) error {
if apiObject.Spec.IsAuthenticated() {
if err := d.ensureJWTSecret(apiObject.Spec.Authentication.GetJWTSecretName()); err != nil {
// EnsureSecrets creates all secrets needed to run the given deployment
func (r *Resources) EnsureSecrets() error {
spec := r.context.GetSpec()
if spec.IsAuthenticated() {
if err := r.ensureJWTSecret(spec.Authentication.GetJWTSecretName()); err != nil {
return maskAny(err)
}
}
if apiObject.Spec.IsSecure() {
if err := d.ensureCACertificateSecret(apiObject.Spec.TLS); err != nil {
if spec.IsSecure() {
if err := r.ensureCACertificateSecret(spec.TLS); err != nil {
return maskAny(err)
}
}
if apiObject.Spec.Sync.IsEnabled() {
if err := d.ensureCACertificateSecret(apiObject.Spec.Sync.TLS); err != nil {
if spec.Sync.IsEnabled() {
if err := r.ensureCACertificateSecret(spec.Sync.TLS); err != nil {
return maskAny(err)
}
}
@ -56,9 +57,9 @@ func (d *Deployment) createSecrets(apiObject *api.ArangoDeployment) error {
// ensureJWTSecret checks if a secret with given name exists in the namespace
// of the deployment. If not, it will add such a secret with a random
// JWT token.
func (d *Deployment) ensureJWTSecret(secretName string) error {
kubecli := d.deps.KubeCli
ns := d.apiObject.GetNamespace()
func (r *Resources) ensureJWTSecret(secretName string) error {
kubecli := r.context.GetKubeCli()
ns := r.context.GetNamespace()
if _, err := kubecli.CoreV1().Secrets(ns).Get(secretName, metav1.GetOptions{}); k8sutil.IsNotFound(err) {
// Secret not found, create it
// Create token
@ -67,7 +68,7 @@ func (d *Deployment) ensureJWTSecret(secretName string) error {
token := hex.EncodeToString(tokenData)
// Create secret
owner := d.apiObject.AsOwner()
owner := r.context.GetAPIObject().AsOwner()
if err := k8sutil.CreateJWTSecret(kubecli.CoreV1(), secretName, ns, token, &owner); k8sutil.IsAlreadyExists(err) {
// Secret added while we tried it also
return nil
@ -85,14 +86,15 @@ func (d *Deployment) ensureJWTSecret(secretName string) error {
// ensureCACertificateSecret checks if a secret with given name exists in the namespace
// of the deployment. If not, it will add such a secret with a generated CA certificate.
// JWT token.
func (d *Deployment) ensureCACertificateSecret(spec api.TLSSpec) error {
kubecli := d.deps.KubeCli
ns := d.apiObject.GetNamespace()
func (r *Resources) ensureCACertificateSecret(spec api.TLSSpec) error {
kubecli := r.context.GetKubeCli()
ns := r.context.GetNamespace()
if _, err := kubecli.CoreV1().Secrets(ns).Get(spec.GetCASecretName(), metav1.GetOptions{}); k8sutil.IsNotFound(err) {
// Secret not found, create it
owner := d.apiObject.AsOwner()
deploymentName := d.apiObject.GetName()
if err := createCACertificate(d.deps.Log, kubecli.CoreV1(), spec, deploymentName, ns, &owner); k8sutil.IsAlreadyExists(err) {
apiObject := r.context.GetAPIObject()
owner := apiObject.AsOwner()
deploymentName := apiObject.GetName()
if err := createCACertificate(r.log, kubecli.CoreV1(), spec, deploymentName, ns, &owner); k8sutil.IsAlreadyExists(err) {
// Secret added while we tried it also
return nil
} else if err != nil {
@ -107,39 +109,42 @@ func (d *Deployment) ensureCACertificateSecret(spec api.TLSSpec) error {
}
// getJWTSecret loads the JWT secret from a Secret configured in apiObject.Spec.Authentication.JWTSecretName.
func (d *Deployment) getJWTSecret(apiObject *api.ArangoDeployment) (string, error) {
if !apiObject.Spec.IsAuthenticated() {
func (r *Resources) getJWTSecret(spec api.DeploymentSpec) (string, error) {
if !spec.IsAuthenticated() {
return "", nil
}
kubecli := d.deps.KubeCli
secretName := apiObject.Spec.Authentication.GetJWTSecretName()
s, err := k8sutil.GetJWTSecret(kubecli.CoreV1(), secretName, apiObject.GetNamespace())
kubecli := r.context.GetKubeCli()
ns := r.context.GetNamespace()
secretName := spec.Authentication.GetJWTSecretName()
s, err := k8sutil.GetJWTSecret(kubecli.CoreV1(), secretName, ns)
if err != nil {
d.deps.Log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get JWT secret")
r.log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get JWT secret")
return "", maskAny(err)
}
return s, nil
}
// getSyncJWTSecret loads the JWT secret used for syncmasters from a Secret configured in apiObject.Spec.Sync.Authentication.JWTSecretName.
func (d *Deployment) getSyncJWTSecret(apiObject *api.ArangoDeployment) (string, error) {
kubecli := d.deps.KubeCli
secretName := apiObject.Spec.Sync.Authentication.GetJWTSecretName()
s, err := k8sutil.GetJWTSecret(kubecli.CoreV1(), secretName, apiObject.GetNamespace())
func (r *Resources) getSyncJWTSecret(spec api.DeploymentSpec) (string, error) {
kubecli := r.context.GetKubeCli()
ns := r.context.GetNamespace()
secretName := spec.Sync.Authentication.GetJWTSecretName()
s, err := k8sutil.GetJWTSecret(kubecli.CoreV1(), secretName, ns)
if err != nil {
d.deps.Log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get sync JWT secret")
r.log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get sync JWT secret")
return "", maskAny(err)
}
return s, nil
}
// getSyncMonitoringToken loads the token secret used for monitoring sync masters & workers.
func (d *Deployment) getSyncMonitoringToken(apiObject *api.ArangoDeployment) (string, error) {
kubecli := d.deps.KubeCli
secretName := apiObject.Spec.Sync.Monitoring.GetTokenSecretName()
s, err := kubecli.CoreV1().Secrets(apiObject.GetNamespace()).Get(secretName, metav1.GetOptions{})
func (r *Resources) getSyncMonitoringToken(spec api.DeploymentSpec) (string, error) {
kubecli := r.context.GetKubeCli()
ns := r.context.GetNamespace()
secretName := spec.Sync.Monitoring.GetTokenSecretName()
s, err := kubecli.CoreV1().Secrets(ns).Get(secretName, metav1.GetOptions{})
if err != nil {
d.deps.Log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get monitoring token secret")
r.log.Debug().Err(err).Str("secret-name", secretName).Msg("Failed to get monitoring token secret")
}
// Take the first data
for _, v := range s.Data {

View file

@ -20,18 +20,19 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// createServices creates all services needed to service the given deployment
func (d *Deployment) createServices(apiObject *api.ArangoDeployment) error {
log := d.deps.Log
kubecli := d.deps.KubeCli
// EnsureServices creates all services needed to service the deployment
func (r *Resources) EnsureServices() error {
log := r.log
kubecli := r.context.GetKubeCli()
apiObject := r.context.GetAPIObject()
owner := apiObject.AsOwner()
spec := r.context.GetSpec()
log.Debug().Msg("creating services...")
@ -39,23 +40,30 @@ func (d *Deployment) createServices(apiObject *api.ArangoDeployment) error {
log.Debug().Err(err).Msg("Failed to create headless service")
return maskAny(err)
}
single := apiObject.Spec.GetMode().HasSingleServers()
if svcName, err := k8sutil.CreateDatabaseClientService(kubecli, apiObject, single, owner); err != nil {
single := spec.GetMode().HasSingleServers()
svcName, err := k8sutil.CreateDatabaseClientService(kubecli, apiObject, single, owner)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client service")
return maskAny(err)
} else {
d.status.ServiceName = svcName
if err := d.updateCRStatus(); err != nil {
}
status := r.context.GetStatus()
if status.ServiceName != svcName {
status.ServiceName = svcName
if err := r.context.UpdateStatus(status); err != nil {
return maskAny(err)
}
}
if apiObject.Spec.Sync.IsEnabled() {
if svcName, err := k8sutil.CreateSyncMasterClientService(kubecli, apiObject, owner); err != nil {
if spec.Sync.IsEnabled() {
svcName, err := k8sutil.CreateSyncMasterClientService(kubecli, apiObject, owner)
if err != nil {
log.Debug().Err(err).Msg("Failed to create syncmaster client service")
return maskAny(err)
} else {
d.status.ServiceName = svcName
if err := d.updateCRStatus(); err != nil {
}
status := r.context.GetStatus()
if status.SyncServiceName != svcName {
status.SyncServiceName = svcName
if err := r.context.UpdateStatus(status); err != nil {
return maskAny(err)
}
}

View file

@ -20,7 +20,7 @@
// Author Ewout Prangsma
//
package deployment
package resources
import (
"fmt"

View file

@ -30,4 +30,7 @@ const (
ClusterIPNone = "None"
TolerateUnreadyEndpointsAnnotation = "service.alpha.kubernetes.io/tolerate-unready-endpoints"
TopologyKeyHostname = "kubernetes.io/hostname"
// Internal constants
ImageIDAndVersionRole = "id" // Role use by identification pods
)

View file

@ -87,6 +87,12 @@ func IsPodFailed(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed
}
// IsArangoDBImageIDAndVersionPod returns true if the given pod is used for fetching image ID and ArangoDB version of an image
func IsArangoDBImageIDAndVersionPod(p v1.Pod) bool {
role, found := p.GetLabels()[LabelKeyRole]
return found && role == ImageIDAndVersionRole
}
// getPodCondition returns the condition of given type in the given status.
// If not found, nil is returned.
func getPodCondition(status *v1.PodStatus, condType v1.PodConditionType) *v1.PodCondition {