1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-15 17:51:03 +00:00
kube-arangodb/pkg/operator/operator_deployment_relication.go
2021-01-08 15:35:38 +01:00

216 lines
8.1 KiB
Go

//
// DISCLAIMER
//
// Copyright 2020 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package operator
import (
replication2 "github.com/arangodb/kube-arangodb/pkg/apis/replication"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
api "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1"
"github.com/arangodb/kube-arangodb/pkg/metrics"
"github.com/arangodb/kube-arangodb/pkg/replication"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
var (
deploymentReplicationsCreated = metrics.MustRegisterCounter("controller", "deployment_replications_created", "Number of deployment replications that have been created")
deploymentReplicationsDeleted = metrics.MustRegisterCounter("controller", "deployment_replications_deleted", "Number of deployment replications that have been deleted")
deploymentReplicationsFailed = metrics.MustRegisterCounter("controller", "deployment_replications_failed", "Number of deployment replications that have failed")
deploymentReplicationsModified = metrics.MustRegisterCounter("controller", "deployment_replications_modified", "Number of deployment replication modifications")
deploymentReplicationsCurrent = metrics.MustRegisterGauge("controller", "deployment_replications", "Number of deployment replications currently being managed")
)
// run the deployment replications part of the operator.
// This registers a listener and waits until the process stops.
func (o *Operator) runDeploymentReplications(stop <-chan struct{}) {
rw := k8sutil.NewResourceWatcher(
o.log,
o.Dependencies.CRCli.ReplicationV1().RESTClient(),
replication2.ArangoDeploymentReplicationResourcePlural,
o.Config.Namespace,
&api.ArangoDeploymentReplication{},
cache.ResourceEventHandlerFuncs{
AddFunc: o.onAddArangoDeploymentReplication,
UpdateFunc: o.onUpdateArangoDeploymentReplication,
DeleteFunc: o.onDeleteArangoDeploymentReplication,
})
o.Dependencies.DeploymentReplicationProbe.SetReady()
rw.Run(stop)
}
// onAddArangoDeploymentReplication deployment replication addition callback
func (o *Operator) onAddArangoDeploymentReplication(obj interface{}) {
o.Dependencies.LivenessProbe.Lock()
defer o.Dependencies.LivenessProbe.Unlock()
apiObject := obj.(*api.ArangoDeploymentReplication)
o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()).
Msg("ArangoDeploymentReplication added")
o.syncArangoDeploymentReplication(apiObject)
}
// onUpdateArangoDeploymentReplication deployment replication update callback
func (o *Operator) onUpdateArangoDeploymentReplication(oldObj, newObj interface{}) {
o.Dependencies.LivenessProbe.Lock()
defer o.Dependencies.LivenessProbe.Unlock()
apiObject := newObj.(*api.ArangoDeploymentReplication)
o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()).
Msg("ArangoDeploymentReplication updated")
o.syncArangoDeploymentReplication(apiObject)
}
// onDeleteArangoDeploymentReplication deployment replication delete callback
func (o *Operator) onDeleteArangoDeploymentReplication(obj interface{}) {
o.Dependencies.LivenessProbe.Lock()
defer o.Dependencies.LivenessProbe.Unlock()
log := o.log
apiObject, ok := obj.(*api.ArangoDeploymentReplication)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
log.Error().Interface("event-object", obj).Msg("unknown object from ArangoDeploymentReplication delete event")
return
}
apiObject, ok = tombstone.Obj.(*api.ArangoDeploymentReplication)
if !ok {
log.Error().Interface("event-object", obj).Msg("Tombstone contained object that is not an ArangoDeploymentReplication")
return
}
}
log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()).
Msg("ArangoDeploymentReplication deleted")
ev := &Event{
Type: kwatch.Deleted,
DeploymentReplication: apiObject,
}
// pt.start()
err := o.handleDeploymentReplicationEvent(ev)
if err != nil {
log.Warn().Err(err).Msg("Failed to handle event")
}
//pt.stop()
}
// syncArangoDeploymentReplication synchronized the given deployment replication.
func (o *Operator) syncArangoDeploymentReplication(apiObject *api.ArangoDeploymentReplication) {
ev := &Event{
Type: kwatch.Added,
DeploymentReplication: apiObject,
}
// re-watch or restart could give ADD event.
// If for an ADD event the cluster spec is invalid then it is not added to the local cache
// so modifying that deployment will result in another ADD event
if _, ok := o.deploymentReplications[apiObject.Name]; ok {
ev.Type = kwatch.Modified
}
//pt.start()
err := o.handleDeploymentReplicationEvent(ev)
if err != nil {
o.log.Warn().Err(err).Msg("Failed to handle event")
}
//pt.stop()
}
// handleDeploymentReplicationEvent processed the given event.
func (o *Operator) handleDeploymentReplicationEvent(event *Event) error {
apiObject := event.DeploymentReplication
if apiObject.Status.Phase.IsFailed() {
deploymentReplicationsFailed.Inc()
if event.Type == kwatch.Deleted {
delete(o.deploymentReplications, apiObject.Name)
return nil
}
return errors.WithStack(errors.Newf("ignore failed deployment replication (%s). Please delete its CR", apiObject.Name))
}
switch event.Type {
case kwatch.Added:
if _, ok := o.deploymentReplications[apiObject.Name]; ok {
return errors.WithStack(errors.Newf("unsafe state. deployment replication (%s) was created before but we received event (%s)", apiObject.Name, event.Type))
}
// Fill in defaults
apiObject.Spec.SetDefaults()
// Validate deployment spec
if err := apiObject.Spec.Validate(); err != nil {
return errors.WithStack(errors.Wrapf(err, "invalid deployment replication spec. please fix the following problem with the deployment replication spec: %v", err))
}
cfg, deps := o.makeDeploymentReplicationConfigAndDeps(apiObject)
nc, err := replication.New(cfg, deps, apiObject)
if err != nil {
return errors.WithStack(errors.Newf("failed to create deployment: %s", err))
}
o.deploymentReplications[apiObject.Name] = nc
deploymentReplicationsCreated.Inc()
deploymentReplicationsCurrent.Set(float64(len(o.deploymentReplications)))
case kwatch.Modified:
repl, ok := o.deploymentReplications[apiObject.Name]
if !ok {
return errors.WithStack(errors.Newf("unsafe state. deployment replication (%s) was never created but we received event (%s)", apiObject.Name, event.Type))
}
repl.Update(apiObject)
deploymentReplicationsModified.Inc()
case kwatch.Deleted:
repl, ok := o.deploymentReplications[apiObject.Name]
if !ok {
return errors.WithStack(errors.Newf("unsafe state. deployment replication (%s) was never created but we received event (%s)", apiObject.Name, event.Type))
}
repl.Delete()
delete(o.deploymentReplications, apiObject.Name)
deploymentReplicationsDeleted.Inc()
deploymentReplicationsCurrent.Set(float64(len(o.deploymentReplications)))
}
return nil
}
// makeDeploymentReplicationConfigAndDeps creates a Config & Dependencies object for a new DeploymentReplication.
func (o *Operator) makeDeploymentReplicationConfigAndDeps(apiObject *api.ArangoDeploymentReplication) (replication.Config, replication.Dependencies) {
cfg := replication.Config{
Namespace: o.Config.Namespace,
}
deps := replication.Dependencies{
Log: o.Dependencies.LogService.MustGetLogger("deployment-replication").With().
Str("deployment-replication", apiObject.GetName()).
Logger(),
KubeCli: o.Dependencies.KubeCli,
CRCli: o.Dependencies.CRCli,
EventRecorder: o.Dependencies.EventRecorder,
}
return cfg, deps
}