From 413a07961d5bdffed8894d9fa3100fc3997b4a36 Mon Sep 17 00:00:00 2001 From: Ewout Prangsma Date: Fri, 9 Feb 2018 17:21:06 +0100 Subject: [PATCH] Adding controller code --- Makefile | 2 +- main.go | 4 + pkg/apis/arangodb/v1alpha/cluster_spec.go | 6 +- pkg/controller/controller.go | 112 +++++++++++++++++++--- pkg/controller/crd.go | 54 +++++++++++ pkg/util/k8sutil/crd.go | 2 +- 6 files changed, 162 insertions(+), 18 deletions(-) create mode 100644 pkg/controller/crd.go diff --git a/Makefile b/Makefile index f45d47219..8f711e1d9 100644 --- a/Makefile +++ b/Makefile @@ -116,7 +116,7 @@ $(BIN): $(GOBUILDDIR) $(SOURCES) -e CGO_ENABLED=0 \ -w /usr/code/ \ golang:$(GOVERSION) \ - go build -a -installsuffix cgo -ldflags "-X main.projectVersion=$(VERSION) -X main.projectBuild=$(COMMIT)" -o /usr/code/bin/$(BINNAME) $(REPOPATH) + go build -installsuffix cgo -ldflags "-X main.projectVersion=$(VERSION) -X main.projectBuild=$(COMMIT)" -o /usr/code/bin/$(BINNAME) $(REPOPATH) docker: $(BIN) docker build -f $(DOCKERFILE) -t arangodb/arangodb-operator . diff --git a/main.go b/main.go index cfc82b79b..00fc2373a 100644 --- a/main.go +++ b/main.go @@ -92,6 +92,9 @@ func cmdMainRun(cmd *cobra.Command, args []string) { cliLog.Fatal().Err(err).Msg("Failed to initialize log service") } + // Log version + cliLog.Info().Msgf("Starting arangodb-operator, version %s build %s", projectVersion, projectBuild) + // Get environment namespace := os.Getenv(constants.EnvOperatorPodNamespace) if len(namespace) == 0 { @@ -196,6 +199,7 @@ func newControllerConfigAndDeps(namespace, name string) (controller.Config, cont CreateCRD: createCRD, } deps := controller.Dependencies{ + Log: logService.MustGetLogger("controller"), KubeCli: kubecli, KubeExtCli: kubeExtCli, ClusterCRCli: clusterCRCli, diff --git a/pkg/apis/arangodb/v1alpha/cluster_spec.go b/pkg/apis/arangodb/v1alpha/cluster_spec.go index 96d9aa1cf..20259c8fd 100644 --- a/pkg/apis/arangodb/v1alpha/cluster_spec.go +++ b/pkg/apis/arangodb/v1alpha/cluster_spec.go @@ -28,9 +28,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object - // ArangoClusterList is a list of ArangoDB clusters. +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object type ArangoClusterList struct { metav1.TypeMeta `json:",inline"` // Standard list metadata @@ -39,10 +38,9 @@ type ArangoClusterList struct { Items []ArangoCluster `json:"items"` } +// ArangoCluster contains the entire Kubernetes info for an ArangoDB cluster // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object - -// ArangoCluster contains the entire Kubernetes info for an ArangoDB cluster type ArangoCluster struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 728dcd67c..f10ea1e3a 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -23,19 +23,26 @@ package controller import ( + "context" "fmt" + "time" "github.com/pkg/errors" "github.com/rs/zerolog" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/fields" kwatch "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha" "github.com/arangodb/k8s-operator/pkg/cluster" "github.com/arangodb/k8s-operator/pkg/generated/clientset/versioned" "github.com/arangodb/k8s-operator/pkg/metrics" - "github.com/arangodb/k8s-operator/pkg/util/k8sutil" +) + +const ( + initRetryWaitTime = 30 * time.Second ) var ( @@ -83,7 +90,98 @@ func NewController(config Config, deps Dependencies) (*Controller, error) { // Start the controller func (c *Controller) Start() error { - return nil + log := c.Dependencies.Log + + for { + if err := c.initResourceIfNeeded(); err == nil { + break + } else { + log.Error().Err(err).Msg("Resource initialization failed") + log.Info().Msgf("Retrying in %s...", initRetryWaitTime) + time.Sleep(initRetryWaitTime) + } + } + + //probe.SetReady() + c.run() + panic("unreachable") +} + +// run the controller. +// This registers a listener and waits until the process stops. +func (c *Controller) run() { + source := cache.NewListWatchFromClient( + c.Dependencies.ClusterCRCli.ClusterV1alpha().RESTClient(), + api.ArangoClusterResourcePlural, + c.Config.Namespace, + fields.Everything()) + + _, informer := cache.NewIndexerInformer(source, &api.ArangoCluster{}, 0, cache.ResourceEventHandlerFuncs{ + AddFunc: c.onAddArangoCluster, + UpdateFunc: c.onUpdateArangoCluster, + DeleteFunc: c.onDeleteArangoCluster, + }, cache.Indexers{}) + + ctx := context.TODO() + // TODO: use workqueue to avoid blocking + informer.Run(ctx.Done()) +} + +// onAddArangoCluster cluster addition callback +func (c *Controller) onAddArangoCluster(obj interface{}) { + c.syncArangoCluster(obj.(*api.ArangoCluster)) +} + +// onUpdateArangoCluster cluster update callback +func (c *Controller) onUpdateArangoCluster(oldObj, newObj interface{}) { + c.syncArangoCluster(newObj.(*api.ArangoCluster)) +} + +// onDeleteArangoCluster cluster delete callback +func (c *Controller) onDeleteArangoCluster(obj interface{}) { + clus, ok := obj.(*api.ArangoCluster) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + panic(fmt.Sprintf("unknown object from EtcdCluster delete event: %#v", obj)) + } + clus, ok = tombstone.Obj.(*api.ArangoCluster) + if !ok { + panic(fmt.Sprintf("Tombstone contained object that is not an ArangoCluster: %#v", obj)) + } + } + ev := &Event{ + Type: kwatch.Deleted, + Object: clus, + } + + // pt.start() + err := c.handleClusterEvent(ev) + if err != nil { + c.Dependencies.Log.Warn().Err(err).Msg("Failed to handle event") + } + //pt.stop() +} + +// syncArangoCluster synchronized the given cluster. +func (c *Controller) syncArangoCluster(apiCluster *api.ArangoCluster) { + ev := &Event{ + Type: kwatch.Added, + Object: apiCluster, + } + // re-watch or restart could give ADD event. + // If for an ADD event the cluster spec is invalid then it is not added to the local cache + // so modifying that cluster will result in another ADD event + if _, ok := c.clusters[apiCluster.Name]; ok { + ev.Type = kwatch.Modified + } + + //pt.start() + err := c.handleClusterEvent(ev) + if err != nil { + c.Dependencies.Log.Warn().Err(err).Msg("Failed to handle event") + } + //pt.stop() } // handleClusterEvent processed the given event. @@ -153,13 +251,3 @@ func (c *Controller) makeClusterConfigAndDeps() (cluster.Config, cluster.Depende } return cfg, deps } - -func (c *Controller) initCRD() error { - if err := k8sutil.CreateCRD(c.KubeExtCli, api.ArangoClusterCRDName, api.ArangoClusterResourceKind, api.ArangoClusterResourcePlural, "arangodb"); err != nil { - return maskAny(errors.Wrapf(err, "failed to create CRD: %v", err)) - } - if err := k8sutil.WaitCRDReady(c.KubeExtCli, api.ArangoClusterCRDName); err != nil { - return maskAny(err) - } - return nil -} diff --git a/pkg/controller/crd.go b/pkg/controller/crd.go new file mode 100644 index 000000000..b7838ef32 --- /dev/null +++ b/pkg/controller/crd.go @@ -0,0 +1,54 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package controller + +import ( + "fmt" + + "github.com/pkg/errors" + + api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha" + "github.com/arangodb/k8s-operator/pkg/util/k8sutil" +) + +// initResourceIfNeeded initializes the custom resource definition when +// instructed to do so by the config. +func (c *Controller) initResourceIfNeeded() error { + if c.Config.CreateCRD { + if err := c.initCRD(); err != nil { + return maskAny(fmt.Errorf("Failed to initialize Custom Resource Definition: %v", err)) + } + } + return nil +} + +// initCRD creates the CustomResourceDefinition and waits for it to be ready. +func (c *Controller) initCRD() error { + if err := k8sutil.CreateCRD(c.KubeExtCli, api.ArangoClusterCRDName, api.ArangoClusterResourceKind, api.ArangoClusterResourcePlural, "arangodb"); err != nil { + return maskAny(errors.Wrapf(err, "failed to create CRD: %v", err)) + } + if err := k8sutil.WaitCRDReady(c.KubeExtCli, api.ArangoClusterCRDName); err != nil { + return maskAny(err) + } + return nil +} diff --git a/pkg/util/k8sutil/crd.go b/pkg/util/k8sutil/crd.go index d35cade86..0f4761680 100644 --- a/pkg/util/k8sutil/crd.go +++ b/pkg/util/k8sutil/crd.go @@ -29,8 +29,8 @@ import ( apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/tools/clientcmd/api" + api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha" "github.com/arangodb/k8s-operator/pkg/util/retry" )