mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
Adding controller code
This commit is contained in:
parent
d3359f79cf
commit
413a07961d
6 changed files with 162 additions and 18 deletions
2
Makefile
2
Makefile
|
@ -116,7 +116,7 @@ $(BIN): $(GOBUILDDIR) $(SOURCES)
|
|||
-e CGO_ENABLED=0 \
|
||||
-w /usr/code/ \
|
||||
golang:$(GOVERSION) \
|
||||
go build -a -installsuffix cgo -ldflags "-X main.projectVersion=$(VERSION) -X main.projectBuild=$(COMMIT)" -o /usr/code/bin/$(BINNAME) $(REPOPATH)
|
||||
go build -installsuffix cgo -ldflags "-X main.projectVersion=$(VERSION) -X main.projectBuild=$(COMMIT)" -o /usr/code/bin/$(BINNAME) $(REPOPATH)
|
||||
|
||||
docker: $(BIN)
|
||||
docker build -f $(DOCKERFILE) -t arangodb/arangodb-operator .
|
||||
|
|
4
main.go
4
main.go
|
@ -92,6 +92,9 @@ func cmdMainRun(cmd *cobra.Command, args []string) {
|
|||
cliLog.Fatal().Err(err).Msg("Failed to initialize log service")
|
||||
}
|
||||
|
||||
// Log version
|
||||
cliLog.Info().Msgf("Starting arangodb-operator, version %s build %s", projectVersion, projectBuild)
|
||||
|
||||
// Get environment
|
||||
namespace := os.Getenv(constants.EnvOperatorPodNamespace)
|
||||
if len(namespace) == 0 {
|
||||
|
@ -196,6 +199,7 @@ func newControllerConfigAndDeps(namespace, name string) (controller.Config, cont
|
|||
CreateCRD: createCRD,
|
||||
}
|
||||
deps := controller.Dependencies{
|
||||
Log: logService.MustGetLogger("controller"),
|
||||
KubeCli: kubecli,
|
||||
KubeExtCli: kubeExtCli,
|
||||
ClusterCRCli: clusterCRCli,
|
||||
|
|
|
@ -28,9 +28,8 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
|
||||
// ArangoClusterList is a list of ArangoDB clusters.
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
type ArangoClusterList struct {
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
// Standard list metadata
|
||||
|
@ -39,10 +38,9 @@ type ArangoClusterList struct {
|
|||
Items []ArangoCluster `json:"items"`
|
||||
}
|
||||
|
||||
// ArangoCluster contains the entire Kubernetes info for an ArangoDB cluster
|
||||
// +genclient
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
|
||||
// ArangoCluster contains the entire Kubernetes info for an ArangoDB cluster
|
||||
type ArangoCluster struct {
|
||||
metav1.TypeMeta `json:",inline"`
|
||||
metav1.ObjectMeta `json:"metadata,omitempty"`
|
||||
|
|
|
@ -23,19 +23,26 @@
|
|||
package controller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rs/zerolog"
|
||||
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
kwatch "k8s.io/apimachinery/pkg/watch"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha"
|
||||
"github.com/arangodb/k8s-operator/pkg/cluster"
|
||||
"github.com/arangodb/k8s-operator/pkg/generated/clientset/versioned"
|
||||
"github.com/arangodb/k8s-operator/pkg/metrics"
|
||||
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
|
||||
)
|
||||
|
||||
const (
|
||||
initRetryWaitTime = 30 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -83,7 +90,98 @@ func NewController(config Config, deps Dependencies) (*Controller, error) {
|
|||
|
||||
// Start the controller
|
||||
func (c *Controller) Start() error {
|
||||
return nil
|
||||
log := c.Dependencies.Log
|
||||
|
||||
for {
|
||||
if err := c.initResourceIfNeeded(); err == nil {
|
||||
break
|
||||
} else {
|
||||
log.Error().Err(err).Msg("Resource initialization failed")
|
||||
log.Info().Msgf("Retrying in %s...", initRetryWaitTime)
|
||||
time.Sleep(initRetryWaitTime)
|
||||
}
|
||||
}
|
||||
|
||||
//probe.SetReady()
|
||||
c.run()
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
// run the controller.
|
||||
// This registers a listener and waits until the process stops.
|
||||
func (c *Controller) run() {
|
||||
source := cache.NewListWatchFromClient(
|
||||
c.Dependencies.ClusterCRCli.ClusterV1alpha().RESTClient(),
|
||||
api.ArangoClusterResourcePlural,
|
||||
c.Config.Namespace,
|
||||
fields.Everything())
|
||||
|
||||
_, informer := cache.NewIndexerInformer(source, &api.ArangoCluster{}, 0, cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.onAddArangoCluster,
|
||||
UpdateFunc: c.onUpdateArangoCluster,
|
||||
DeleteFunc: c.onDeleteArangoCluster,
|
||||
}, cache.Indexers{})
|
||||
|
||||
ctx := context.TODO()
|
||||
// TODO: use workqueue to avoid blocking
|
||||
informer.Run(ctx.Done())
|
||||
}
|
||||
|
||||
// onAddArangoCluster cluster addition callback
|
||||
func (c *Controller) onAddArangoCluster(obj interface{}) {
|
||||
c.syncArangoCluster(obj.(*api.ArangoCluster))
|
||||
}
|
||||
|
||||
// onUpdateArangoCluster cluster update callback
|
||||
func (c *Controller) onUpdateArangoCluster(oldObj, newObj interface{}) {
|
||||
c.syncArangoCluster(newObj.(*api.ArangoCluster))
|
||||
}
|
||||
|
||||
// onDeleteArangoCluster cluster delete callback
|
||||
func (c *Controller) onDeleteArangoCluster(obj interface{}) {
|
||||
clus, ok := obj.(*api.ArangoCluster)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("unknown object from EtcdCluster delete event: %#v", obj))
|
||||
}
|
||||
clus, ok = tombstone.Obj.(*api.ArangoCluster)
|
||||
if !ok {
|
||||
panic(fmt.Sprintf("Tombstone contained object that is not an ArangoCluster: %#v", obj))
|
||||
}
|
||||
}
|
||||
ev := &Event{
|
||||
Type: kwatch.Deleted,
|
||||
Object: clus,
|
||||
}
|
||||
|
||||
// pt.start()
|
||||
err := c.handleClusterEvent(ev)
|
||||
if err != nil {
|
||||
c.Dependencies.Log.Warn().Err(err).Msg("Failed to handle event")
|
||||
}
|
||||
//pt.stop()
|
||||
}
|
||||
|
||||
// syncArangoCluster synchronized the given cluster.
|
||||
func (c *Controller) syncArangoCluster(apiCluster *api.ArangoCluster) {
|
||||
ev := &Event{
|
||||
Type: kwatch.Added,
|
||||
Object: apiCluster,
|
||||
}
|
||||
// re-watch or restart could give ADD event.
|
||||
// If for an ADD event the cluster spec is invalid then it is not added to the local cache
|
||||
// so modifying that cluster will result in another ADD event
|
||||
if _, ok := c.clusters[apiCluster.Name]; ok {
|
||||
ev.Type = kwatch.Modified
|
||||
}
|
||||
|
||||
//pt.start()
|
||||
err := c.handleClusterEvent(ev)
|
||||
if err != nil {
|
||||
c.Dependencies.Log.Warn().Err(err).Msg("Failed to handle event")
|
||||
}
|
||||
//pt.stop()
|
||||
}
|
||||
|
||||
// handleClusterEvent processed the given event.
|
||||
|
@ -153,13 +251,3 @@ func (c *Controller) makeClusterConfigAndDeps() (cluster.Config, cluster.Depende
|
|||
}
|
||||
return cfg, deps
|
||||
}
|
||||
|
||||
func (c *Controller) initCRD() error {
|
||||
if err := k8sutil.CreateCRD(c.KubeExtCli, api.ArangoClusterCRDName, api.ArangoClusterResourceKind, api.ArangoClusterResourcePlural, "arangodb"); err != nil {
|
||||
return maskAny(errors.Wrapf(err, "failed to create CRD: %v", err))
|
||||
}
|
||||
if err := k8sutil.WaitCRDReady(c.KubeExtCli, api.ArangoClusterCRDName); err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
54
pkg/controller/crd.go
Normal file
54
pkg/controller/crd.go
Normal file
|
@ -0,0 +1,54 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Author Ewout Prangsma
|
||||
//
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha"
|
||||
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
|
||||
)
|
||||
|
||||
// initResourceIfNeeded initializes the custom resource definition when
|
||||
// instructed to do so by the config.
|
||||
func (c *Controller) initResourceIfNeeded() error {
|
||||
if c.Config.CreateCRD {
|
||||
if err := c.initCRD(); err != nil {
|
||||
return maskAny(fmt.Errorf("Failed to initialize Custom Resource Definition: %v", err))
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// initCRD creates the CustomResourceDefinition and waits for it to be ready.
|
||||
func (c *Controller) initCRD() error {
|
||||
if err := k8sutil.CreateCRD(c.KubeExtCli, api.ArangoClusterCRDName, api.ArangoClusterResourceKind, api.ArangoClusterResourcePlural, "arangodb"); err != nil {
|
||||
return maskAny(errors.Wrapf(err, "failed to create CRD: %v", err))
|
||||
}
|
||||
if err := k8sutil.WaitCRDReady(c.KubeExtCli, api.ArangoClusterCRDName); err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -29,8 +29,8 @@ import (
|
|||
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
|
||||
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/client-go/tools/clientcmd/api"
|
||||
|
||||
api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha"
|
||||
"github.com/arangodb/k8s-operator/pkg/util/retry"
|
||||
)
|
||||
|
||||
|
|
Loading…
Reference in a new issue