1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-15 17:51:03 +00:00

Fixed behavior for scaling UI integration wrt startup of the cluster

This commit is contained in:
Ewout Prangsma 2018-03-23 10:52:41 +01:00
parent da9e477a47
commit 1e51a90452
No known key found for this signature in database
GPG key ID: 4DBAD380D93D0698
4 changed files with 197 additions and 170 deletions

View file

@ -1,104 +0,0 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// listenForClusterEvents keep listening for changes entered in the UI of the cluster.
func (d *Deployment) listenForClusterEvents(stopCh <-chan struct{}) {
for {
delay := time.Second * 2
// Inspect once
ctx := context.Background()
if err := d.inspectCluster(ctx); err != nil {
d.deps.Log.Debug().Err(err).Msg("Cluster inspection failed")
}
select {
case <-time.After(delay):
// Continue
case <-stopCh:
// We're done
return
}
}
}
// Perform a single inspection of the cluster
func (d *Deployment) inspectCluster(ctx context.Context) error {
log := d.deps.Log
c, err := d.clientCache.GetDatabase(ctx)
if err != nil {
return maskAny(err)
}
req, err := arangod.GetNumberOfServers(ctx, c.Connection())
if err != nil {
log.Debug().Err(err).Msg("Failed to get number of servers")
return maskAny(err)
}
if req.Coordinators == nil && req.DBServers == nil {
// Nothing to check
return nil
}
coordinatorsChanged := false
dbserversChanged := false
d.lastNumberOfServers.mutex.Lock()
defer d.lastNumberOfServers.mutex.Unlock()
desired := d.lastNumberOfServers.NumberOfServers
if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
// #Coordinator has changed
coordinatorsChanged = true
}
if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
// #DBServers has changed
dbserversChanged = true
}
if !coordinatorsChanged && !dbserversChanged {
// Nothing has changed
return nil
}
// Let's update the spec
current, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.Namespace).Get(d.apiObject.Name, metav1.GetOptions{})
if err != nil {
log.Debug().Err(err).Msg("Failed to get current deployment")
return maskAny(err)
}
if coordinatorsChanged {
current.Spec.Coordinators.Count = req.GetCoordinators()
}
if dbserversChanged {
current.Spec.DBServers.Count = req.GetDBServers()
}
if err := d.updateCRSpec(current.Spec); err != nil {
log.Warn().Err(err).Msg("Failed to update current deployment")
return maskAny(err)
}
return nil
}

View file

@ -0,0 +1,188 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
"sync"
"time"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/rs/zerolog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// clusterScalingIntegration is a helper to communicate with the clusters
// scaling UI.
type clusterScalingIntegration struct {
log zerolog.Logger
depl *Deployment
pendingUpdate struct {
mutex sync.Mutex
spec *api.DeploymentSpec
}
lastNumberOfServers struct {
arangod.NumberOfServers
mutex sync.Mutex
}
}
// newClusterScalingIntegration creates a new clusterScalingIntegration.
func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration {
return &clusterScalingIntegration{
log: depl.deps.Log,
depl: depl,
}
}
// SendUpdateToCluster records the given spec to be sended to the cluster.
func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec) {
ci.pendingUpdate.mutex.Lock()
defer ci.pendingUpdate.mutex.Unlock()
ci.pendingUpdate.spec = &spec
}
// listenForClusterEvents keep listening for changes entered in the UI of the cluster.
func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct{}) {
for {
delay := time.Second * 2
// Is deployment in running state
if ci.depl.status.State == api.DeploymentStateRunning {
// Update cluster with our state
ctx := context.Background()
safeToAskCluster, err := ci.updateClusterServerCount(ctx)
if err != nil {
ci.log.Debug().Err(err).Msg("Cluster update failed")
} else if safeToAskCluster {
// Inspect once
if err := ci.inspectCluster(ctx); err != nil {
ci.log.Debug().Err(err).Msg("Cluster inspection failed")
}
}
}
select {
case <-time.After(delay):
// Continue
case <-stopCh:
// We're done
return
}
}
}
// Perform a single inspection of the cluster
func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context) error {
log := ci.log
c, err := ci.depl.clientCache.GetDatabase(ctx)
if err != nil {
return maskAny(err)
}
req, err := arangod.GetNumberOfServers(ctx, c.Connection())
if err != nil {
log.Debug().Err(err).Msg("Failed to get number of servers")
return maskAny(err)
}
if req.Coordinators == nil && req.DBServers == nil {
// Nothing to check
return nil
}
coordinatorsChanged := false
dbserversChanged := false
ci.lastNumberOfServers.mutex.Lock()
defer ci.lastNumberOfServers.mutex.Unlock()
desired := ci.lastNumberOfServers.NumberOfServers
if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
// #Coordinator has changed
coordinatorsChanged = true
}
if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
// #DBServers has changed
dbserversChanged = true
}
if !coordinatorsChanged && !dbserversChanged {
// Nothing has changed
return nil
}
// Let's update the spec
apiObject := ci.depl.apiObject
current, err := ci.depl.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(apiObject.Namespace).Get(apiObject.Name, metav1.GetOptions{})
if err != nil {
log.Debug().Err(err).Msg("Failed to get current deployment")
return maskAny(err)
}
if coordinatorsChanged {
current.Spec.Coordinators.Count = req.GetCoordinators()
}
if dbserversChanged {
current.Spec.DBServers.Count = req.GetDBServers()
}
if err := ci.depl.updateCRSpec(current.Spec); err != nil {
log.Warn().Err(err).Msg("Failed to update current deployment")
return maskAny(err)
}
return nil
}
// updateClusterServerCount updates the intended number of servers of the cluster.
// Returns true when it is safe to ask the cluster for updates.
func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Context) (bool, error) {
// Any update needed?
ci.pendingUpdate.mutex.Lock()
spec := ci.pendingUpdate.spec
ci.pendingUpdate.mutex.Unlock()
if spec == nil {
// Nothing pending
return true, nil
}
log := ci.log
c, err := ci.depl.clientCache.GetDatabase(ctx)
if err != nil {
return false, maskAny(err)
}
coordinatorCount := spec.Coordinators.Count
dbserverCount := spec.DBServers.Count
if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil {
log.Debug().Err(err).Msg("Failed to set number of servers")
return false, maskAny(err)
}
// Success, now update internal state
safeToAskCluster := false
ci.pendingUpdate.mutex.Lock()
if spec == ci.pendingUpdate.spec {
ci.pendingUpdate.spec = nil
safeToAskCluster = true
}
ci.pendingUpdate.mutex.Unlock()
ci.lastNumberOfServers.mutex.Lock()
defer ci.lastNumberOfServers.mutex.Unlock()
ci.lastNumberOfServers.Coordinators = &coordinatorCount
ci.lastNumberOfServers.DBServers = &dbserverCount
return safeToAskCluster, nil
}

View file

@ -1,50 +0,0 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package deployment
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
)
// updateClusterServerCount updates the intended number of servers of the cluster.
func (d *Deployment) updateClusterServerCount(ctx context.Context) error {
log := d.deps.Log
c, err := d.clientCache.GetDatabase(ctx)
if err != nil {
return maskAny(err)
}
spec := d.apiObject.Spec
coordinatorCount := spec.Coordinators.Count
dbserverCount := spec.DBServers.Count
if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil {
log.Debug().Err(err).Msg("Failed to set number of servers")
return maskAny(err)
}
d.lastNumberOfServers.mutex.Lock()
defer d.lastNumberOfServers.mutex.Unlock()
d.lastNumberOfServers.Coordinators = &coordinatorCount
d.lastNumberOfServers.DBServers = &dbserverCount
return nil
}

View file

@ -23,10 +23,8 @@
package deployment
import (
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/rs/zerolog"
@ -38,7 +36,6 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
@ -91,13 +88,10 @@ type Deployment struct {
eventsCli corev1.EventInterface
inspectTrigger trigger.Trigger
clientCache *clientCache
recentInspectionErrors int
lastNumberOfServers struct {
arangod.NumberOfServers
mutex sync.Mutex
}
inspectTrigger trigger.Trigger
clientCache *clientCache
recentInspectionErrors int
clusterScalingIntegration *clusterScalingIntegration
}
// New creates a new Deployment from the given API object.
@ -119,7 +113,9 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
go d.run()
go d.listenForPodEvents()
if apiObject.Spec.Mode == api.DeploymentModeCluster {
go d.listenForClusterEvents(d.stopCh)
ci := newClusterScalingIntegration(d)
d.clusterScalingIntegration = ci
go ci.ListenForClusterEvents(d.stopCh)
}
return d, nil
@ -280,11 +276,8 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent)
}
// Notify cluster of desired server count
if d.apiObject.Spec.Mode == api.DeploymentModeCluster {
ctx := context.Background()
if err := d.updateClusterServerCount(ctx); err != nil {
log.Error().Err(err).Msg("Failed to update desired server count in cluster")
}
if ci := d.clusterScalingIntegration; ci != nil {
ci.SendUpdateToCluster(d.apiObject.Spec)
}
// Trigger inspect