1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] ArangoClusterSynchronization controller (TG-190) (#876)

* [Feature] ArangoClusterSynchronization controller (TG-190)

* helm chart

* Fix model
This commit is contained in:
jwierzbo 2022-01-06 21:27:01 +01:00 committed by GitHub
parent 061d1d7cbb
commit b56c4a5631
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 503 additions and 131 deletions

View file

@ -13,6 +13,7 @@
- Add ArangoJob and Apps Operator
- Use Go 1.17
- Add metrics for the plan actions
- Add ArangoClusterSynchronization Operator
## [1.2.6](https://github.com/arangodb/kube-arangodb/tree/1.2.6) (2021-12-15)
- Add ArangoBackup backoff functionality

View file

@ -98,6 +98,7 @@ MANIFESTPATHDEPLOYMENT := manifests/arango-deployment$(MANIFESTSUFFIX).yaml
MANIFESTPATHDEPLOYMENTREPLICATION := manifests/arango-deployment-replication$(MANIFESTSUFFIX).yaml
MANIFESTPATHBACKUP := manifests/arango-backup$(MANIFESTSUFFIX).yaml
MANIFESTPATHAPPS := manifests/arango-apps$(MANIFESTSUFFIX).yaml
MANIFESTPATHK2KCLUSTERSYNC := manifests/arango-k2kclustersync$(MANIFESTSUFFIX).yaml
MANIFESTPATHSTORAGE := manifests/arango-storage$(MANIFESTSUFFIX).yaml
MANIFESTPATHALL := manifests/arango-all$(MANIFESTSUFFIX).yaml
MANIFESTPATHTEST := manifests/arango-test$(MANIFESTSUFFIX).yaml
@ -106,6 +107,7 @@ KUSTOMIZEPATHDEPLOYMENT := manifests/kustomize/deployment/arango-deployment$(MAN
KUSTOMIZEPATHDEPLOYMENTREPLICATION := manifests/kustomize/deployment-replication/arango-deployment-replication$(MANIFESTSUFFIX).yaml
KUSTOMIZEPATHBACKUP := manifests/kustomize/backup/arango-backup$(MANIFESTSUFFIX).yaml
KUSTOMIZEPATHAPPS := manifests/kustomize/apps/arango-apps$(MANIFESTSUFFIX).yaml
KUSTOMIZEPATHK2KCLUSTERSYNC := manifests/kustomize/apps/arango-k2kclustersync$(MANIFESTSUFFIX).yaml
KUSTOMIZEPATHSTORAGE := manifests/kustomize/storage/arango-storage$(MANIFESTSUFFIX).yaml
KUSTOMIZEPATHALL := manifests/kustomize/all/arango-all$(MANIFESTSUFFIX).yaml
KUSTOMIZEPATHTEST := manifests/kustomize/test/arango-test$(MANIFESTSUFFIX).yaml
@ -333,6 +335,7 @@ $(eval $(call manifest-generator, deployment, kube-arangodb, \
--set "operator.features.deploymentReplications=false" \
--set "operator.features.storage=false" \
--set "operator.features.apps=false" \
--set "operator.features.k8sToK8sClusterSync=false" \
--set "operator.features.backup=false"))
$(eval $(call manifest-generator, deployment-replication, kube-arangodb, \
@ -340,6 +343,7 @@ $(eval $(call manifest-generator, deployment-replication, kube-arangodb, \
--set "operator.features.deploymentReplications=true" \
--set "operator.features.storage=false" \
--set "operator.features.apps=false" \
--set "operator.features.k8sToK8sClusterSync=false" \
--set "operator.features.backup=false"))
$(eval $(call manifest-generator, storage, kube-arangodb, \
@ -347,6 +351,7 @@ $(eval $(call manifest-generator, storage, kube-arangodb, \
--set "operator.features.deploymentReplications=false" \
--set "operator.features.storage=true" \
--set "operator.features.apps=false" \
--set "operator.features.k8sToK8sClusterSync=false" \
--set "operator.features.backup=false"))
$(eval $(call manifest-generator, backup, kube-arangodb, \
@ -354,6 +359,7 @@ $(eval $(call manifest-generator, backup, kube-arangodb, \
--set "operator.features.deploymentReplications=false" \
--set "operator.features.storage=false" \
--set "operator.features.apps=false" \
--set "operator.features.k8sToK8sClusterSync=false" \
--set "operator.features.backup=true"))
$(eval $(call manifest-generator, apps, kube-arangodb, \
@ -361,6 +367,15 @@ $(eval $(call manifest-generator, apps, kube-arangodb, \
--set "operator.features.deploymentReplications=false" \
--set "operator.features.storage=false" \
--set "operator.features.apps=true" \
--set "operator.features.k8sToK8sClusterSync=false" \
--set "operator.features.backup=false"))
$(eval $(call manifest-generator, k2kclustersync, kube-arangodb, \
--set "operator.features.deployment=false" \
--set "operator.features.deploymentReplications=false" \
--set "operator.features.storage=false" \
--set "operator.features.apps=false" \
--set "operator.features.k8sToK8sClusterSync=true" \
--set "operator.features.backup=false"))
$(eval $(call manifest-generator, all, kube-arangodb, \
@ -368,6 +383,7 @@ $(eval $(call manifest-generator, all, kube-arangodb, \
--set "operator.features.deploymentReplications=true" \
--set "operator.features.storage=true" \
--set "operator.features.apps=true" \
--set "operator.features.k8sToK8sClusterSync=true" \
--set "operator.features.backup=true"))
.PHONY: chart-crd

View file

@ -0,0 +1,39 @@
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: arangoclustersynchronizations.database.arangodb.com
labels:
app.kubernetes.io/name: {{ template "kube-arangodb-crd.name" . }}
helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
release: {{ .Release.Name }}
spec:
group: database.arangodb.com
names:
kind: ArangoClusterSynchronization
listKind: ArangoClusterSynchronizationList
plural: arangoclustersynchronizations
singular: arangoclustersynchronization
shortNames:
- arangoclustersync
scope: Namespaced
versions:
- name: v1
schema:
openAPIV3Schema:
type: object
x-kubernetes-preserve-unknown-fields: true
served: true
storage: true
subresources:
status: {}
- name: v2alpha1
schema:
openAPIV3Schema:
type: object
x-kubernetes-preserve-unknown-fields: true
served: true
storage: false
subresources:
status: {}

View file

@ -24,6 +24,9 @@ Possible Operators:
- `ArangoDeployment` - enabled by default
- `ArangoDeploymentReplications` - enabled by default
- `ArangoLocalStorage` - disabled by default
- `ArangoBackup` - disabled by default
- `ArangoJob` - disabled by default
- `ArangoClusterSynchronization` - disabled by default
To install Operators in mode "One per Helm Release" we can use:
@ -31,19 +34,10 @@ To install Operators in mode "One per Helm Release" we can use:
helm install --name arango-deployment kube-arangodb.tar.gz \
--set operator.features.deployment=true \
--set operator.features.deploymentReplications=false \
--set operator.features.storage=false
helm install --name arango-deployment-replications kube-arangodb.tar.gz \
--set operator.features.deployment=false \
--set operator.features.deploymentReplications=true \
--set operator.features.storage=false
helm install --name arango-storage kube-arangodb.tar.gz \
--set operator.features.deployment=false \
--set operator.features.deploymentReplications=false \
--set operator.features.storage=true
--set operator.features.storage=false \
--set operator.features.backup=false \
--set operator.features.apps=false \
--set operator.features.k8sToK8sClusterSync=false
```
@ -155,6 +149,18 @@ Define if ArangoBackup Operator should be enabled.
Default: `false`
### `operator.features.apps`
Define if ArangoJob Operator should be enabled.
Default: `false`
### `operator.features.k8sToK8sClusterSync`
Define if ArangoClusterSynchronization Operator should be enabled.
Default: `false`
### `rbac.enabled`
Define if RBAC should be enabled.

View file

@ -105,6 +105,9 @@ spec:
{{- end }}
{{ if .Values.operator.features.apps }}
- --operator.apps
{{- end }}
{{ if .Values.operator.features.k8sToK8sClusterSync }}
- --operator.k2k-cluster-sync
{{- end }}
- --chaos.allowed={{ .Values.operator.allowChaos }}
{{- if .Values.operator.args }}

View file

@ -0,0 +1,26 @@
{{ if .Values.rbac.enabled -}}
{{ if not (eq .Values.operator.scope "namespaced") -}}
{{ if .Values.operator.features.k8sToK8sClusterSync -}}
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {{ template "kube-arangodb.rbac-cluster" . }}-k2kclustersync
labels:
app.kubernetes.io/name: {{ template "kube-arangodb.name" . }}
helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
release: {{ .Release.Name }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: {{ template "kube-arangodb.rbac-cluster" . }}-k2kclustersync
subjects:
- kind: ServiceAccount
name: {{ template "kube-arangodb.operatorName" . }}
namespace: {{ .Release.Namespace }}
{{- end }}
{{- end }}
{{- end }}

View file

@ -0,0 +1,22 @@
{{ if .Values.rbac.enabled -}}
{{ if not (eq .Values.operator.scope "namespaced") -}}
{{ if .Values.operator.features.k8sToK8sClusterSync -}}
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ template "kube-arangodb.rbac-cluster" . }}-k2kclustersync
labels:
app.kubernetes.io/name: {{ template "kube-arangodb.name" . }}
helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
release: {{ .Release.Name }}
rules:
- apiGroups: ["apiextensions.k8s.io"]
resources: ["customresourcedefinitions"]
verbs: ["get", "list", "watch"]
{{- end }}
{{- end }}
{{- end }}

View file

@ -0,0 +1,25 @@
{{ if .Values.rbac.enabled -}}
{{ if .Values.operator.features.k8sToK8sClusterSync -}}
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: {{ template "kube-arangodb.rbac" . }}-k2kclustersync
namespace: {{ .Release.Namespace }}
labels:
app.kubernetes.io/name: {{ template "kube-arangodb.name" . }}
helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
release: {{ .Release.Name }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: {{ template "kube-arangodb.rbac" . }}-k2kclustersync
subjects:
- kind: ServiceAccount
name: {{ template "kube-arangodb.operatorName" . }}
namespace: {{ .Release.Namespace }}
{{- end }}
{{- end }}

View file

@ -0,0 +1,33 @@
{{ if .Values.rbac.enabled -}}
{{ if .Values.operator.features.k8sToK8sClusterSync -}}
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: {{ template "kube-arangodb.rbac" . }}-k2kclustersync
namespace: {{ .Release.Namespace }}
labels:
app.kubernetes.io/name: {{ template "kube-arangodb.name" . }}
helm.sh/chart: {{ .Chart.Name }}-{{ .Chart.Version }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
app.kubernetes.io/instance: {{ .Release.Name }}
release: {{ .Release.Name }}
rules:
- apiGroups: [""]
resources: ["pods", "services", "endpoints"]
verbs: ["get", "update"]
- apiGroups: [""]
resources: ["events"]
verbs: ["*"]
- apiGroups: [""]
resources: ["secrets"]
verbs: ["get"]
- apiGroups: ["apps"]
resources: ["deployments", "replicasets"]
verbs: ["get"]
- apiGroups: ["database.arangodb.com"]
resources: ["arangodeployments", "arangoclustersynchronizations"]
verbs: ["get", "list", "watch"]
{{- end }}
{{- end }}

View file

@ -39,6 +39,7 @@ operator:
storage: false
backup: false
apps: false
k8sToK8sClusterSync: false
images:
base: alpine:3.11

View file

@ -113,6 +113,7 @@ var (
enableBackup bool // Run backup operator
enableApps bool // Run apps operator
versionOnly bool // Run only version endpoint, explicitly disabled with other
enableK2KClusterSync bool // Run k2kClusterSync operator
scalingIntegrationEnabled bool
@ -141,6 +142,7 @@ var (
storageProbe probe.ReadyProbe
backupProbe probe.ReadyProbe
appsProbe probe.ReadyProbe
k2KClusterSyncProbe probe.ReadyProbe
)
func init() {
@ -157,6 +159,7 @@ func init() {
f.BoolVar(&operatorOptions.enableStorage, "operator.storage", false, "Enable to run the ArangoLocalStorage operator")
f.BoolVar(&operatorOptions.enableBackup, "operator.backup", false, "Enable to run the ArangoBackup operator")
f.BoolVar(&operatorOptions.enableApps, "operator.apps", false, "Enable to run the ArangoApps operator")
f.BoolVar(&operatorOptions.enableK2KClusterSync, "operator.k2k-cluster-sync", false, "Enable to run the ArangoClusterSynchronizations operator")
f.BoolVar(&operatorOptions.versionOnly, "operator.version", false, "Enable only version endpoint in Operator")
f.StringVar(&operatorOptions.alpineImage, "operator.alpine-image", UBIImageEnv.GetOrDefault(defaultAlpineImage), "Docker image used for alpine containers")
f.MarkDeprecated("operator.alpine-image", "Value is not used anymore")
@ -225,12 +228,13 @@ func executeMain(cmd *cobra.Command, args []string) {
klog.Flush()
// Check operating mode
if !operatorOptions.enableDeployment && !operatorOptions.enableDeploymentReplication && !operatorOptions.enableStorage && !operatorOptions.enableBackup && !operatorOptions.enableApps {
if !operatorOptions.enableDeployment && !operatorOptions.enableDeploymentReplication && !operatorOptions.enableStorage &&
!operatorOptions.enableBackup && !operatorOptions.enableApps && !operatorOptions.enableK2KClusterSync {
if !operatorOptions.versionOnly {
cliLog.Fatal().Err(err).Msg("Turn on --operator.deployment, --operator.deployment-replication, --operator.storage, --operator.backup, --operator.apps or any combination of these")
cliLog.Fatal().Err(err).Msg("Turn on --operator.deployment, --operator.deployment-replication, --operator.storage, --operator.backup, --operator.apps, --operator.k2k-cluster-sync or any combination of these")
}
} else if operatorOptions.versionOnly {
cliLog.Fatal().Err(err).Msg("Options --operator.deployment, --operator.deployment-replication, --operator.storage, --operator.backup, --operator.apps cannot be enabled together with --operator.version")
cliLog.Fatal().Err(err).Msg("Options --operator.deployment, --operator.deployment-replication, --operator.storage, --operator.backup, --operator.apps, --operator.k2k-cluster-sync cannot be enabled together with --operator.version")
}
// Log version
@ -307,6 +311,10 @@ func executeMain(cmd *cobra.Command, args []string) {
Enabled: cfg.EnableApps,
Probe: &appsProbe,
},
ClusterSync: server.OperatorDependency{
Enabled: cfg.EnableK2KClusterSync,
Probe: &k2KClusterSyncProbe,
},
Operators: o,
Secrets: secrets,
@ -394,6 +402,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper
EnableStorage: operatorOptions.enableStorage,
EnableBackup: operatorOptions.enableBackup,
EnableApps: operatorOptions.enableApps,
EnableK2KClusterSync: operatorOptions.enableK2KClusterSync,
AllowChaos: chaosOptions.allowed,
ScalingIntegrationEnabled: operatorOptions.scalingIntegrationEnabled,
ArangoImage: operatorOptions.arangoImage,
@ -413,6 +422,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper
StorageProbe: &storageProbe,
BackupProbe: &backupProbe,
AppsProbe: &appsProbe,
K2KClusterSyncProbe: &k2KClusterSyncProbe,
}
return cfg, deps, nil

View file

@ -0,0 +1,5 @@
apiVersion: database.arangodb.com/v1
kind: ArangoClusterSynchronization
metadata:
name: arangoclustersync-sample
spec:

View file

@ -0,0 +1,5 @@
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
resources:
- arango-k2kclustersync.yaml

View file

@ -39,5 +39,6 @@ const (
)
var (
ArangoDeploymentShortNames = []string{"arangodb", "arango"}
ArangoDeploymentShortNames = []string{"arangodb", "arango"}
ArangoClusterSynchronizationShortNames = []string{"arangoclustersync"}
)

View file

@ -52,6 +52,8 @@ func addKnownTypes(s *runtime.Scheme) error {
&ArangoDeploymentList{},
&ArangoMember{},
&ArangoMemberList{},
&ArangoClusterSynchronization{},
&ArangoClusterSynchronizationList{},
)
metav1.AddToGroupVersion(s, SchemeGroupVersion)
return nil

View file

@ -52,6 +52,8 @@ func addKnownTypes(s *runtime.Scheme) error {
&ArangoDeploymentList{},
&ArangoMember{},
&ArangoMemberList{},
&ArangoClusterSynchronization{},
&ArangoClusterSynchronizationList{},
)
metav1.AddToGroupVersion(s, SchemeGroupVersion)
return nil

View file

@ -0,0 +1,80 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package clustersync
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/apis/apps"
appsApi "github.com/arangodb/kube-arangodb/pkg/apis/apps/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
arangoClientSet "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/operation"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
type handler struct {
client arangoClientSet.Interface
kubeClient kubernetes.Interface
eventRecorder event.RecorderInstance
operator operator.Operator
}
func (*handler) Name() string {
return deployment.ArangoClusterSynchronizationResourceKind
}
func (h *handler) Handle(item operation.Item) error {
// Do not act on delete event
if item.Operation == operation.Delete {
return nil
}
// Get ClusterSynchronizations object. It also covers NotFound case
clusterSync, err := h.client.DatabaseV1().ArangoClusterSynchronizations(item.Namespace).Get(context.Background(), item.Name, meta.GetOptions{})
if err != nil {
if k8sutil.IsNotFound(err) {
return nil
}
h.operator.GetLogger().Error().Msgf("ArangoClusterSynchronizations fetch error %v", err)
return err
}
// Update status on object
if _, err = h.client.DatabaseV1().ArangoClusterSynchronizations(item.Namespace).UpdateStatus(context.Background(), clusterSync, meta.UpdateOptions{}); err != nil {
h.operator.GetLogger().Error().Msgf("ArangoClusterSynchronizations status update error %v", err)
return err
}
return nil
}
func (*handler) CanBeHandled(item operation.Item) bool {
return item.Group == appsApi.SchemeGroupVersion.Group &&
item.Version == appsApi.SchemeGroupVersion.Version &&
item.Kind == apps.ArangoJobResourceKind
}

View file

@ -0,0 +1,57 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package clustersync
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
"github.com/rs/zerolog/log"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var _ operator.LifecyclePreStart = &handler{}
// LifecyclePreStart is executed before operator starts to work, additional checks can be placed here
// Wait for CR to be present
func (h *handler) LifecyclePreStart() error {
log.Info().Msgf("Starting Lifecycle PreStart for %s", h.Name())
defer func() {
log.Info().Msgf("Lifecycle PreStart for %s completed", h.Name())
}()
for {
_, err := h.client.DatabaseV1().ArangoClusterSynchronizations(h.operator.Namespace()).List(context.Background(), meta.ListOptions{})
if err != nil {
log.Warn().Err(err).Msgf("CR for %s not found", deployment.ArangoClusterSynchronizationResourceKind)
time.Sleep(250 * time.Millisecond)
continue
}
return nil
}
}

View file

@ -0,0 +1,62 @@
//
// DISCLAIMER
//
// Copyright 2016-2021 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package clustersync
import (
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
v1 "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
arangoClientSet "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
arangoInformer "github.com/arangodb/kube-arangodb/pkg/generated/informers/externalversions"
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
"k8s.io/client-go/kubernetes"
)
func newEventInstance(eventRecorder event.Recorder) event.RecorderInstance {
return eventRecorder.NewInstance(v1.SchemeGroupVersion.Group,
v1.SchemeGroupVersion.Version,
deployment.ArangoClusterSynchronizationResourceKind)
}
// RegisterInformer into operator
func RegisterInformer(operator operator.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory) error {
if err := operator.RegisterInformer(informer.Database().V1().ArangoClusterSynchronizations().Informer(),
v1.SchemeGroupVersion.Group,
v1.SchemeGroupVersion.Version,
deployment.ArangoClusterSynchronizationResourceKind); err != nil {
return err
}
h := &handler{
client: client,
kubeClient: kubeClient,
eventRecorder: newEventInstance(recorder),
operator: operator,
}
if err := operator.RegisterHandler(h); err != nil {
return err
}
return nil
}

View file

@ -27,8 +27,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/apis/apps"
appsApi "github.com/arangodb/kube-arangodb/pkg/apis/apps/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
database "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
deploymentApi "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v2alpha1"
deploymentApi "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
fakeClientSet "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned/fake"
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
@ -95,7 +94,7 @@ func createK8sJob(t *testing.T, h *handler, jobs ...*batchv1.Job) {
}
}
func createArangoDeployment(t *testing.T, h *handler, deployments ...*database.ArangoDeployment) {
func createArangoDeployment(t *testing.T, h *handler, deployments ...*deploymentApi.ArangoDeployment) {
for _, deployment := range deployments {
_, err := h.client.DatabaseV1().ArangoDeployments(deployment.Namespace).Create(context.Background(), deployment, meta.CreateOptions{})
require.NoError(t, err)
@ -133,8 +132,8 @@ func newArangoJob(name, namespace, deployment string) *appsApi.ArangoJob {
}
}
func newArangoDeployment(name, namespace string) *database.ArangoDeployment {
return &database.ArangoDeployment{
func newArangoDeployment(name, namespace string) *deploymentApi.ArangoDeployment {
return &deploymentApi.ArangoDeployment{
TypeMeta: meta.TypeMeta{
APIVersion: deploymentApi.SchemeGroupVersion.String(),
Kind: deployment.ArangoDeploymentResourceKind,

View file

@ -17,89 +17,38 @@
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package operator
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/apis/backup"
"github.com/arangodb/kube-arangodb/pkg/apis/deployment"
"github.com/arangodb/kube-arangodb/pkg/apis/replication"
lsapi "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/crd"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// waitForCRD waits for the CustomResourceDefinition (created externally)
// to be ready.
func (o *Operator) waitForCRD(enableDeployment, enableDeploymentReplication, enableStorage, enableBackup bool) error {
// waitForCRD waits for the CustomResourceDefinition (created externally) to be ready.
func (o *Operator) waitForCRD(crdName string, checkFn func() error) {
log := o.log
log.Debug().Msgf("Waiting for %s CRD to be ready - ", crdName)
if o.Scope.IsNamespaced() {
if enableDeployment {
log.Debug().Msg("Waiting for ArangoDeployment CRD to be ready")
if err := crd.WaitReady(func() error {
_, err := o.CRCli.DatabaseV1().ArangoDeployments(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
}); err != nil {
return errors.WithStack(err)
for {
var err error = nil
if o.Scope.IsNamespaced() {
if checkFn != nil {
err = crd.WaitReady(checkFn)
}
} else {
err = crd.WaitCRDReady(o.KubeExtCli, crdName)
}
if enableDeploymentReplication {
log.Debug().Msg("Waiting for ArangoDeploymentReplication CRD to be ready")
if err := crd.WaitReady(func() error {
_, err := o.CRCli.ReplicationV1().ArangoDeploymentReplications(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
}); err != nil {
return errors.WithStack(err)
}
}
if enableBackup {
log.Debug().Msg("Wait for ArangoBackup CRD to be ready")
if err := crd.WaitReady(func() error {
_, err := o.CRCli.BackupV1().ArangoBackups(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
}); err != nil {
return errors.WithStack(err)
}
}
} else {
if enableDeployment {
log.Debug().Msg("Waiting for ArangoDeployment CRD to be ready")
if err := crd.WaitCRDReady(o.KubeExtCli, deployment.ArangoDeploymentCRDName); err != nil {
return errors.WithStack(err)
}
}
if enableDeploymentReplication {
log.Debug().Msg("Waiting for ArangoDeploymentReplication CRD to be ready")
if err := crd.WaitCRDReady(o.KubeExtCli, replication.ArangoDeploymentReplicationCRDName); err != nil {
return errors.WithStack(err)
}
}
if enableStorage {
log.Debug().Msg("Waiting for ArangoLocalStorage CRD to be ready")
if err := crd.WaitCRDReady(o.KubeExtCli, lsapi.ArangoLocalStorageCRDName); err != nil {
return errors.WithStack(err)
}
}
if enableBackup {
log.Debug().Msg("Wait for ArangoBackup CRD to be ready")
if err := crd.WaitCRDReady(o.KubeExtCli, backup.ArangoBackupCRDName); err != nil {
return errors.WithStack(err)
}
if err == nil {
break
} else {
log.Error().Err(err).Msg("Resource initialization failed")
log.Info().Msgf("Retrying in %s...", initRetryWaitTime)
time.Sleep(initRetryWaitTime)
}
}
log.Debug().Msg("CRDs ready")
return nil
}

View file

@ -31,14 +31,18 @@ import (
monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"github.com/arangodb/kube-arangodb/pkg/apis/apps"
backupdef "github.com/arangodb/kube-arangodb/pkg/apis/backup"
depldef "github.com/arangodb/kube-arangodb/pkg/apis/deployment"
deplapi "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
repldef "github.com/arangodb/kube-arangodb/pkg/apis/replication"
replapi "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1"
lsapi "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/deployment"
@ -46,6 +50,7 @@ import (
arangoClientSet "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
arangoInformer "github.com/arangodb/kube-arangodb/pkg/generated/informers/externalversions"
"github.com/arangodb/kube-arangodb/pkg/handlers/backup"
"github.com/arangodb/kube-arangodb/pkg/handlers/clustersync"
"github.com/arangodb/kube-arangodb/pkg/handlers/job"
"github.com/arangodb/kube-arangodb/pkg/handlers/policy"
"github.com/arangodb/kube-arangodb/pkg/logging"
@ -65,8 +70,9 @@ const (
type operatorV2type string
const (
backupOperator operatorV2type = "backup"
appsOperator operatorV2type = "apps"
backupOperator operatorV2type = "backup"
appsOperator operatorV2type = "apps"
k2KClusterSyncOperator operatorV2type = "k2kclustersync"
)
type Event struct {
@ -98,6 +104,7 @@ type Config struct {
EnableStorage bool
EnableBackup bool
EnableApps bool
EnableK2KClusterSync bool
AllowChaos bool
ScalingIntegrationEnabled bool
SingleMode bool
@ -117,6 +124,7 @@ type Dependencies struct {
StorageProbe *probe.ReadyProbe
BackupProbe *probe.ReadyProbe
AppsProbe *probe.ReadyProbe
K2KClusterSyncProbe *probe.ReadyProbe
}
// NewOperator instantiates a new operator from given config & dependencies.
@ -169,49 +177,42 @@ func (o *Operator) Run() {
go o.runWithoutLeaderElection("arango-apps-operator", constants.AppsLabelRole, o.onStartApps, o.Dependencies.AppsProbe)
}
}
if o.Config.EnableK2KClusterSync {
if !o.Config.SingleMode {
go o.runLeaderElection("arango-k2k-cluster-sync-operator", constants.ClusterSyncLabelRole, o.onStartK2KClusterSync, o.Dependencies.K2KClusterSyncProbe)
} else {
go o.runWithoutLeaderElection("arango-k2k-cluster-sync-operator", constants.ClusterSyncLabelRole, o.onStartK2KClusterSync, o.Dependencies.K2KClusterSyncProbe)
}
}
// Wait until process terminates
<-context.TODO().Done()
}
// onStartDeployment starts the deployment operator and run till given channel is closed.
func (o *Operator) onStartDeployment(stop <-chan struct{}) {
for {
if err := o.waitForCRD(true, false, false, false); err == nil {
break
} else {
log.Error().Err(err).Msg("Resource initialization failed")
log.Info().Msgf("Retrying in %s...", initRetryWaitTime)
time.Sleep(initRetryWaitTime)
}
checkFn := func() error {
_, err := o.CRCli.DatabaseV1().ArangoDeployments(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
}
o.waitForCRD(depldef.ArangoDeploymentCRDName, checkFn)
o.runDeployments(stop)
}
// onStartDeploymentReplication starts the deployment replication operator and run till given channel is closed.
func (o *Operator) onStartDeploymentReplication(stop <-chan struct{}) {
for {
if err := o.waitForCRD(false, true, false, false); err == nil {
break
} else {
log.Error().Err(err).Msg("Resource initialization failed")
log.Info().Msgf("Retrying in %s...", initRetryWaitTime)
time.Sleep(initRetryWaitTime)
}
checkFn := func() error {
_, err := o.CRCli.DatabaseV1().ArangoDeployments(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
}
o.waitForCRD(repldef.ArangoDeploymentReplicationCRDName, checkFn)
o.runDeploymentReplications(stop)
}
// onStartStorage starts the storage operator and run till given channel is closed.
func (o *Operator) onStartStorage(stop <-chan struct{}) {
for {
if err := o.waitForCRD(false, false, true, false); err == nil {
break
} else {
log.Error().Err(err).Msg("Resource initialization failed")
log.Info().Msgf("Retrying in %s...", initRetryWaitTime)
time.Sleep(initRetryWaitTime)
}
}
o.waitForCRD(lsapi.ArangoLocalStorageCRDName, nil)
o.runDeploymentReplications(stop)
o.runLocalStorages(stop)
}
@ -225,17 +226,13 @@ func (o *Operator) onStartApps(stop <-chan struct{}) {
o.onStartOperatorV2(appsOperator, stop)
}
// onStartK2KClusterSync starts the operator and run till given channel is closed.
func (o *Operator) onStartK2KClusterSync(stop <-chan struct{}) {
o.onStartOperatorV2(k2KClusterSyncOperator, stop)
}
// onStartOperatorV2 run the operatorV2 type
func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan struct{}) {
for {
if err := o.waitForCRD(false, false, false, true); err == nil {
break
} else {
log.Error().Err(err).Msg("Resource initialization failed")
log.Info().Msgf("Retrying in %s...", initRetryWaitTime)
time.Sleep(initRetryWaitTime)
}
}
operatorName := fmt.Sprintf("arangodb-%s-operator", operatorType)
operator := operatorV2.NewOperator(o.Dependencies.LogService.MustGetLogger(logging.LoggerNameReconciliation), operatorName, o.Namespace, o.OperatorImage)
@ -264,16 +261,45 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st
switch operatorType {
case appsOperator:
checkFn := func() error {
_, err := o.CRCli.AppsV1().ArangoJobs(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
}
o.waitForCRD(apps.ArangoJobCRDName, checkFn)
if err = job.RegisterInformer(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer); err != nil {
panic(err)
}
case backupOperator:
checkFn := func() error {
_, err := o.CRCli.BackupV1().ArangoBackups(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
}
o.waitForCRD(backupdef.ArangoBackupCRDName, checkFn)
if err = backup.RegisterInformer(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer); err != nil {
panic(err)
}
checkFn = func() error {
_, err := o.CRCli.BackupV1().ArangoBackupPolicies(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
}
o.waitForCRD(backupdef.ArangoBackupPolicyCRDName, checkFn)
if err = policy.RegisterInformer(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer); err != nil {
panic(err)
}
case k2KClusterSyncOperator:
checkFn := func() error {
_, err := o.CRCli.DatabaseV1().ArangoClusterSynchronizations(o.Namespace).List(context.Background(), meta.ListOptions{})
return err
}
o.waitForCRD(depldef.ArangoClusterSynchronizationCRDName, checkFn)
if err = clustersync.RegisterInformer(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer); err != nil {
panic(err)
}
}
if err = operator.RegisterStarter(arangoInformer); err != nil {

View file

@ -73,6 +73,7 @@ type Dependencies struct {
Storage OperatorDependency
Backup OperatorDependency
Apps OperatorDependency
ClusterSync OperatorDependency
Operators Operators
Secrets corev1.SecretInterface
}

View file

@ -59,8 +59,9 @@ const (
AnnotationEnforceAntiAffinity = "database.arangodb.com/enforce-anti-affinity" // Key of annotation added to PVC. Value is a boolean "true" or "false"
BackupLabelRole = "backup/role"
AppsLabelRole = "apps/role"
LabelRole = "role"
LabelRoleLeader = "leader"
BackupLabelRole = "backup/role"
AppsLabelRole = "apps/role"
ClusterSyncLabelRole = "clustersync/role"
LabelRole = "role"
LabelRoleLeader = "leader"
)