1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] ArangoMember template propagation (#773)

This commit is contained in:
Adam Janikowski 2021-08-26 09:59:16 +02:00 committed by GitHub
parent d9aa13cb6c
commit f62fdf2faa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
43 changed files with 1530 additions and 498 deletions

View file

@ -6,6 +6,7 @@
- Add Pending Member phase
- Add Ephemeral Volumes for apps feature
- Check if the DB server is cleaned out.
- Render Pod Template in ArangoMember Spec and Status
## [1.2.1](https://github.com/arangodb/kube-arangodb/tree/1.2.1) (2021-07-28)
- Fix ArangoMember race with multiple ArangoDeployments within single namespace

176
main.go
View file

@ -28,11 +28,15 @@ import (
goflag "flag"
"fmt"
"net"
"net/http"
"os"
"strconv"
"strings"
"time"
operatorHTTP "github.com/arangodb/kube-arangodb/pkg/util/http"
"github.com/gin-gonic/gin"
"github.com/arangodb/kube-arangodb/pkg/version"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
@ -110,6 +114,7 @@ var (
enableDeploymentReplication bool // Run deployment-replication operator
enableStorage bool // Run local-storage operator
enableBackup bool // Run backup operator
versionOnly bool // Run only version endpoint, explicitly disabled with other
alpineImage, metricsExporterImage, arangoImage string
@ -143,6 +148,7 @@ func init() {
f.BoolVar(&operatorOptions.enableDeploymentReplication, "operator.deployment-replication", false, "Enable to run the ArangoDeploymentReplication operator")
f.BoolVar(&operatorOptions.enableStorage, "operator.storage", false, "Enable to run the ArangoLocalStorage operator")
f.BoolVar(&operatorOptions.enableBackup, "operator.backup", false, "Enable to run the ArangoBackup operator")
f.BoolVar(&operatorOptions.versionOnly, "operator.version", false, "Enable only version endpoint in Operator")
f.StringVar(&operatorOptions.alpineImage, "operator.alpine-image", UBIImageEnv.GetOrDefault(defaultAlpineImage), "Docker image used for alpine containers")
f.MarkDeprecated("operator.alpine-image", "Value is not used anymore")
f.StringVar(&operatorOptions.metricsExporterImage, "operator.metrics-exporter-image", MetricsExporterImageEnv.GetOrDefault(defaultMetricsExporterImage), "Docker image used for metrics containers by default")
@ -198,7 +204,11 @@ func cmdMainRun(cmd *cobra.Command, args []string) {
// Check operating mode
if !operatorOptions.enableDeployment && !operatorOptions.enableDeploymentReplication && !operatorOptions.enableStorage && !operatorOptions.enableBackup {
cliLog.Fatal().Err(err).Msg("Turn on --operator.deployment, --operator.deployment-replication, --operator.storage, --operator.backup or any combination of these")
if !operatorOptions.versionOnly {
cliLog.Fatal().Err(err).Msg("Turn on --operator.deployment, --operator.deployment-replication, --operator.storage, --operator.backup or any combination of these")
}
} else if operatorOptions.versionOnly {
cliLog.Fatal().Err(err).Msg("Options --operator.deployment, --operator.deployment-replication, --operator.storage, --operator.backup cannot be enabled together with --operator.version")
}
// Log version
@ -208,81 +218,111 @@ func cmdMainRun(cmd *cobra.Command, args []string) {
Msgf("Starting arangodb-operator (%s), version %s build %s", version.GetVersionV1().Edition.Title(), version.GetVersionV1().Version, version.GetVersionV1().Build)
// Check environment
if len(namespace) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodNamespace)
}
if len(name) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodName)
}
if len(ip) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodIP)
}
if !operatorOptions.versionOnly {
if len(namespace) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodNamespace)
}
if len(name) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodName)
}
if len(ip) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodIP)
}
// Get host name
id, err := os.Hostname()
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to get hostname")
}
// Get host name
id, err := os.Hostname()
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to get hostname")
}
// Create kubernetes client
kubecli, err := k8sutil.NewKubeClient()
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create Kubernetes client")
}
secrets := kubecli.CoreV1().Secrets(namespace)
// Create kubernetes client
kubecli, err := k8sutil.NewKubeClient()
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create Kubernetes client")
}
secrets := kubecli.CoreV1().Secrets(namespace)
// Create operator
cfg, deps, err := newOperatorConfigAndDeps(id+"-"+name, namespace, name)
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create operator config & deps")
}
o, err := operator.NewOperator(cfg, deps)
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create operator")
}
// Create operator
cfg, deps, err := newOperatorConfigAndDeps(id+"-"+name, namespace, name)
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create operator config & deps")
}
o, err := operator.NewOperator(cfg, deps)
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create operator")
}
listenAddr := net.JoinHostPort(serverOptions.host, strconv.Itoa(serverOptions.port))
if svr, err := server.NewServer(kubecli.CoreV1(), server.Config{
Namespace: namespace,
Address: listenAddr,
TLSSecretName: serverOptions.tlsSecretName,
TLSSecretNamespace: namespace,
PodName: name,
PodIP: ip,
AdminSecretName: serverOptions.adminSecretName,
AllowAnonymous: serverOptions.allowAnonymous,
}, server.Dependencies{
Log: logService.MustGetLogger(logging.LoggerNameServer),
LivenessProbe: &livenessProbe,
Deployment: server.OperatorDependency{
Enabled: cfg.EnableDeployment,
Probe: &deploymentProbe,
},
DeploymentReplication: server.OperatorDependency{
Enabled: cfg.EnableDeploymentReplication,
Probe: &deploymentReplicationProbe,
},
Storage: server.OperatorDependency{
Enabled: cfg.EnableStorage,
Probe: &storageProbe,
},
Backup: server.OperatorDependency{
Enabled: cfg.EnableBackup,
Probe: &backupProbe,
},
Operators: o,
listenAddr := net.JoinHostPort(serverOptions.host, strconv.Itoa(serverOptions.port))
if svr, err := server.NewServer(kubecli.CoreV1(), server.Config{
Namespace: namespace,
Address: listenAddr,
TLSSecretName: serverOptions.tlsSecretName,
TLSSecretNamespace: namespace,
PodName: name,
PodIP: ip,
AdminSecretName: serverOptions.adminSecretName,
AllowAnonymous: serverOptions.allowAnonymous,
}, server.Dependencies{
Log: logService.MustGetLogger(logging.LoggerNameServer),
LivenessProbe: &livenessProbe,
Deployment: server.OperatorDependency{
Enabled: cfg.EnableDeployment,
Probe: &deploymentProbe,
},
DeploymentReplication: server.OperatorDependency{
Enabled: cfg.EnableDeploymentReplication,
Probe: &deploymentReplicationProbe,
},
Storage: server.OperatorDependency{
Enabled: cfg.EnableStorage,
Probe: &storageProbe,
},
Backup: server.OperatorDependency{
Enabled: cfg.EnableBackup,
Probe: &backupProbe,
},
Operators: o,
Secrets: secrets,
}); err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create HTTP server")
Secrets: secrets,
}); err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create HTTP server")
} else {
go utilsError.LogError(cliLog, "error while starting service", svr.Run)
}
// startChaos(context.Background(), cfg.KubeCli, cfg.Namespace, chaosLevel)
// Start operator
o.Run()
} else {
go utilsError.LogError(cliLog, "error while starting service", svr.Run)
if err := startVersionProcess(); err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create HTTP server")
}
}
}
func startVersionProcess() error {
// Just expose version
listenAddr := net.JoinHostPort(serverOptions.host, strconv.Itoa(serverOptions.port))
cliLog.Info().Str("addr", listenAddr).Msgf("Starting version endpoint")
gin.SetMode(gin.ReleaseMode)
r := gin.New()
r.Use(gin.Recovery())
versionV1Responser, err := operatorHTTP.NewSimpleJSONResponse(version.GetVersionV1())
if err != nil {
return errors.WithStack(err)
}
r.GET("/_api/version", gin.WrapF(versionV1Responser.ServeHTTP))
r.GET("/api/v1/version", gin.WrapF(versionV1Responser.ServeHTTP))
s := http.Server{
Addr: listenAddr,
Handler: r,
}
// startChaos(context.Background(), cfg.KubeCli, cfg.Namespace, chaosLevel)
// Start operator
o.Run()
return s.ListenAndServe()
}
// newOperatorConfigAndDeps creates operator config & dependencies.

View file

@ -0,0 +1,81 @@
//
// DISCLAIMER
//
// Copyright 2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package v1
import (
"crypto/sha256"
"fmt"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/json"
)
func GetArangoMemberPodTemplate(pod *core.PodTemplateSpec, podSpecChecksum string) (*ArangoMemberPodTemplate, error) {
data, err := json.Marshal(pod.Spec)
if err != nil {
return nil, err
}
return &ArangoMemberPodTemplate{
PodSpec: pod,
PodSpecChecksum: podSpecChecksum,
Checksum: fmt.Sprintf("%0x", sha256.Sum256(data)),
}, nil
}
type ArangoMemberPodTemplate struct {
PodSpec *core.PodTemplateSpec `json:"podSpec,omitempty"`
PodSpecChecksum string `json:"podSpecChecksum,omitempty"`
Checksum string `json:"checksum,omitempty"`
}
func (a *ArangoMemberPodTemplate) Equals(b *ArangoMemberPodTemplate) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return a.Checksum == b.Checksum && a.PodSpecChecksum == b.PodSpecChecksum
}
func (a *ArangoMemberPodTemplate) RotationNeeded(b *ArangoMemberPodTemplate) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return true
}
return a.PodSpecChecksum != b.PodSpecChecksum
}
func (a *ArangoMemberPodTemplate) EqualPodSpecChecksum(checksum string) bool {
if a == nil {
return false
}
return checksum == a.PodSpecChecksum
}

View file

@ -23,7 +23,6 @@
package v1
import (
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)
@ -32,6 +31,5 @@ type ArangoMemberSpec struct {
ID string `json:"id,omitempty"`
DeploymentUID types.UID `json:"deploymentUID,omitempty"`
Template *core.PodTemplate `json:"template,omitempty"`
TemplateChecksum string `json:"templateChecksum,omitempty"`
Template *ArangoMemberPodTemplate `json:"template,omitempty"`
}

View file

@ -22,5 +22,16 @@
package v1
const (
ArangoMemberConditionPendingRestart ConditionType = "pending-restart"
)
type ArangoMemberStatus struct {
Conditions ConditionList `json:"conditions,omitempty"`
Template *ArangoMemberPodTemplate `json:"template,omitempty"`
}
func (a ArangoMemberStatus) IsPendingRestart() bool {
return a.Conditions.IsTrue(ArangoMemberConditionPendingRestart)
}

View file

@ -31,6 +31,10 @@ import (
// ConditionType is a strongly typed condition name
type ConditionType string
func (c ConditionType) String() string {
return string(c)
}
const (
// ConditionTypeReady indicates that the member or entire deployment is ready and running normally.
ConditionTypeReady ConditionType = "Ready"
@ -67,6 +71,10 @@ const (
ConditionTypeUpgradeFailed ConditionType = "UpgradeFailed"
// ConditionTypeMaintenanceMode indicates that Maintenance is enabled
ConditionTypeMaintenanceMode ConditionType = "MaintenanceMode"
// ConditionTypePendingRestart indicates that restart is required
ConditionTypePendingRestart ConditionType = "PendingRestart"
// ConditionTypePendingTLSRotation indicates that TLS rotation is pending
ConditionTypePendingTLSRotation ConditionType = "PendingTLSRotation"
)
// Condition represents one current condition of a deployment or deployment member.

View file

@ -102,3 +102,7 @@ func (ds *DeploymentStatus) Equal(other DeploymentStatus) bool {
func (ds *DeploymentStatus) IsForceReload() bool {
return util.BoolOrDefault(ds.ForceStatusReload, false)
}
func (ds *DeploymentStatus) IsPlanEmpty() bool {
return ds.Plan.IsEmpty() && ds.HighPriorityPlan.IsEmpty()
}

View file

@ -161,6 +161,10 @@ const (
ActionTypeSetMemberCondition ActionType = "SetMemberCondition"
// ActionTypeMemberRIDUpdate updated member Run ID (UID). High priority
ActionTypeMemberRIDUpdate ActionType = "MemberRIDUpdate"
// ActionTypeArangoMemberUpdatePodSpec updates pod spec
ActionTypeArangoMemberUpdatePodSpec ActionType = "ArangoMemberUpdatePodSpec"
// ActionTypeArangoMemberUpdatePodStatus updates pod spec
ActionTypeArangoMemberUpdatePodStatus ActionType = "ArangoMemberUpdatePodStatus"
)
const (

View file

@ -149,7 +149,7 @@ func (in *ArangoMember) DeepCopyInto(out *ArangoMember) {
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
out.Status = in.Status
in.Status.DeepCopyInto(&out.Status)
return
}
@ -204,12 +204,33 @@ func (in *ArangoMemberList) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ArangoMemberPodTemplate) DeepCopyInto(out *ArangoMemberPodTemplate) {
*out = *in
if in.PodSpec != nil {
in, out := &in.PodSpec, &out.PodSpec
*out = new(corev1.PodTemplateSpec)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArangoMemberPodTemplate.
func (in *ArangoMemberPodTemplate) DeepCopy() *ArangoMemberPodTemplate {
if in == nil {
return nil
}
out := new(ArangoMemberPodTemplate)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ArangoMemberSpec) DeepCopyInto(out *ArangoMemberSpec) {
*out = *in
if in.Template != nil {
in, out := &in.Template, &out.Template
*out = new(corev1.PodTemplate)
*out = new(ArangoMemberPodTemplate)
(*in).DeepCopyInto(*out)
}
return
@ -228,6 +249,18 @@ func (in *ArangoMemberSpec) DeepCopy() *ArangoMemberSpec {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ArangoMemberStatus) DeepCopyInto(out *ArangoMemberStatus) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make(ConditionList, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Template != nil {
in, out := &in.Template, &out.Template
*out = new(ArangoMemberPodTemplate)
(*in).DeepCopyInto(*out)
}
return
}

View file

@ -0,0 +1,81 @@
//
// DISCLAIMER
//
// Copyright 2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package v2alpha1
import (
"crypto/sha256"
"fmt"
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/json"
)
func GetArangoMemberPodTemplate(pod *core.PodTemplateSpec, podSpecChecksum string) (*ArangoMemberPodTemplate, error) {
data, err := json.Marshal(pod.Spec)
if err != nil {
return nil, err
}
return &ArangoMemberPodTemplate{
PodSpec: pod,
PodSpecChecksum: podSpecChecksum,
Checksum: fmt.Sprintf("%0x", sha256.Sum256(data)),
}, nil
}
type ArangoMemberPodTemplate struct {
PodSpec *core.PodTemplateSpec `json:"podSpec,omitempty"`
PodSpecChecksum string `json:"podSpecChecksum,omitempty"`
Checksum string `json:"checksum,omitempty"`
}
func (a *ArangoMemberPodTemplate) Equals(b *ArangoMemberPodTemplate) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return false
}
return a.Checksum == b.Checksum && a.PodSpecChecksum == b.PodSpecChecksum
}
func (a *ArangoMemberPodTemplate) RotationNeeded(b *ArangoMemberPodTemplate) bool {
if a == nil && b == nil {
return true
}
if a == nil || b == nil {
return true
}
return a.PodSpecChecksum != b.PodSpecChecksum
}
func (a *ArangoMemberPodTemplate) EqualPodSpecChecksum(checksum string) bool {
if a == nil {
return false
}
return checksum == a.PodSpecChecksum
}

View file

@ -23,7 +23,6 @@
package v2alpha1
import (
core "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
)
@ -32,6 +31,5 @@ type ArangoMemberSpec struct {
ID string `json:"id,omitempty"`
DeploymentUID types.UID `json:"deploymentUID,omitempty"`
Template *core.PodTemplate `json:"template,omitempty"`
TemplateChecksum string `json:"templateChecksum,omitempty"`
Template *ArangoMemberPodTemplate `json:"template,omitempty"`
}

View file

@ -22,5 +22,16 @@
package v2alpha1
const (
ArangoMemberConditionPendingRestart ConditionType = "pending-restart"
)
type ArangoMemberStatus struct {
Conditions ConditionList `json:"conditions,omitempty"`
Template *ArangoMemberPodTemplate `json:"template,omitempty"`
}
func (a ArangoMemberStatus) IsPendingRestart() bool {
return a.Conditions.IsTrue(ArangoMemberConditionPendingRestart)
}

View file

@ -31,6 +31,10 @@ import (
// ConditionType is a strongly typed condition name
type ConditionType string
func (c ConditionType) String() string {
return string(c)
}
const (
// ConditionTypeReady indicates that the member or entire deployment is ready and running normally.
ConditionTypeReady ConditionType = "Ready"
@ -67,6 +71,10 @@ const (
ConditionTypeUpgradeFailed ConditionType = "UpgradeFailed"
// ConditionTypeMaintenanceMode indicates that Maintenance is enabled
ConditionTypeMaintenanceMode ConditionType = "MaintenanceMode"
// ConditionTypePendingRestart indicates that restart is required
ConditionTypePendingRestart ConditionType = "PendingRestart"
// ConditionTypePendingTLSRotation indicates that TLS rotation is pending
ConditionTypePendingTLSRotation ConditionType = "PendingTLSRotation"
)
// Condition represents one current condition of a deployment or deployment member.

View file

@ -102,3 +102,7 @@ func (ds *DeploymentStatus) Equal(other DeploymentStatus) bool {
func (ds *DeploymentStatus) IsForceReload() bool {
return util.BoolOrDefault(ds.ForceStatusReload, false)
}
func (ds *DeploymentStatus) IsPlanEmpty() bool {
return ds.Plan.IsEmpty() && ds.HighPriorityPlan.IsEmpty()
}

View file

@ -161,6 +161,10 @@ const (
ActionTypeSetMemberCondition ActionType = "SetMemberCondition"
// ActionTypeMemberRIDUpdate updated member Run ID (UID). High priority
ActionTypeMemberRIDUpdate ActionType = "MemberRIDUpdate"
// ActionTypeArangoMemberUpdatePodSpec updates pod spec
ActionTypeArangoMemberUpdatePodSpec ActionType = "ArangoMemberUpdatePodSpec"
// ActionTypeArangoMemberUpdatePodStatus updates pod spec
ActionTypeArangoMemberUpdatePodStatus ActionType = "ArangoMemberUpdatePodStatus"
)
const (

View file

@ -149,7 +149,7 @@ func (in *ArangoMember) DeepCopyInto(out *ArangoMember) {
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.Spec.DeepCopyInto(&out.Spec)
out.Status = in.Status
in.Status.DeepCopyInto(&out.Status)
return
}
@ -204,12 +204,33 @@ func (in *ArangoMemberList) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ArangoMemberPodTemplate) DeepCopyInto(out *ArangoMemberPodTemplate) {
*out = *in
if in.PodSpec != nil {
in, out := &in.PodSpec, &out.PodSpec
*out = new(v1.PodTemplateSpec)
(*in).DeepCopyInto(*out)
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ArangoMemberPodTemplate.
func (in *ArangoMemberPodTemplate) DeepCopy() *ArangoMemberPodTemplate {
if in == nil {
return nil
}
out := new(ArangoMemberPodTemplate)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ArangoMemberSpec) DeepCopyInto(out *ArangoMemberSpec) {
*out = *in
if in.Template != nil {
in, out := &in.Template, &out.Template
*out = new(v1.PodTemplate)
*out = new(ArangoMemberPodTemplate)
(*in).DeepCopyInto(*out)
}
return
@ -228,6 +249,18 @@ func (in *ArangoMemberSpec) DeepCopy() *ArangoMemberSpec {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ArangoMemberStatus) DeepCopyInto(out *ArangoMemberStatus) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make(ConditionList, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
if in.Template != nil {
in, out := &in.Template, &out.Template
*out = new(ArangoMemberPodTemplate)
(*in).DeepCopyInto(*out)
}
return
}

View file

@ -64,7 +64,7 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
v1 "k8s.io/api/core/v1"
core "k8s.io/api/core/v1"
)
var _ resources.Context = &Deployment{}
@ -381,7 +381,7 @@ func (d *Deployment) CreateMember(ctx context.Context, group api.ServerGroup, id
}
// GetPod returns pod.
func (d *Deployment) GetPod(ctx context.Context, podName string) (*v1.Pod, error) {
func (d *Deployment) GetPod(ctx context.Context, podName string) (*core.Pod, error) {
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
defer cancel()
@ -405,7 +405,7 @@ func (d *Deployment) DeletePod(ctx context.Context, podName string) error {
// CleanupPod deletes a given pod with force and explicit UID.
// If the pod does not exist, the error is ignored.
func (d *Deployment) CleanupPod(ctx context.Context, p *v1.Pod) error {
func (d *Deployment) CleanupPod(ctx context.Context, p *core.Pod) error {
log := d.deps.Log
podName := p.GetName()
ns := p.GetNamespace()
@ -462,7 +462,7 @@ func (d *Deployment) DeletePvc(ctx context.Context, pvcName string) error {
// UpdatePvc updated a persistent volume claim in the namespace
// of the deployment. If the pvc does not exist, the error is ignored.
func (d *Deployment) UpdatePvc(ctx context.Context, pvc *v1.PersistentVolumeClaim) error {
func (d *Deployment) UpdatePvc(ctx context.Context, pvc *core.PersistentVolumeClaim) error {
err := k8sutil.RunWithTimeout(ctx, func(ctxChild context.Context) error {
_, err := d.GetKubeCli().CoreV1().PersistentVolumeClaims(d.GetNamespace()).Update(ctxChild, pvc, meta.UpdateOptions{})
return err
@ -479,7 +479,7 @@ func (d *Deployment) UpdatePvc(ctx context.Context, pvc *v1.PersistentVolumeClai
}
// GetOwnedPVCs returns a list of all PVCs owned by the deployment.
func (d *Deployment) GetOwnedPVCs() ([]v1.PersistentVolumeClaim, error) {
func (d *Deployment) GetOwnedPVCs() ([]core.PersistentVolumeClaim, error) {
// Get all current PVCs
log := d.deps.Log
pvcs, err := d.deps.KubeCli.CoreV1().PersistentVolumeClaims(d.GetNamespace()).List(context.Background(), k8sutil.DeploymentListOpt(d.GetName()))
@ -487,7 +487,7 @@ func (d *Deployment) GetOwnedPVCs() ([]v1.PersistentVolumeClaim, error) {
log.Debug().Err(err).Msg("Failed to list PVCs")
return nil, errors.WithStack(err)
}
myPVCs := make([]v1.PersistentVolumeClaim, 0, len(pvcs.Items))
myPVCs := make([]core.PersistentVolumeClaim, 0, len(pvcs.Items))
for _, p := range pvcs.Items {
if d.isOwnerOf(&p) {
myPVCs = append(myPVCs, p)
@ -497,7 +497,7 @@ func (d *Deployment) GetOwnedPVCs() ([]v1.PersistentVolumeClaim, error) {
}
// GetPvc gets a PVC by the given name, in the samespace of the deployment.
func (d *Deployment) GetPvc(ctx context.Context, pvcName string) (*v1.PersistentVolumeClaim, error) {
func (d *Deployment) GetPvc(ctx context.Context, pvcName string) (*core.PersistentVolumeClaim, error) {
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
defer cancel()
@ -577,14 +577,30 @@ func (d *Deployment) GetAgencyData(ctx context.Context, i interface{}, keyParts
return err
}
func (d *Deployment) RenderPodForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*v1.Pod, error) {
func (d *Deployment) RenderPodForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) {
return d.resources.RenderPodForMember(ctx, cachedStatus, spec, status, memberID, imageInfo)
}
func (d *Deployment) RenderPodForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.Pod, error) {
return d.resources.RenderPodForMemberFromCurrent(ctx, cachedStatus, memberID)
}
func (d *Deployment) RenderPodTemplateForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.PodTemplateSpec, error) {
return d.resources.RenderPodTemplateForMember(ctx, cachedStatus, spec, status, memberID, imageInfo)
}
func (d *Deployment) RenderPodTemplateForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.PodTemplateSpec, error) {
return d.resources.RenderPodTemplateForMemberFromCurrent(ctx, cachedStatus, memberID)
}
func (d *Deployment) SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool) {
return d.resources.SelectImage(spec, status)
}
func (d *Deployment) SelectImageForMember(spec api.DeploymentSpec, status api.DeploymentStatus, member api.MemberStatus) (api.ImageInfo, bool) {
return d.resources.SelectImageForMember(spec, status, member)
}
func (d *Deployment) GetMetricsExporterImage() string {
return d.config.MetricsExporterImage
}
@ -616,7 +632,7 @@ func (d *Deployment) GetName() string {
return d.name
}
func (d *Deployment) GetOwnedPods(ctx context.Context) ([]v1.Pod, error) {
func (d *Deployment) GetOwnedPods(ctx context.Context) ([]core.Pod, error) {
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
defer cancel()
@ -625,7 +641,7 @@ func (d *Deployment) GetOwnedPods(ctx context.Context) ([]v1.Pod, error) {
return nil, err
}
podList := make([]v1.Pod, 0, len(pods.Items))
podList := make([]core.Pod, 0, len(pods.Items))
for _, p := range pods.Items {
if !d.isOwnerOf(&p) {
@ -645,3 +661,36 @@ func (d *Deployment) GetCachedStatus() inspectorInterface.Inspector {
func (d *Deployment) SetCachedStatus(i inspectorInterface.Inspector) {
d.currentState = i
}
func (d *Deployment) WithArangoMemberUpdate(ctx context.Context, namespace, name string, action resources.ArangoMemberUpdateFunc) error {
o, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoMembers(namespace).Get(ctx, name, meta.GetOptions{})
if err != nil {
return err
}
if action(o) {
if _, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoMembers(namespace).Update(ctx, o, meta.UpdateOptions{}); err != nil {
return err
}
}
return nil
}
func (d *Deployment) WithArangoMemberStatusUpdate(ctx context.Context, namespace, name string, action resources.ArangoMemberStatusUpdateFunc) error {
o, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoMembers(namespace).Get(ctx, name, meta.GetOptions{})
if err != nil {
return err
}
status := o.Status.DeepCopy()
if action(o, status) {
o.Status = *status
if _, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoMembers(namespace).UpdateStatus(ctx, o, meta.UpdateOptions{}); err != nil {
return err
}
}
return nil
}

View file

@ -298,7 +298,7 @@ func (d *Deployment) run() {
for {
select {
case <-d.stopCh:
cachedStatus, err := inspector.NewInspector(d.GetKubeCli(), d.GetMonitoringV1Cli(), d.GetArangoCli(), d.GetNamespace())
cachedStatus, err := inspector.NewInspector(context.Background(), d.GetKubeCli(), d.GetMonitoringV1Cli(), d.GetArangoCli(), d.GetNamespace())
if err != nil {
log.Error().Err(err).Msg("Unable to get resources")
}

View file

@ -104,7 +104,7 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval
deploymentName := d.GetName()
defer metrics.SetDuration(inspectDeploymentDurationGauges.WithLabelValues(deploymentName), start)
cachedStatus, err := inspector.NewInspector(d.GetKubeCli(), d.GetMonitoringV1Cli(), d.GetArangoCli(), d.GetNamespace())
cachedStatus, err := inspector.NewInspector(context.Background(), d.GetKubeCli(), d.GetMonitoringV1Cli(), d.GetArangoCli(), d.GetNamespace())
if err != nil {
log.Error().Err(err).Msg("Unable to get resources")
return minInspectionInterval // Retry ASAP
@ -293,7 +293,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
return minInspectionInterval, nil
}
if d.apiObject.Status.Plan.IsEmpty() && status.AppliedVersion != checksum {
if d.apiObject.Status.IsPlanEmpty() && status.AppliedVersion != checksum {
if err := d.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {
s.AppliedVersion = checksum
return true
@ -303,7 +303,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
return minInspectionInterval, nil
} else if status.AppliedVersion == checksum {
if !status.Plan.IsEmpty() && status.Conditions.IsTrue(api.ConditionTypeUpToDate) {
if !d.apiObject.Status.IsPlanEmpty() && status.Conditions.IsTrue(api.ConditionTypeUpToDate) {
if err = d.updateCondition(ctx, api.ConditionTypeUpToDate, false, "Plan is not empty", "There are pending operations in plan"); err != nil {
return minInspectionInterval, errors.Wrapf(err, "Unable to update UpToDate condition")
}
@ -311,7 +311,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
return minInspectionInterval, nil
}
if status.Plan.IsEmpty() && !status.Conditions.IsTrue(api.ConditionTypeUpToDate) {
if d.apiObject.Status.IsPlanEmpty() && !status.Conditions.IsTrue(api.ConditionTypeUpToDate) {
if err = d.updateCondition(ctx, api.ConditionTypeUpToDate, true, "Spec is Up To Date", "Spec is Up To Date"); err != nil {
return minInspectionInterval, errors.Wrapf(err, "Unable to update UpToDate condition")
}

View file

@ -29,9 +29,10 @@ import (
"fmt"
"testing"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/rs/zerolog/log"
@ -65,7 +66,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
errs := 0
for {
cache, err := inspector.NewInspector(d.GetKubeCli(), d.GetMonitoringV1Cli(), d.GetArangoCli(), d.GetNamespace())
cache, err := inspector.NewInspector(context.Background(), d.GetKubeCli(), d.GetMonitoringV1Cli(), d.GetArangoCli(), d.GetNamespace())
require.NoError(t, err)
err = d.resources.EnsureSecrets(context.Background(), log.Logger, cache)
if err == nil {
@ -96,6 +97,16 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
if testCase.Resources != nil {
testCase.Resources(t, d)
}
// Set features
{
*features.EncryptionRotation().EnabledPointer() = testCase.Features.EncryptionRotation
require.Equal(t, testCase.Features.EncryptionRotation, *features.EncryptionRotation().EnabledPointer())
*features.JWTRotation().EnabledPointer() = testCase.Features.JWTRotation
*features.TLSSNI().EnabledPointer() = testCase.Features.TLSSNI
*features.TLSRotation().EnabledPointer() = testCase.Features.TLSRotation
}
// Set Pending phase
require.NoError(t, d.status.last.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
@ -110,8 +121,11 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
}))
// Set members
require.NoError(t, d.status.last.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
if err := d.status.last.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
c := d.GetArangoCli()
k := d.GetKubeCli()
member := api.ArangoMember{
ObjectMeta: metav1.ObjectMeta{
Namespace: d.GetNamespace(),
@ -123,7 +137,6 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
},
}
c := d.GetArangoCli()
if _, err := c.DatabaseV1().ArangoMembers(member.GetNamespace()).Create(context.Background(), &member, metav1.CreateOptions{}); err != nil {
return err
}
@ -135,26 +148,51 @@ func runTestCase(t *testing.T, testCase testCaseStruct) {
},
}
k := d.GetKubeCli()
if _, err := k.CoreV1().Services(member.GetNamespace()).Create(context.Background(), &s, metav1.CreateOptions{}); err != nil {
return err
}
cache, err := inspector.NewInspector(context.Background(), d.GetKubeCli(), d.GetMonitoringV1Cli(), d.GetArangoCli(), d.GetNamespace())
require.NoError(t, err)
groupSpec := d.apiObject.Spec.GetServerGroupSpec(group)
image, ok := d.resources.SelectImage(d.apiObject.Spec, d.status.last)
require.True(t, ok)
template, err := d.resources.RenderPodTemplateForMember(context.Background(), cache, d.apiObject.Spec, d.status.last, m.ID, image)
if err != nil {
return err
}
checksum, err := resources.ChecksumArangoPod(groupSpec, resources.CreatePodFromTemplate(template))
require.NoError(t, err)
podTemplate, err := api.GetArangoMemberPodTemplate(template, checksum)
require.NoError(t, err)
member.Status.Template = podTemplate
member.Spec.Template = podTemplate
if _, err := c.DatabaseV1().ArangoMembers(member.GetNamespace()).Update(context.Background(), &member, metav1.UpdateOptions{}); err != nil {
return err
}
if _, err := c.DatabaseV1().ArangoMembers(member.GetNamespace()).UpdateStatus(context.Background(), &member, metav1.UpdateOptions{}); err != nil {
return err
}
}
return nil
}))
// Set features
{
*features.EncryptionRotation().EnabledPointer() = testCase.Features.EncryptionRotation
require.Equal(t, testCase.Features.EncryptionRotation, *features.EncryptionRotation().EnabledPointer())
*features.JWTRotation().EnabledPointer() = testCase.Features.JWTRotation
*features.TLSSNI().EnabledPointer() = testCase.Features.TLSSNI
*features.TLSRotation().EnabledPointer() = testCase.Features.TLSRotation
}); err != nil {
if testCase.ExpectedError != nil && assert.EqualError(t, err, testCase.ExpectedError.Error()) {
return
}
require.NoError(t, err)
}
// Act
cache, err := inspector.NewInspector(d.GetKubeCli(), d.GetMonitoringV1Cli(), d.GetArangoCli(), d.GetNamespace())
cache, err := inspector.NewInspector(context.Background(), d.GetKubeCli(), d.GetMonitoringV1Cli(), d.GetArangoCli(), d.GetNamespace())
require.NoError(t, err)
err = d.resources.EnsurePods(context.Background(), cache)

View file

@ -455,6 +455,17 @@ func createTestDeployment(config Config, arangoDeployment *api.ArangoDeployment)
Namespace: testNamespace,
}
arangoDeployment.Status.Images = api.ImageInfoList{
{
Image: "arangodb/arangodb:latest",
ImageID: "arangodb/arangodb:latest",
ArangoDBVersion: "1.0.0",
Enterprise: false,
},
}
arangoDeployment.Status.CurrentImage = &arangoDeployment.Status.Images[0]
deps := Dependencies{
Log: zerolog.New(ioutil.Discard),
KubeCli: kubernetesClientSet,

View file

@ -0,0 +1,152 @@
//
// DISCLAIMER
//
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/deployment/pod"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/rs/zerolog/log"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/rs/zerolog"
)
func init() {
registerAction(api.ActionTypeArangoMemberUpdatePodSpec, newArangoMemberUpdatePodSpecAction)
}
// newArangoMemberUpdatePodSpecAction creates a new Action that implements the given
// planned ArangoMemberUpdatePodSpec action.
func newArangoMemberUpdatePodSpecAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
a := &actionArangoMemberUpdatePodSpec{}
a.actionImpl = newActionImplDefRef(log, action, actionCtx, defaultTimeout)
return a
}
// actionArangoMemberUpdatePodSpec implements an ArangoMemberUpdatePodSpec.
type actionArangoMemberUpdatePodSpec struct {
// actionImpl implement timeout and member id functions
actionImpl
// actionEmptyCheckProgress implement check progress with empty implementation
actionEmptyCheckProgress
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionArangoMemberUpdatePodSpec) Start(ctx context.Context) (bool, error) {
spec := a.actionCtx.GetSpec()
status := a.actionCtx.GetStatus()
m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !found {
log.Error().Msg("No such member")
return true, nil
}
cache := a.actionCtx.GetCachedStatus()
member, ok := cache.ArangoMember(m.ArangoMemberName(a.actionCtx.GetName(), a.action.Group))
if !ok {
err := errors.Newf("ArangoMember not found")
log.Error().Err(err).Msg("ArangoMember not found")
return false, err
}
endpoint, err := pod.GenerateMemberEndpoint(a.actionCtx.GetCachedStatus(), a.actionCtx.GetAPIObject(), spec, a.action.Group, m)
if err != nil {
log.Error().Err(err).Msg("Unable to render endpoint")
return false, err
}
if m.Endpoint == nil || *m.Endpoint != endpoint {
// Update endpoint
m.Endpoint = &endpoint
if err := status.Members.Update(m, a.action.Group); err != nil {
log.Error().Err(err).Msg("Unable to update endpoint")
return false, err
}
}
groupSpec := spec.GetServerGroupSpec(a.action.Group)
imageInfo, imageFound := a.actionCtx.SelectImage(spec, status)
if !imageFound {
// Image is not found, so rotation is not needed
return true, nil
}
if m.Image != nil {
imageInfo = *m.Image
}
renderedPod, err := a.actionCtx.RenderPodTemplateForMember(ctx, a.actionCtx.GetCachedStatus(), spec, status, a.action.MemberID, imageInfo)
if err != nil {
log.Err(err).Msg("Error while rendering pod")
return false, err
}
checksum, err := resources.ChecksumArangoPod(groupSpec, resources.CreatePodFromTemplate(renderedPod))
if err != nil {
log.Err(err).Msg("Error while getting pod checksum")
return false, err
}
template, err := api.GetArangoMemberPodTemplate(renderedPod, checksum)
if err != nil {
log.Err(err).Msg("Error while getting pod template")
return false, err
}
if err := a.actionCtx.WithArangoMemberUpdate(context.Background(), member.GetNamespace(), member.GetName(), func(member *api.ArangoMember) bool {
if !member.Spec.Template.Equals(template) {
member.Spec.Template = template.DeepCopy()
return true
}
return false
}); err != nil {
log.Err(err).Msg("Error while updating member")
return false, err
}
if err := a.actionCtx.WithArangoMemberStatusUpdate(context.Background(), member.GetNamespace(), member.GetName(), func(member *api.ArangoMember, status *api.ArangoMemberStatus) bool {
if (status.Template == nil || status.Template.PodSpec == nil) && (m.PodSpecVersion == "" || m.PodSpecVersion == template.PodSpecChecksum) {
status.Template = template.DeepCopy()
}
return true
}); err != nil {
log.Err(err).Msg("Error while updating member status")
return false, err
}
return true, nil
}

View file

@ -0,0 +1,91 @@
//
// DISCLAIMER
//
// Copyright 2020-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/rs/zerolog/log"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/rs/zerolog"
)
func init() {
registerAction(api.ActionTypeArangoMemberUpdatePodStatus, newArangoMemberUpdatePodStatusAction)
}
// newArangoMemberUpdatePodStatusAction creates a new Action that implements the given
// planned ArangoMemberUpdatePodStatus action.
func newArangoMemberUpdatePodStatusAction(log zerolog.Logger, action api.Action, actionCtx ActionContext) Action {
a := &actionArangoMemberUpdatePodStatus{}
a.actionImpl = newActionImplDefRef(log, action, actionCtx, defaultTimeout)
return a
}
// actionArangoMemberUpdatePodStatus implements an ArangoMemberUpdatePodStatus.
type actionArangoMemberUpdatePodStatus struct {
// actionImpl implement timeout and member id functions
actionImpl
// actionEmptyCheckProgress implement check progress with empty implementation
actionEmptyCheckProgress
}
// Start performs the start of the action.
// Returns true if the action is completely finished, false in case
// the start time needs to be recorded and a ready condition needs to be checked.
func (a *actionArangoMemberUpdatePodStatus) Start(ctx context.Context) (bool, error) {
m, found := a.actionCtx.GetMemberStatusByID(a.action.MemberID)
if !found {
log.Error().Msg("No such member")
return true, nil
}
cache := a.actionCtx.GetCachedStatus()
member, ok := cache.ArangoMember(m.ArangoMemberName(a.actionCtx.GetName(), a.action.Group))
if !ok {
err := errors.Newf("ArangoMember not found")
log.Error().Err(err).Msg("ArangoMember not found")
return false, err
}
if member.Status.Template == nil || !member.Status.Template.Equals(member.Spec.Template) {
if err := a.actionCtx.WithArangoMemberStatusUpdate(context.Background(), member.GetNamespace(), member.GetName(), func(obj *api.ArangoMember, status *api.ArangoMemberStatus) bool {
if status.Template == nil || !status.Template.Equals(member.Spec.Template) {
status.Template = member.Spec.Template.DeepCopy()
return true
}
return false
}); err != nil {
log.Err(err).Msg("Error while updating member")
return false, err
}
}
return true, nil
}

View file

@ -36,7 +36,7 @@ import (
"github.com/arangodb/go-driver/agency"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
v1 "k8s.io/api/core/v1"
core "k8s.io/api/core/v1"
"github.com/arangodb/arangosync-client/client"
driver "github.com/arangodb/go-driver"
@ -51,6 +51,8 @@ import (
type ActionContext interface {
resources.DeploymentStatusUpdate
resources.DeploymentAgencyMaintenance
resources.ArangoMemberContext
resources.DeploymentPodRenderer
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
@ -88,7 +90,7 @@ type ActionContext interface {
// RemoveMemberByID removes a member with given id.
RemoveMemberByID(ctx context.Context, id string) error
// GetPod returns pod.
GetPod(ctx context.Context, podName string) (*v1.Pod, error)
GetPod(ctx context.Context, podName string) (*core.Pod, error)
// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
DeletePod(ctx context.Context, podName string) error
@ -97,10 +99,10 @@ type ActionContext interface {
DeletePvc(ctx context.Context, pvcName string) error
// GetPvc returns PVC info about PVC with given name in the namespace
// of the deployment.
GetPvc(ctx context.Context, pvcName string) (*v1.PersistentVolumeClaim, error)
GetPvc(ctx context.Context, pvcName string) (*core.PersistentVolumeClaim, error)
// UpdatePvc update PVC with given name in the namespace
// of the deployment.
UpdatePvc(ctx context.Context, pvc *v1.PersistentVolumeClaim) error
UpdatePvc(ctx context.Context, pvc *core.PersistentVolumeClaim) error
// RemovePodFinalizers removes all the finalizers from the Pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
RemovePodFinalizers(ctx context.Context, podName string) error
@ -139,8 +141,10 @@ type ActionContext interface {
GetBackup(ctx context.Context, backup string) (*backupApi.ArangoBackup, error)
// GetName receives information about a deployment name
GetName() string
// GetNameget current cached state of deployment
// GetCachedStatus current cached state of deployment
GetCachedStatus() inspectorInterface.Inspector
// SelectImage select currently used image by pod
SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool)
}
// newActionContext creates a new ActionContext implementation.
@ -154,11 +158,19 @@ func newActionContext(log zerolog.Logger, context Context, cachedStatus inspecto
// actionContext implements ActionContext
type actionContext struct {
log zerolog.Logger
context Context
log zerolog.Logger
cachedStatus inspectorInterface.Inspector
}
func (ac *actionContext) RenderPodForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.Pod, error) {
return ac.context.RenderPodForMemberFromCurrent(ctx, cachedStatus, memberID)
}
func (ac *actionContext) RenderPodTemplateForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.PodTemplateSpec, error) {
return ac.context.RenderPodTemplateForMemberFromCurrent(ctx, cachedStatus, memberID)
}
func (ac *actionContext) GetAgencyMaintenanceMode(ctx context.Context) (bool, error) {
return ac.context.GetAgencyMaintenanceMode(ctx)
}
@ -167,6 +179,26 @@ func (ac *actionContext) SetAgencyMaintenanceMode(ctx context.Context, enabled b
return ac.context.SetAgencyMaintenanceMode(ctx, enabled)
}
func (ac *actionContext) WithArangoMemberUpdate(ctx context.Context, namespace, name string, action resources.ArangoMemberUpdateFunc) error {
return ac.context.WithArangoMemberUpdate(ctx, namespace, name, action)
}
func (ac *actionContext) WithArangoMemberStatusUpdate(ctx context.Context, namespace, name string, action resources.ArangoMemberStatusUpdateFunc) error {
return ac.context.WithArangoMemberStatusUpdate(ctx, namespace, name, action)
}
func (ac *actionContext) RenderPodForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) {
return ac.context.RenderPodForMember(ctx, cachedStatus, spec, status, memberID, imageInfo)
}
func (ac *actionContext) RenderPodTemplateForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.PodTemplateSpec, error) {
return ac.context.RenderPodTemplateForMember(ctx, cachedStatus, spec, status, memberID, imageInfo)
}
func (ac *actionContext) SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool) {
return ac.context.SelectImage(spec, status)
}
func (ac *actionContext) GetCachedStatus() inspectorInterface.Inspector {
return ac.cachedStatus
}
@ -209,7 +241,7 @@ func (ac *actionContext) GetAPIObject() k8sutil.APIObject {
return ac.context.GetAPIObject()
}
func (ac *actionContext) UpdatePvc(ctx context.Context, pvc *v1.PersistentVolumeClaim) error {
func (ac *actionContext) UpdatePvc(ctx context.Context, pvc *core.PersistentVolumeClaim) error {
return ac.context.UpdatePvc(ctx, pvc)
}
@ -217,7 +249,7 @@ func (ac *actionContext) CreateEvent(evt *k8sutil.Event) {
ac.context.CreateEvent(evt)
}
func (ac *actionContext) GetPvc(ctx context.Context, pvcName string) (*v1.PersistentVolumeClaim, error) {
func (ac *actionContext) GetPvc(ctx context.Context, pvcName string) (*core.PersistentVolumeClaim, error) {
return ac.context.GetPvc(ctx, pvcName)
}
@ -346,7 +378,7 @@ func (ac *actionContext) RemoveMemberByID(ctx context.Context, id string) error
}
// GetPod returns pod.
func (ac *actionContext) GetPod(ctx context.Context, podName string) (*v1.Pod, error) {
func (ac *actionContext) GetPod(ctx context.Context, podName string) (*core.Pod, error) {
if pod, err := ac.context.GetPod(ctx, podName); err != nil {
return nil, errors.WithStack(err)
} else {

View file

@ -37,13 +37,15 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
)
// Context provides methods to the reconcile package.
type Context interface {
resources.DeploymentStatusUpdate
resources.DeploymentAgencyMaintenance
resources.ArangoMemberContext
resources.DeploymentPodRenderer
resources.DeploymentImageManager
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
@ -112,10 +114,6 @@ type Context interface {
EnableScalingCluster(ctx context.Context) error
// GetAgencyData object for key path
GetAgencyData(ctx context.Context, i interface{}, keyParts ...string) error
// Renders Pod definition for member
RenderPodForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*v1.Pod, error)
// SelectImage select currently used image by pod
SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool)
// SecretsInterface return secret interface
SecretsInterface() k8sutil.SecretInterface
// GetBackup receives information about a backup resource

View file

@ -0,0 +1,95 @@
//
// DISCLAIMER
//
// Copyright 2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
)
func newPlanAppender(pb WithPlanBuilder, current api.Plan) PlanAppender {
return planAppenderType{
current: current,
pb: pb,
}
}
type PlanAppender interface {
Apply(pb planBuilder) PlanAppender
ApplyWithCondition(c planBuilderCondition, pb planBuilder) PlanAppender
ApplySubPlan(pb planBuilderSubPlan, plans ...planBuilder) PlanAppender
ApplyIfEmpty(pb planBuilder) PlanAppender
ApplyWithConditionIfEmpty(c planBuilderCondition, pb planBuilder) PlanAppender
ApplySubPlanIfEmpty(pb planBuilderSubPlan, plans ...planBuilder) PlanAppender
Plan() api.Plan
}
type planAppenderType struct {
pb WithPlanBuilder
current api.Plan
}
func (p planAppenderType) Plan() api.Plan {
return p.current
}
func (p planAppenderType) ApplyIfEmpty(pb planBuilder) PlanAppender {
if p.current.IsEmpty() {
return p.Apply(pb)
}
return p
}
func (p planAppenderType) ApplyWithConditionIfEmpty(c planBuilderCondition, pb planBuilder) PlanAppender {
if p.current.IsEmpty() {
return p.ApplyWithCondition(c, pb)
}
return p
}
func (p planAppenderType) ApplySubPlanIfEmpty(pb planBuilderSubPlan, plans ...planBuilder) PlanAppender {
if p.current.IsEmpty() {
return p.ApplySubPlan(pb, plans...)
}
return p
}
func (p planAppenderType) new(plan api.Plan) planAppenderType {
return planAppenderType{
pb: p.pb,
current: append(p.current, plan...),
}
}
func (p planAppenderType) Apply(pb planBuilder) PlanAppender {
return p.new(p.pb.Apply(pb))
}
func (p planAppenderType) ApplyWithCondition(c planBuilderCondition, pb planBuilder) PlanAppender {
return p.new(p.pb.ApplyWithCondition(c, pb))
}
func (p planAppenderType) ApplySubPlan(pb planBuilderSubPlan, plans ...planBuilder) PlanAppender {
return p.new(p.pb.ApplySubPlan(pb, plans...))
}

View file

@ -27,10 +27,8 @@ import (
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/arangodb/go-driver/agency"
"github.com/arangodb/kube-arangodb/pkg/util/arangod/conn"
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
@ -45,6 +43,9 @@ import (
type PlanBuilderContext interface {
resources.DeploymentStatusUpdate
resources.DeploymentAgencyMaintenance
resources.ArangoMemberContext
resources.DeploymentPodRenderer
resources.DeploymentImageManager
// GetTLSKeyfile returns the keyfile encoded TLS certificate+key for
// the given member.
@ -64,10 +65,6 @@ type PlanBuilderContext interface {
GetSpec() api.DeploymentSpec
// GetAgencyData object for key path
GetAgencyData(ctx context.Context, i interface{}, keyParts ...string) error
// Renders Pod definition for member
RenderPodForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error)
// SelectImage select currently used image by pod
SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool)
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
GetDatabaseClient(ctx context.Context) (driver.Client, error)

View file

@ -0,0 +1,94 @@
//
// DISCLAIMER
//
// Copyright 2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile
import (
"context"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
)
type planBuilder func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan
type planBuilderCondition func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) bool
type planBuilderSubPlan func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext, w WithPlanBuilder, plans ...planBuilder) api.Plan
func NewWithPlanBuilder(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) WithPlanBuilder {
return &withPlanBuilder{
ctx: ctx,
log: log,
apiObject: apiObject,
spec: spec,
status: status,
cachedStatus: cachedStatus,
context: context,
}
}
type WithPlanBuilder interface {
Apply(p planBuilder) api.Plan
ApplyWithCondition(c planBuilderCondition, p planBuilder) api.Plan
ApplySubPlan(p planBuilderSubPlan, plans ...planBuilder) api.Plan
}
type withPlanBuilder struct {
ctx context.Context
log zerolog.Logger
apiObject k8sutil.APIObject
spec api.DeploymentSpec
status api.DeploymentStatus
cachedStatus inspectorInterface.Inspector
context PlanBuilderContext
}
func (w withPlanBuilder) ApplyWithCondition(c planBuilderCondition, p planBuilder) api.Plan {
if !c(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context) {
return api.Plan{}
}
return w.Apply(p)
}
func (w withPlanBuilder) ApplySubPlan(p planBuilderSubPlan, plans ...planBuilder) api.Plan {
return p(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context, w, plans...)
}
func (w withPlanBuilder) Apply(p planBuilder) api.Plan {
return p(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context)
}

View file

@ -25,11 +25,13 @@ package reconcile
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
core "k8s.io/api/core/v1"
)
func (d *Reconciler) CreateHighPlan(ctx context.Context, cachedStatus inspectorInterface.Inspector) (error, bool) {
@ -77,21 +79,33 @@ func createHighPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil.A
return currentPlan, false
}
// Check for various scenario's
return newPlanAppender(NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx), nil).
ApplyIfEmpty(updateMemberPodTemplateSpec).
ApplyIfEmpty(updateMemberPhasePlan).
ApplyIfEmpty(createCleanOutPlan).
ApplyIfEmpty(updateMemberRotationFlag).
Plan(), true
}
// updateMemberPodTemplateSpec creates plan to update member Spec
func updateMemberPodTemplateSpec(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan {
var plan api.Plan
pb := NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx)
// Update member specs
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if reason, changed := arangoMemberPodTemplateNeedsUpdate(ctx, log, apiObject, spec, group, status, m, cachedStatus, context); changed {
plan = append(plan, api.NewAction(api.ActionTypeArangoMemberUpdatePodSpec, group, m.ID, reason))
}
}
if plan.IsEmpty() {
plan = pb.Apply(updateMemberPhasePlan)
}
return nil
})
if plan.IsEmpty() {
plan = pb.Apply(createCleanOutPlan)
}
// Return plan
return plan, true
return plan
}
// updateMemberPhasePlan creates plan to update member phase
@ -106,6 +120,7 @@ func updateMemberPhasePlan(ctx context.Context,
if m.Phase == api.MemberPhaseNone {
plan = append(plan,
api.NewAction(api.ActionTypeMemberRIDUpdate, group, m.ID, "Regenerate member RID"),
api.NewAction(api.ActionTypeArangoMemberUpdatePodStatus, group, m.ID, "Propagating status of pod"),
api.NewAction(api.ActionTypeMemberPhaseUpdate, group, m.ID,
"Move to Pending phase").AddParam(ActionTypeMemberPhaseUpdatePhaseKey, api.MemberPhasePending.String()))
}
@ -116,3 +131,67 @@ func updateMemberPhasePlan(ctx context.Context,
return plan
}
func updateMemberRotationFlag(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan {
var plan api.Plan
status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
p, found := cachedStatus.Pod(m.PodName)
if !found {
continue
}
if required, reason := updateMemberRotationFlagConditionCheck(log, apiObject, spec, cachedStatus, m, group, p); required {
log.Info().Msgf(reason)
plan = append(plan, restartMemberConditionPlan(group, m.ID, reason)...)
}
}
return nil
})
return plan
}
func restartMemberConditionPlan(group api.ServerGroup, memberID string, reason string) api.Plan {
return api.Plan{
api.NewAction(api.ActionTypeSetMemberCondition, group, memberID, reason).AddParam(api.ConditionTypePendingRestart.String(), "T"),
}
}
func tlsRotateConditionPlan(group api.ServerGroup, memberID string, reason string) api.Plan {
return api.Plan{
api.NewAction(api.ActionTypeSetMemberCondition, group, memberID, reason).AddParam(api.ConditionTypePendingTLSRotation.String(), "T"),
}
}
func updateMemberRotationFlagConditionCheck(log zerolog.Logger, apiObject k8sutil.APIObject, spec api.DeploymentSpec, cachedStatus inspectorInterface.Inspector, m api.MemberStatus, group api.ServerGroup, p *core.Pod) (bool, string) {
if m.Conditions.IsTrue(api.ConditionTypePendingRestart) {
return false, ""
}
if m.Conditions.IsTrue(api.ConditionTypePendingTLSRotation) {
return true, "TLS Rotation pending"
}
pvc, exists := cachedStatus.PersistentVolumeClaim(m.PersistentVolumeClaimName)
if exists {
if k8sutil.IsPersistentVolumeClaimFileSystemResizePending(pvc) {
return true, "PVC Resize pending"
}
}
if changed, reason := podNeedsRotation(log, apiObject, p, spec, group, m, cachedStatus); changed {
return true, reason
}
if _, ok := p.Annotations[deployment.ArangoDeploymentPodRotateAnnotation]; ok {
return true, "Rotation flag present"
}
return false, ""
}

View file

@ -77,13 +77,49 @@ func createNormalPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil
return currentPlan, false
}
// Fetch agency plan
agencyPlan, agencyErr := fetchAgency(ctx, spec, status, builderCtx)
return newPlanAppender(NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx), nil).
// Check for failed members
ApplyIfEmpty(createMemberFailedRestorePlan).
// Check for cleaned out dbserver in created state
ApplyIfEmpty(createRemoveCleanedDBServersPlan).
// Update status
ApplySubPlanIfEmpty(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKeyStatusUpdate).
ApplyIfEmpty(createTLSStatusUpdate).
ApplyIfEmpty(createJWTStatusUpdate).
// Check for scale up/down
ApplyIfEmpty(createScaleMemberPlan).
// Check for members to be removed
ApplyIfEmpty(createReplaceMemberPlan).
// Check for the need to rotate one or more members
ApplyIfEmpty(createRotateOrUpgradePlan).
// Disable maintenance if upgrade process was done. Upgrade task throw IDLE Action if upgrade is pending
ApplyIfEmpty(createMaintenanceManagementPlan).
// Add keys
ApplySubPlanIfEmpty(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKey).
ApplyIfEmpty(createJWTKeyUpdate).
ApplySubPlanIfEmpty(createTLSStatusPropagatedFieldUpdate, createCARenewalPlan).
ApplySubPlanIfEmpty(createTLSStatusPropagatedFieldUpdate, createCAAppendPlan).
ApplyIfEmpty(createKeyfileRenewalPlan).
ApplyIfEmpty(createRotateServerStoragePlan).
ApplySubPlanIfEmpty(createTLSStatusPropagatedFieldUpdate, createRotateTLSServerSNIPlan).
ApplyIfEmpty(createRestorePlan).
ApplySubPlanIfEmpty(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKeyCleanPlan).
ApplySubPlanIfEmpty(createTLSStatusPropagatedFieldUpdate, createCACleanPlan).
ApplyIfEmpty(createClusterOperationPlan).
// Final
ApplyIfEmpty(createTLSStatusPropagated).
ApplyIfEmpty(createBootstrapPlan).
Plan(), true
}
// Check for various scenario's
func createMemberFailedRestorePlan(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan {
var plan api.Plan
pb := NewWithPlanBuilder(ctx, log, apiObject, spec, status, cachedStatus, builderCtx)
// Fetch agency plan
agencyPlan, agencyErr := fetchAgency(ctx, spec, status, context)
// Check for members in failed state
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
@ -149,113 +185,29 @@ func createNormalPlan(ctx context.Context, log zerolog.Logger, apiObject k8sutil
// Ensure that we were able to get agency info
if len(plan) == 0 && agencyErr != nil {
log.Err(agencyErr).Msg("unable to build further plan without access to agency")
return append(plan,
api.NewAction(api.ActionTypeIdle, api.ServerGroupUnknown, "")), true
plan = append(plan,
api.NewAction(api.ActionTypeIdle, api.ServerGroupUnknown, ""))
}
// Check for cleaned out dbserver in created state
return plan
}
func createRemoveCleanedDBServersPlan(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan {
for _, m := range status.Members.DBServers {
if plan.IsEmpty() && m.Phase.IsCreatedOrDrain() && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
if m.Phase.IsCreatedOrDrain() && m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
log.Debug().
Str("id", m.ID).
Str("role", api.ServerGroupDBServers.AsRole()).
Msg("Creating dbserver replacement plan because server is cleanout in created phase")
plan = append(plan,
return api.Plan{
api.NewAction(api.ActionTypeRemoveMember, api.ServerGroupDBServers, m.ID),
api.NewAction(api.ActionTypeAddMember, api.ServerGroupDBServers, ""),
)
}
}
}
// Update status
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKeyStatusUpdate)
}
if plan.IsEmpty() {
plan = pb.Apply(createTLSStatusUpdate)
}
if plan.IsEmpty() {
plan = pb.Apply(createJWTStatusUpdate)
}
// Check for scale up/down
if plan.IsEmpty() {
plan = pb.Apply(createScaleMemberPlan)
}
// Check for members to be removed
if plan.IsEmpty() {
plan = pb.Apply(createReplaceMemberPlan)
}
// Check for the need to rotate one or more members
if plan.IsEmpty() {
plan = pb.Apply(createRotateOrUpgradePlan)
}
// Disable maintenance if upgrade process was done. Upgrade task throw IDLE Action if upgrade is pending
if plan.IsEmpty() {
plan = pb.Apply(createMaintenanceManagementPlan)
}
// Add keys
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKey)
}
if plan.IsEmpty() {
plan = pb.Apply(createJWTKeyUpdate)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createCARenewalPlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createCAAppendPlan)
}
if plan.IsEmpty() {
plan = pb.Apply(createKeyfileRenewalPlan)
}
// Check for changes storage classes or requirements
if plan.IsEmpty() {
plan = pb.Apply(createRotateServerStoragePlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createRotateTLSServerSNIPlan)
}
if plan.IsEmpty() {
plan = pb.Apply(createRestorePlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createEncryptionKeyStatusPropagatedFieldUpdate, createEncryptionKeyCleanPlan)
}
if plan.IsEmpty() {
plan = pb.ApplySubPlan(createTLSStatusPropagatedFieldUpdate, createCACleanPlan)
}
if plan.IsEmpty() {
plan = pb.Apply(createClusterOperationPlan)
}
// Final
if plan.IsEmpty() {
plan = pb.Apply(createTLSStatusPropagated)
}
if plan.IsEmpty() {
plan = pb.Apply(createBootstrapPlan)
}
// Return plan
return plan, true
return nil
}

View file

@ -27,8 +27,6 @@ import (
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
json "github.com/json-iterator/go"
"github.com/arangodb/kube-arangodb/pkg/deployment/pod"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
@ -76,7 +74,7 @@ func createRotateOrUpgradePlan(ctx context.Context,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan {
var plan api.Plan
newPlan, idle := createRotateOrUpgradePlanInternal(ctx, log, apiObject, spec, status, cachedStatus, context)
newPlan, idle := createRotateOrUpgradePlanInternal(log, apiObject, spec, status, cachedStatus, context)
if idle {
plan = append(plan,
api.NewAction(api.ActionTypeIdle, api.ServerGroupUnknown, ""))
@ -86,8 +84,7 @@ func createRotateOrUpgradePlan(ctx context.Context,
return plan
}
func createRotateOrUpgradePlanInternal(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject, spec api.DeploymentSpec,
status api.DeploymentStatus, cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) (api.Plan, bool) {
func createRotateOrUpgradePlanInternal(log zerolog.Logger, apiObject k8sutil.APIObject, spec api.DeploymentSpec, status api.DeploymentStatus, cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) (api.Plan, bool) {
var newPlan api.Plan
var upgradeNotAllowed bool
@ -95,18 +92,12 @@ func createRotateOrUpgradePlanInternal(ctx context.Context, log zerolog.Logger,
var fromLicense, toLicense upgraderules.License
status.Members.ForeachServerGroup(func(group api.ServerGroup, members api.MemberStatusList) error {
for _, m := range members {
if m.Phase != api.MemberPhaseCreated || m.PodName == "" {
// Only rotate when phase is created
continue
}
pod, found := cachedStatus.Pod(m.PodName)
if !found {
continue
}
// Got pod, compare it with what it should be
decision := podNeedsUpgrading(log, m, spec, status.Images)
if decision.Hold {
@ -133,10 +124,8 @@ func createRotateOrUpgradePlanInternal(ctx context.Context, log zerolog.Logger,
newPlan = createUpgradeMemberPlan(log, m, group, "Version upgrade", spec, status,
!decision.AutoUpgradeNeeded)
} else {
// Use new level of rotate logic
rotNeeded, reason := podNeedsRotation(ctx, log, apiObject, pod, spec, group, status, m, cachedStatus, context)
if rotNeeded {
newPlan = createRotateMemberPlan(log, m, group, reason)
if m.Conditions.IsTrue(api.ConditionTypePendingRestart) {
newPlan = createRotateMemberPlan(log, m, group, "Restart flag present")
}
}
@ -170,11 +159,6 @@ func createRotateOrUpgradePlanInternal(ctx context.Context, log zerolog.Logger,
newPlan = api.Plan{api.NewAction(api.ActionTypeMarkToRemoveMember, group, m.ID, "Replace flag present")}
continue
}
if _, ok := pod.Annotations[deployment.ArangoDeploymentPodRotateAnnotation]; ok {
newPlan = createRotateMemberPlan(log, m, group, "Rotation flag present")
continue
}
}
}
@ -298,14 +282,57 @@ func memberImageInfo(spec api.DeploymentSpec, status api.MemberStatus, images ap
return api.ImageInfo{}, false
}
func getPodDetails(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject, spec api.DeploymentSpec,
group api.ServerGroup, status api.DeploymentStatus, m api.MemberStatus,
cachedStatus inspectorInterface.Inspector, planCtx PlanBuilderContext) (string, *core.Pod, *api.ArangoMember, bool) {
imageInfo, imageFound := planCtx.SelectImageForMember(spec, status, m)
if !imageFound {
// Image is not found, so rotation is not needed
return "", nil, nil, false
}
member, ok := cachedStatus.ArangoMember(m.ArangoMemberName(apiObject.GetName(), group))
if !ok {
return "", nil, nil, false
}
groupSpec := spec.GetServerGroupSpec(group)
renderedPod, err := planCtx.RenderPodForMember(ctx, cachedStatus, spec, status, m.ID, imageInfo)
if err != nil {
log.Err(err).Msg("Error while rendering pod")
return "", nil, nil, false
}
checksum, err := resources.ChecksumArangoPod(groupSpec, renderedPod)
if err != nil {
log.Err(err).Msg("Error while getting pod checksum")
return "", nil, nil, false
}
return checksum, renderedPod, member, true
}
// arangoMemberPodTemplateNeedsUpdate returns true when the specification of the
// given pod differs from what it should be according to the
// given deployment spec.
// When true is returned, a reason for the rotation is already returned.
func arangoMemberPodTemplateNeedsUpdate(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject, spec api.DeploymentSpec,
group api.ServerGroup, status api.DeploymentStatus, m api.MemberStatus,
cachedStatus inspectorInterface.Inspector, planCtx PlanBuilderContext) (string, bool) {
checksum, _, member, valid := getPodDetails(ctx, log, apiObject, spec, group, status, m, cachedStatus, planCtx)
if valid && !member.Spec.Template.EqualPodSpecChecksum(checksum) {
return "Pod Spec changed", true
}
return "", false
}
// podNeedsRotation returns true when the specification of the
// given pod differs from what it should be according to the
// given deployment spec.
// When true is returned, a reason for the rotation is already returned.
func podNeedsRotation(ctx context.Context, log zerolog.Logger, apiObject k8sutil.APIObject, p *core.Pod, spec api.DeploymentSpec,
group api.ServerGroup, status api.DeploymentStatus, m api.MemberStatus,
cachedStatus inspectorInterface.Inspector, planCtx PlanBuilderContext) (bool, string) {
func podNeedsRotation(log zerolog.Logger, apiObject k8sutil.APIObject, p *core.Pod, spec api.DeploymentSpec, group api.ServerGroup, m api.MemberStatus, cachedStatus inspectorInterface.Inspector) (bool, string) {
if m.PodUID != p.UID {
return true, "Pod UID does not match, this pod is not managed by Operator. Recreating"
}
@ -314,34 +341,17 @@ func podNeedsRotation(ctx context.Context, log zerolog.Logger, apiObject k8sutil
return true, "Pod Spec Version is nil - recreating pod"
}
imageInfo, imageFound := planCtx.SelectImage(spec, status)
if !imageFound {
// Image is not found, so rotation is not needed
member, ok := cachedStatus.ArangoMember(m.ArangoMemberName(apiObject.GetName(), group))
if !ok {
return false, ""
}
if m.Image != nil {
imageInfo = *m.Image
}
groupSpec := spec.GetServerGroupSpec(group)
renderedPod, err := planCtx.RenderPodForMember(ctx, cachedStatus, spec, status, m.ID, imageInfo)
if err != nil {
log.Err(err).Msg("Error while rendering pod")
if member.Status.Template == nil {
return false, ""
}
checksum, err := resources.ChecksumArangoPod(groupSpec, renderedPod)
if err != nil {
log.Err(err).Msg("Error while getting pod checksum")
return false, ""
}
if m.PodSpecVersion != checksum {
if _, err := json.Marshal(renderedPod); err == nil {
log.Info().Str("id", m.ID).Str("Before", m.PodSpecVersion).Str("After", checksum).Msgf("XXXXXXXXXXX Pod needs rotation - checksum does not match")
}
if member.Status.Template.RotationNeeded(member.Spec.Template) {
log.Info().Str("id", m.ID).Str("Before", m.PodSpecVersion).Msgf("Pod needs rotation - templates does not match")
return true, "Pod needs rotation - checksum does not match"
}

View file

@ -61,6 +61,7 @@ func createRotateServerStoragePlan(ctx context.Context,
}
groupSpec := spec.GetServerGroupSpec(group)
storageClassName := groupSpec.GetStorageClassName()
// Load PVC
pvc, exists := cachedStatus.PersistentVolumeClaim(m.PersistentVolumeClaimName)
if !exists {
@ -98,9 +99,6 @@ func createRotateServerStoragePlan(ctx context.Context,
// Only agents & dbservers are allowed to change their storage class.
context.CreateEvent(k8sutil.NewCannotChangeStorageClassEvent(apiObject, m.ID, group.AsRole(), "Not supported"))
}
} else if k8sutil.IsPersistentVolumeClaimFileSystemResizePending(pvc) {
// rotation needed
plan = createRotateMemberPlan(log, m, group, "Filesystem resize pending")
} else {
if groupSpec.HasVolumeClaimTemplate() {
res := groupSpec.GetVolumeClaimTemplate().Spec.Resources.Requests

View file

@ -75,6 +75,30 @@ type testContext struct {
RecordedEvent *k8sutil.Event
}
func (c *testContext) RenderPodForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.Pod, error) {
panic("implement me")
}
func (c *testContext) RenderPodTemplateForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.PodTemplateSpec, error) {
return &core.PodTemplateSpec{}, nil
}
func (c *testContext) SelectImageForMember(spec api.DeploymentSpec, status api.DeploymentStatus, member api.MemberStatus) (api.ImageInfo, bool) {
return c.SelectImage(spec, status)
}
func (c *testContext) RenderPodTemplateForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.PodTemplateSpec, error) {
panic("implement me")
}
func (c *testContext) WithArangoMemberUpdate(ctx context.Context, namespace, name string, action resources.ArangoMemberUpdateFunc) error {
panic("implement me")
}
func (c *testContext) WithArangoMemberStatusUpdate(ctx context.Context, namespace, name string, action resources.ArangoMemberStatusUpdateFunc) error {
panic("implement me")
}
func (c *testContext) GetAgencyMaintenanceMode(ctx context.Context) (bool, error) {
panic("implement me")
}
@ -108,7 +132,7 @@ func (c *testContext) GetAuthentication() conn.Auth {
}
func (c *testContext) RenderPodForMember(_ context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) {
panic("implement me")
return &core.Pod{}, nil
}
func (c *testContext) GetName() string {
@ -124,7 +148,12 @@ func (c *testContext) SecretsInterface() k8sutil.SecretInterface {
}
func (c *testContext) SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool) {
panic("implement me")
return api.ImageInfo{
Image: "",
ImageID: "",
ArangoDBVersion: "",
Enterprise: false,
}, true
}
func (c *testContext) UpdatePvc(_ context.Context, pvc *core.PersistentVolumeClaim) error {
@ -528,28 +557,60 @@ func (l *LastLogRecord) Run(e *zerolog.Event, level zerolog.Level, msg string) {
l.msg = msg
}
type testCase struct {
Name string
context *testContext
Helper func(*api.ArangoDeployment)
ExpectedError error
ExpectedPlan api.Plan
ExpectedHighPlan api.Plan
ExpectedLog string
ExpectedEvent *k8sutil.Event
Pods map[string]*core.Pod
Secrets map[string]*core.Secret
Services map[string]*core.Service
PVCS map[string]*core.PersistentVolumeClaim
ServiceAccounts map[string]*core.ServiceAccount
PDBS map[string]*policy.PodDisruptionBudget
ServiceMonitors map[string]*monitoring.ServiceMonitor
ArangoMembers map[string]*api.ArangoMember
Extender func(t *testing.T, r *Reconciler, c *testCase)
}
func (t testCase) Inspector() inspectorInterface.Inspector {
return inspector.NewInspectorFromData(t.Pods, t.Secrets, t.PVCS, t.Services, t.ServiceAccounts, t.PDBS, t.ServiceMonitors, t.ArangoMembers)
}
func TestCreatePlan(t *testing.T) {
// Arrange
threeCoordinators := api.MemberStatusList{
{
ID: "1",
ID: "1",
PodName: "coordinator1",
},
{
ID: "2",
ID: "2",
PodName: "coordinator2",
},
{
ID: "3",
ID: "3",
PodName: "coordinator3",
},
}
threeDBServers := api.MemberStatusList{
{
ID: "1",
ID: "1",
PodName: "dbserver1",
},
{
ID: "2",
ID: "2",
PodName: "dbserver2",
},
{
ID: "3",
ID: "3",
PodName: "dbserver3",
},
}
@ -574,24 +635,7 @@ func TestCreatePlan(t *testing.T) {
addAgentsToStatus(t, &deploymentTemplate.Status, 3)
deploymentTemplate.Spec.SetDefaults("createPlanTest")
testCases := []struct {
Name string
context *testContext
Helper func(*api.ArangoDeployment)
ExpectedError error
ExpectedPlan api.Plan
ExpectedLog string
ExpectedEvent *k8sutil.Event
Pods map[string]*core.Pod
Secrets map[string]*core.Secret
Services map[string]*core.Service
PVCS map[string]*core.PersistentVolumeClaim
ServiceAccounts map[string]*core.ServiceAccount
PDBS map[string]*policy.PodDisruptionBudget
ServiceMonitors map[string]*monitoring.ServiceMonitor
ArangoMembers map[string]*api.ArangoMember
}{
testCases := []testCase{
{
Name: "Can not create plan for single deployment",
context: &testContext{
@ -735,6 +779,52 @@ func TestCreatePlan(t *testing.T) {
},
},
},
Extender: func(t *testing.T, r *Reconciler, c *testCase) {
// Add ArangoMember
builderCtx := newPlanBuilderContext(r.context)
template, err := builderCtx.RenderPodTemplateForMemberFromCurrent(context.Background(), c.Inspector(), c.context.ArangoDeployment.Status.Members.Agents[0].ID)
require.NoError(t, err)
checksum, err := resources.ChecksumArangoPod(c.context.ArangoDeployment.Spec.Agents, resources.CreatePodFromTemplate(template))
require.NoError(t, err)
templateSpec, err := api.GetArangoMemberPodTemplate(template, checksum)
require.NoError(t, err)
name := c.context.ArangoDeployment.Status.Members.Agents[0].ArangoMemberName(c.context.ArangoDeployment.Name, api.ServerGroupAgents)
c.ArangoMembers = map[string]*api.ArangoMember{
name: {
ObjectMeta: meta.ObjectMeta{
Name: name,
},
Spec: api.ArangoMemberSpec{
Template: templateSpec,
},
Status: api.ArangoMemberStatus{
Template: templateSpec,
},
},
}
c.Pods = map[string]*core.Pod{
c.context.ArangoDeployment.Status.Members.Agents[0].PodName: {
ObjectMeta: meta.ObjectMeta{
Name: c.context.ArangoDeployment.Status.Members.Agents[0].PodName,
},
},
}
require.NoError(t, c.context.ArangoDeployment.Status.Members.ForeachServerGroup(func(group api.ServerGroup, list api.MemberStatusList) error {
for _, m := range list {
m.Phase = api.MemberPhaseCreated
require.NoError(t, c.context.ArangoDeployment.Status.Members.Update(m, group))
}
return nil
}))
},
context: &testContext{
ArangoDeployment: deploymentTemplate.DeepCopy(),
},
@ -750,14 +840,10 @@ func TestCreatePlan(t *testing.T) {
ad.Status.Members.Agents[0].Phase = api.MemberPhaseCreated
ad.Status.Members.Agents[0].PersistentVolumeClaimName = "pvc_test"
},
ExpectedPlan: []api.Action{
api.NewAction(api.ActionTypeCleanTLSKeyfileCertificate, api.ServerGroupAgents, "", "Remove server keyfile and enforce renewal/recreation"),
api.NewAction(api.ActionTypeResignLeadership, api.ServerGroupAgents, ""),
api.NewAction(api.ActionTypeRotateMember, api.ServerGroupAgents, ""),
api.NewAction(api.ActionTypeWaitForMemberUp, api.ServerGroupAgents, ""),
api.NewAction(api.ActionTypeWaitForMemberInSync, api.ServerGroupAgents, ""),
ExpectedHighPlan: []api.Action{
api.NewAction(api.ActionTypeSetMemberCondition, api.ServerGroupAgents, deploymentTemplate.Status.Members.Agents[0].ID, "PVC Resize pending"),
},
ExpectedLog: "Creating rotation plan",
ExpectedLog: "PVC Resize pending",
},
{
Name: "Agent in failed state",
@ -850,11 +936,16 @@ func TestCreatePlan(t *testing.T) {
logger := zerolog.New(ioutil.Discard).Hook(h)
r := NewReconciler(logger, testCase.context)
if testCase.Extender != nil {
testCase.Extender(t, r, &testCase)
}
// Act
if testCase.Helper != nil {
testCase.Helper(testCase.context.ArangoDeployment)
}
err, _ := r.CreatePlan(ctx, inspector.NewInspectorFromData(testCase.Pods, testCase.Secrets, testCase.PVCS, testCase.Services, testCase.ServiceAccounts, testCase.PDBS, testCase.ServiceMonitors, testCase.ArangoMembers))
err, _ := r.CreatePlan(ctx, testCase.Inspector())
// Assert
if testCase.ExpectedEvent != nil {
@ -873,10 +964,25 @@ func TestCreatePlan(t *testing.T) {
require.NoError(t, err)
status, _ := testCase.context.GetStatus()
if len(testCase.ExpectedHighPlan) > 0 {
require.Len(t, status.HighPriorityPlan, len(testCase.ExpectedHighPlan))
for i, v := range testCase.ExpectedHighPlan {
assert.Equal(t, v.Type, status.HighPriorityPlan[i].Type)
assert.Equal(t, v.Group, status.HighPriorityPlan[i].Group)
if v.Reason != "*" {
assert.Equal(t, v.Reason, status.HighPriorityPlan[i].Reason)
}
}
}
require.Len(t, status.Plan, len(testCase.ExpectedPlan))
for i, v := range testCase.ExpectedPlan {
assert.Equal(t, v.Type, status.Plan[i].Type)
assert.Equal(t, v.Group, status.Plan[i].Group)
if v.Reason != "*" {
assert.Equal(t, v.Reason, status.Plan[i].Reason)
}
}
})
}

View file

@ -308,12 +308,9 @@ func createKeyfileRenewalPlanDefault(ctx context.Context,
lCtx, c := context.WithTimeout(ctx, 500*time.Millisecond)
defer c()
if renew, recreate := keyfileRenewalRequired(lCtx, log, apiObject, spec, cachedStatus, planCtx, group, member, api.TLSRotateModeRecreate); renew {
if renew, _ := keyfileRenewalRequired(lCtx, log, apiObject, spec, cachedStatus, planCtx, group, member, api.TLSRotateModeRecreate); renew {
log.Info().Msg("Renewal of keyfile required - Recreate")
if recreate {
plan = append(plan, api.NewAction(api.ActionTypeCleanTLSKeyfileCertificate, group, member.ID, "Remove server keyfile and enforce renewal"))
}
plan = append(plan, createRotateMemberPlan(log, member, group, "Restart server after keyfile removal")...)
plan = append(plan, tlsRotateConditionPlan(group, member.ID, "Restart server after keyfile removal")...)
}
}

View file

@ -108,7 +108,7 @@ func createRotateTLSServerSNIPlan(ctx context.Context,
} else if !ok {
switch spec.TLS.Mode.Get() {
case api.TLSRotateModeRecreate:
plan = append(plan, createRotateMemberPlan(log, m, group, "SNI Secret needs update")...)
plan = append(plan, tlsRotateConditionPlan(group, m.ID, "SNI Secret needs update")...)
case api.TLSRotateModeInPlace:
plan = append(plan,
api.NewAction(api.ActionTypeUpdateTLSSNI, group, m.ID, "SNI Secret needs update"))

View file

@ -0,0 +1,23 @@
//
// DISCLAIMER
//
// Copyright 2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Adam Janikowski
//
package reconcile

View file

@ -29,8 +29,6 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/rs/zerolog"
)
@ -66,65 +64,3 @@ func fetchAgency(ctx context.Context, spec api.DeploymentSpec, status api.Deploy
return nil, errors.Newf("not able to read from agency when agency is down")
}
}
type planBuilder func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) api.Plan
type planBuilderCondition func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) bool
type planBuilderSubPlan func(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext, w WithPlanBuilder, plans ...planBuilder) api.Plan
func NewWithPlanBuilder(ctx context.Context,
log zerolog.Logger, apiObject k8sutil.APIObject,
spec api.DeploymentSpec, status api.DeploymentStatus,
cachedStatus inspectorInterface.Inspector, context PlanBuilderContext) WithPlanBuilder {
return &withPlanBuilder{
ctx: ctx,
log: log,
apiObject: apiObject,
spec: spec,
status: status,
cachedStatus: cachedStatus,
context: context,
}
}
type WithPlanBuilder interface {
Apply(p planBuilder) api.Plan
ApplyWithCondition(c planBuilderCondition, p planBuilder) api.Plan
ApplySubPlan(p planBuilderSubPlan, plans ...planBuilder) api.Plan
}
type withPlanBuilder struct {
ctx context.Context
log zerolog.Logger
apiObject k8sutil.APIObject
spec api.DeploymentSpec
status api.DeploymentStatus
cachedStatus inspectorInterface.Inspector
context PlanBuilderContext
}
func (w withPlanBuilder) ApplyWithCondition(c planBuilderCondition, p planBuilder) api.Plan {
if !c(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context) {
return api.Plan{}
}
return w.Apply(p)
}
func (w withPlanBuilder) ApplySubPlan(p planBuilderSubPlan, plans ...planBuilder) api.Plan {
return p(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context, w, plans...)
}
func (w withPlanBuilder) Apply(p planBuilder) api.Plan {
return p(w.ctx, w.log, w.apiObject, w.spec, w.status, w.cachedStatus, w.context)
}

View file

@ -55,7 +55,7 @@ func (r *Resilience) CheckMemberFailure(ctx context.Context) error {
// Check if there are Members with Phase Upgrading or Rotation but no plan
switch m.Phase {
case api.MemberPhaseNone:
case api.MemberPhaseNone, api.MemberPhasePending:
continue
case api.MemberPhaseUpgrading, api.MemberPhaseRotating, api.MemberPhaseCleanOut, api.MemberPhaseRotateStart:
if len(status.Plan) == 0 {

View file

@ -39,7 +39,7 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
v1 "k8s.io/api/core/v1"
core "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
@ -67,11 +67,41 @@ type DeploymentAgencyMaintenance interface {
SetAgencyMaintenanceMode(ctx context.Context, enabled bool) error
}
type DeploymentPodRenderer interface {
// RenderPodForMember Renders Pod definition for member
RenderPodForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error)
// RenderPodTemplateForMember Renders PodTemplate definition for member
RenderPodTemplateForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.PodTemplateSpec, error)
// RenderPodTemplateForMember Renders PodTemplate definition for member from current state
RenderPodForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.Pod, error)
// RenderPodTemplateForMemberFromCurrent Renders PodTemplate definition for member
RenderPodTemplateForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.PodTemplateSpec, error)
}
type DeploymentImageManager interface {
// SelectImage select currently used image by pod
SelectImage(spec api.DeploymentSpec, status api.DeploymentStatus) (api.ImageInfo, bool)
// SelectImage select currently used image by pod in member
SelectImageForMember(spec api.DeploymentSpec, status api.DeploymentStatus, member api.MemberStatus) (api.ImageInfo, bool)
}
type ArangoMemberUpdateFunc func(obj *api.ArangoMember) bool
type ArangoMemberStatusUpdateFunc func(obj *api.ArangoMember, s *api.ArangoMemberStatus) bool
type ArangoMemberContext interface {
// WithArangoMemberUpdate run action with update of ArangoMember
WithArangoMemberUpdate(ctx context.Context, namespace, name string, action ArangoMemberUpdateFunc) error
// WithArangoMemberStatusUpdate run action with update of ArangoMember Status
WithArangoMemberStatusUpdate(ctx context.Context, namespace, name string, action ArangoMemberStatusUpdateFunc) error
}
// Context provides all functions needed by the Resources service
// to perform its service.
type Context interface {
DeploymentStatusUpdate
DeploymentAgencyMaintenance
ArangoMemberContext
DeploymentImageManager
// GetAPIObject returns the deployment as k8s object.
GetAPIObject() k8sutil.APIObject
@ -106,10 +136,10 @@ type Context interface {
// On error, the error is logged.
CreateEvent(evt *k8sutil.Event)
// GetOwnedPVCs returns a list of all PVCs owned by the deployment.
GetOwnedPVCs() ([]v1.PersistentVolumeClaim, error)
GetOwnedPVCs() ([]core.PersistentVolumeClaim, error)
// CleanupPod deletes a given pod with force and explicit UID.
// If the pod does not exist, the error is ignored.
CleanupPod(ctx context.Context, p *v1.Pod) error
CleanupPod(ctx context.Context, p *core.Pod) error
// DeletePod deletes a pod with given name in the namespace
// of the deployment. If the pod does not exist, the error is ignored.
DeletePod(ctx context.Context, podName string) error

View file

@ -27,6 +27,8 @@ import (
"context"
"sync"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
@ -46,49 +48,19 @@ type SecretReadInterface interface {
Get(ctx context.Context, name string, opts meta.GetOptions) (*core.Secret, error)
}
func NewInspector(k kubernetes.Interface, m monitoringClient.MonitoringV1Interface, c versioned.Interface, namespace string) (inspectorInterface.Inspector, error) {
ctx := context.TODO()
pods, err := podsToMap(ctx, k, namespace)
if err != nil {
func NewInspector(ctx context.Context, k kubernetes.Interface, m monitoringClient.MonitoringV1Interface, c versioned.Interface, namespace string) (inspectorInterface.Inspector, error) {
i := &inspector{
namespace: namespace,
k: k,
m: m,
c: c,
}
if err := i.Refresh(ctx); err != nil {
return nil, err
}
secrets, err := secretsToMap(ctx, k, namespace)
if err != nil {
return nil, err
}
pvcs, err := pvcsToMap(ctx, k, namespace)
if err != nil {
return nil, err
}
services, err := servicesToMap(ctx, k, namespace)
if err != nil {
return nil, err
}
serviceAccounts, err := serviceAccountsToMap(ctx, k, namespace)
if err != nil {
return nil, err
}
podDisruptionBudgets, err := podDisruptionBudgetsToMap(ctx, k, namespace)
if err != nil {
return nil, err
}
serviceMonitors, err := serviceMonitorsToMap(ctx, m, namespace)
if err != nil {
return nil, err
}
arangoMembers, err := arangoMembersToMap(ctx, c, namespace)
if err != nil {
return nil, err
}
return NewInspectorFromData(pods, secrets, pvcs, services, serviceAccounts, podDisruptionBudgets, serviceMonitors, arangoMembers), nil
return i, nil
}
func NewEmptyInspector() inspectorInterface.Inspector {
@ -118,6 +90,12 @@ func NewInspectorFromData(pods map[string]*core.Pod,
type inspector struct {
lock sync.Mutex
namespace string
k kubernetes.Interface
m monitoringClient.MonitoringV1Interface
c versioned.Interface
pods map[string]*core.Pod
secrets map[string]*core.Secret
pvcs map[string]*core.PersistentVolumeClaim
@ -128,47 +106,50 @@ type inspector struct {
arangoMembers map[string]*api.ArangoMember
}
func (i *inspector) Refresh(ctx context.Context, k kubernetes.Interface, m monitoringClient.MonitoringV1Interface,
c versioned.Interface, namespace string) error {
func (i *inspector) Refresh(ctx context.Context) error {
i.lock.Lock()
defer i.lock.Unlock()
pods, err := podsToMap(ctx, k, namespace)
if i.namespace == "" {
return errors.New("Inspector created fro mstatic data")
}
pods, err := podsToMap(ctx, i.k, i.namespace)
if err != nil {
return err
}
secrets, err := secretsToMap(ctx, k, namespace)
secrets, err := secretsToMap(ctx, i.k, i.namespace)
if err != nil {
return err
}
pvcs, err := pvcsToMap(ctx, k, namespace)
pvcs, err := pvcsToMap(ctx, i.k, i.namespace)
if err != nil {
return err
}
services, err := servicesToMap(ctx, k, namespace)
services, err := servicesToMap(ctx, i.k, i.namespace)
if err != nil {
return err
}
serviceAccounts, err := serviceAccountsToMap(ctx, k, namespace)
serviceAccounts, err := serviceAccountsToMap(ctx, i.k, i.namespace)
if err != nil {
return err
}
podDisruptionBudgets, err := podDisruptionBudgetsToMap(ctx, k, namespace)
podDisruptionBudgets, err := podDisruptionBudgetsToMap(ctx, i.k, i.namespace)
if err != nil {
return err
}
serviceMonitors, err := serviceMonitorsToMap(ctx, m, namespace)
serviceMonitors, err := serviceMonitorsToMap(ctx, i.m, i.namespace)
if err != nil {
return err
}
arangoMembers, err := arangoMembersToMap(ctx, c, namespace)
arangoMembers, err := arangoMembersToMap(ctx, i.c, i.namespace)
if err != nil {
return err
}

View file

@ -55,7 +55,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
core "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
@ -187,7 +187,7 @@ func createArangodArgs(cachedStatus interfaces.Inspector, input pod.Input, addit
}
// createArangoSyncArgs creates command line arguments for an arangosync server in the given group.
func createArangoSyncArgs(apiObject metav1.Object, spec api.DeploymentSpec, group api.ServerGroup, groupSpec api.ServerGroupSpec, member api.MemberStatus) []string {
func createArangoSyncArgs(apiObject meta.Object, spec api.DeploymentSpec, group api.ServerGroup, groupSpec api.ServerGroupSpec, member api.MemberStatus) []string {
options := k8sutil.CreateOptionPairs(64)
var runCmd string
var port int
@ -303,6 +303,45 @@ func (r *Resources) CreatePodTolerations(group api.ServerGroup, groupSpec api.Se
return tolerations
}
func (r *Resources) RenderPodTemplateForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.PodTemplateSpec, error) {
if p, err := r.RenderPodForMember(ctx, cachedStatus, spec, status, memberID, imageInfo); err != nil {
return nil, err
} else {
return &core.PodTemplateSpec{
ObjectMeta: p.ObjectMeta,
Spec: p.Spec,
}, nil
}
}
func (r *Resources) RenderPodTemplateForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.PodTemplateSpec, error) {
if p, err := r.RenderPodForMemberFromCurrent(ctx, cachedStatus, memberID); err != nil {
return nil, err
} else {
return &core.PodTemplateSpec{
ObjectMeta: p.ObjectMeta,
Spec: p.Spec,
}, nil
}
}
func (r *Resources) RenderPodForMemberFromCurrent(ctx context.Context, cachedStatus inspectorInterface.Inspector, memberID string) (*core.Pod, error) {
spec := r.context.GetSpec()
status, _ := r.context.GetStatus()
member, _, ok := status.Members.ElementByID(memberID)
if !ok {
return nil, errors.Newf("Member not found")
}
imageInfo, imageFound := r.SelectImageForMember(spec, status, member)
if !imageFound {
return nil, errors.Newf("ImageInfo not found")
}
return r.RenderPodForMember(ctx, cachedStatus, spec, status, member.ID, imageInfo)
}
func (r *Resources) RenderPodForMember(ctx context.Context, cachedStatus inspectorInterface.Inspector, spec api.DeploymentSpec, status api.DeploymentStatus, memberID string, imageInfo api.ImageInfo) (*core.Pod, error) {
log := r.log
apiObject := r.context.GetAPIObject()
@ -456,8 +495,16 @@ func (r *Resources) SelectImage(spec api.DeploymentSpec, status api.DeploymentSt
return imageInfo, true
}
func (r *Resources) SelectImageForMember(spec api.DeploymentSpec, status api.DeploymentStatus, member api.MemberStatus) (api.ImageInfo, bool) {
if member.Image != nil {
return *member.Image, true
}
return r.SelectImage(spec, status)
}
// createPodForMember creates all Pods listed in member status
func (r *Resources) createPodForMember(ctx context.Context, spec api.DeploymentSpec, memberID string, imageNotFoundOnce *sync.Once, cachedStatus inspectorInterface.Inspector) error {
func (r *Resources) createPodForMember(ctx context.Context, spec api.DeploymentSpec, member *api.ArangoMember, memberID string, imageNotFoundOnce *sync.Once) error {
log := r.log
status, lastVersion := r.context.GetStatus()
@ -470,6 +517,13 @@ func (r *Resources) createPodForMember(ctx context.Context, spec api.DeploymentS
return nil
}
template := member.Status.Template
if template == nil {
// Template not yet propagated
return errors.Newf("Template not yet propagated")
}
if status.CurrentImage == nil {
status.CurrentImage = &imageInfo
}
@ -488,24 +542,6 @@ func (r *Resources) createPodForMember(ctx context.Context, spec api.DeploymentS
kubecli := r.context.GetKubeCli()
apiObject := r.context.GetAPIObject()
endpoint, err := pod.GenerateMemberEndpoint(cachedStatus, apiObject, spec, group, m)
if err != nil {
return errors.WithStack(err)
}
if m.Endpoint == nil || *m.Endpoint != endpoint {
// Update endpoint
m.Endpoint = &endpoint
if err := status.Members.Update(m, group); err != nil {
return errors.WithStack(err)
}
}
pod, err := r.RenderPodForMember(ctx, cachedStatus, spec, status, memberID, imageInfo)
if err != nil {
return errors.WithStack(err)
}
ns := r.context.GetNamespace()
secrets := kubecli.CoreV1().Secrets(ns)
if !found {
@ -515,9 +551,8 @@ func (r *Resources) createPodForMember(ctx context.Context, spec api.DeploymentS
// Update pod name
role := group.AsRole()
roleAbbr := group.AsRoleAbbreviated()
m.PodName = k8sutil.CreatePodName(apiObject.GetName(), roleAbbr, m.ID, CreatePodSuffix(spec))
m.PodName = template.PodSpec.GetName()
newPhase := api.MemberPhaseCreated
// Create pod
if group.IsArangod() {
@ -527,20 +562,15 @@ func (r *Resources) createPodForMember(ctx context.Context, spec api.DeploymentS
newPhase = api.MemberPhaseUpgrading
}
sha, err := ChecksumArangoPod(groupSpec, pod)
if err != nil {
return errors.WithStack(err)
}
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
defer cancel()
uid, err := CreateArangoPod(ctxChild, kubecli, apiObject, spec, group, pod)
uid, err := CreateArangoPod(ctxChild, kubecli, apiObject, spec, group, CreatePodFromTemplate(template.PodSpec))
if err != nil {
return errors.WithStack(err)
}
m.PodUID = uid
m.PodSpecVersion = sha
m.PodSpecVersion = template.PodSpecChecksum
m.ArangoVersion = m.Image.ArangoDBVersion
m.ImageID = m.Image.ImageID
@ -579,22 +609,16 @@ func (r *Resources) createPodForMember(ctx context.Context, spec api.DeploymentS
}
}
sha, err := ChecksumArangoPod(groupSpec, pod)
if err != nil {
return errors.WithStack(err)
}
ctxChild, cancel := context.WithTimeout(ctx, k8sutil.GetRequestTimeout())
defer cancel()
uid, err := CreateArangoPod(ctxChild, kubecli, apiObject, spec, group, pod)
uid, err := CreateArangoPod(ctxChild, kubecli, apiObject, spec, group, CreatePodFromTemplate(template.PodSpec))
if err != nil {
return errors.WithStack(err)
}
log.Debug().Str("pod-name", m.PodName).Msg("Created pod")
m.PodUID = uid
m.Endpoint = &endpoint
m.PodSpecVersion = sha
m.PodSpecVersion = template.PodSpecChecksum
}
// Record new member phase
m.Phase = newPhase
@ -604,7 +628,10 @@ func (r *Resources) createPodForMember(ctx context.Context, spec api.DeploymentS
m.Conditions.Remove(api.ConditionTypeAgentRecoveryNeeded)
m.Conditions.Remove(api.ConditionTypeAutoUpgrade)
m.Conditions.Remove(api.ConditionTypeUpgradeFailed)
m.Conditions.Remove(api.ConditionTypePendingTLSRotation)
m.Conditions.Remove(api.ConditionTypePendingRestart)
m.Upgrade = false
r.log.Info().Str("DEBUG", "10101").Str("pod", m.PodName).Msgf("Updating member")
if err := status.Members.Update(m, group); err != nil {
return errors.WithStack(err)
}
@ -685,6 +712,13 @@ func CreateArangoPod(ctx context.Context, kubecli kubernetes.Interface, deployme
return uid, nil
}
func CreatePodFromTemplate(p *core.PodTemplateSpec) *core.Pod {
return &core.Pod{
ObjectMeta: p.ObjectMeta,
Spec: p.Spec,
}
}
func ChecksumArangoPod(groupSpec api.ServerGroupSpec, pod *core.Pod) (string, error) {
shaPod := pod.DeepCopy()
switch groupSpec.InitContainers.GetMode().Get() {
@ -708,7 +742,7 @@ func (r *Resources) EnsurePods(ctx context.Context, cachedStatus inspectorInterf
deploymentStatus, _ := r.context.GetStatus()
imageNotFoundOnce := &sync.Once{}
createPodMember := func(group api.ServerGroup, groupSpec api.ServerGroupSpec, status *api.MemberStatusList) error {
if err := iterator.ForeachServerGroup(func(group api.ServerGroup, groupSpec api.ServerGroupSpec, status *api.MemberStatusList) error {
for _, m := range *status {
if m.Phase != api.MemberPhasePending {
continue
@ -716,15 +750,29 @@ func (r *Resources) EnsurePods(ctx context.Context, cachedStatus inspectorInterf
if m.Conditions.IsTrue(api.ConditionTypeCleanedOut) {
continue
}
member, ok := cachedStatus.ArangoMember(m.ArangoMemberName(r.context.GetName(), group))
if !ok {
// ArangoMember not found, skip
continue
}
if member.Status.Template == nil {
r.log.Warn().Msgf("Missing Template")
// Template is missing, nothing to do
continue
}
r.log.Warn().Msgf("Ensuring pod")
spec := r.context.GetSpec()
if err := r.createPodForMember(ctx, spec, m.ID, imageNotFoundOnce, cachedStatus); err != nil {
if err := r.createPodForMember(ctx, spec, member, m.ID, imageNotFoundOnce); err != nil {
r.log.Warn().Err(err).Msgf("Ensuring pod failed")
return errors.WithStack(err)
}
}
return nil
}
if err := iterator.ForeachServerGroup(createPodMember, &deploymentStatus); err != nil {
}, &deploymentStatus); err != nil {
return errors.WithStack(err)
}

View file

@ -247,7 +247,7 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
if _, exists := cachedStatus.Pod(podName); !exists {
log.Debug().Str("pod-name", podName).Msg("Does not exist")
switch m.Phase {
case api.MemberPhaseNone:
case api.MemberPhaseNone, api.MemberPhasePending:
// Do nothing
log.Debug().Str("pod-name", podName).Msg("PodPhase is None, waiting for the pod to be recreated")
case api.MemberPhaseShuttingDown, api.MemberPhaseUpgrading, api.MemberPhaseFailed, api.MemberPhaseRotateStart, api.MemberPhaseRotating:

View file

@ -26,7 +26,6 @@ package inspector
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/persistentvolumeclaim"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/pod"
@ -35,13 +34,10 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/serviceaccount"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/servicemonitor"
monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1"
"k8s.io/client-go/kubernetes"
)
type Inspector interface {
Refresh(ctx context.Context, k kubernetes.Interface, m monitoringClient.MonitoringV1Interface,
c versioned.Interface, namespace string) error
Refresh(ctx context.Context) error
pod.Inspector
secret.Inspector