mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
Integrate with scaling UI
This commit is contained in:
parent
667380b9e9
commit
85348317be
4 changed files with 189 additions and 2 deletions
104
pkg/deployment/cluster_informer.go
Normal file
104
pkg/deployment/cluster_informer.go
Normal file
|
@ -0,0 +1,104 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Author Ewout Prangsma
|
||||
//
|
||||
|
||||
package deployment
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
// listenForClusterEvents keep listening for changes entered in the UI of the cluster.
|
||||
func (d *Deployment) listenForClusterEvents(stopCh <-chan struct{}) {
|
||||
for {
|
||||
delay := time.Second * 2
|
||||
|
||||
// Inspect once
|
||||
ctx := context.Background()
|
||||
if err := d.inspectCluster(ctx); err != nil {
|
||||
d.deps.Log.Debug().Err(err).Msg("Cluster inspection failed")
|
||||
}
|
||||
|
||||
select {
|
||||
case <-time.After(delay):
|
||||
// Continue
|
||||
case <-stopCh:
|
||||
// We're done
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Perform a single inspection of the cluster
|
||||
func (d *Deployment) inspectCluster(ctx context.Context) error {
|
||||
log := d.deps.Log
|
||||
c, err := d.clientCache.GetDatabase(ctx)
|
||||
if err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
req, err := arangod.GetNumberOfServers(ctx, c.Connection())
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Msg("Failed to get number of servers")
|
||||
return maskAny(err)
|
||||
}
|
||||
if req.Coordinators == nil && req.DBServers == nil {
|
||||
// Nothing to check
|
||||
return nil
|
||||
}
|
||||
coordinatorsChanged := false
|
||||
dbserversChanged := false
|
||||
d.lastNumberOfServers.mutex.Lock()
|
||||
defer d.lastNumberOfServers.mutex.Unlock()
|
||||
desired := d.lastNumberOfServers.NumberOfServers
|
||||
if req.Coordinators != nil && desired.Coordinators != nil && req.GetCoordinators() != desired.GetCoordinators() {
|
||||
// #Coordinator has changed
|
||||
coordinatorsChanged = true
|
||||
}
|
||||
if req.DBServers != nil && desired.DBServers != nil && req.GetDBServers() != desired.GetDBServers() {
|
||||
// #DBServers has changed
|
||||
dbserversChanged = true
|
||||
}
|
||||
if !coordinatorsChanged && !dbserversChanged {
|
||||
// Nothing has changed
|
||||
return nil
|
||||
}
|
||||
// Let's update the spec
|
||||
current, err := d.deps.DatabaseCRCli.DatabaseV1alpha().ArangoDeployments(d.apiObject.Namespace).Get(d.apiObject.Name, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Msg("Failed to get current deployment")
|
||||
return maskAny(err)
|
||||
}
|
||||
if coordinatorsChanged {
|
||||
current.Spec.Coordinators.Count = req.GetCoordinators()
|
||||
}
|
||||
if dbserversChanged {
|
||||
current.Spec.DBServers.Count = req.GetDBServers()
|
||||
}
|
||||
if err := d.updateCRSpec(current.Spec); err != nil {
|
||||
log.Warn().Err(err).Msg("Failed to update current deployment")
|
||||
return maskAny(err)
|
||||
}
|
||||
return nil
|
||||
}
|
50
pkg/deployment/cluster_updater.go
Normal file
50
pkg/deployment/cluster_updater.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Author Ewout Prangsma
|
||||
//
|
||||
|
||||
package deployment
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
|
||||
)
|
||||
|
||||
// updateClusterServerCount updates the intended number of servers of the cluster.
|
||||
func (d *Deployment) updateClusterServerCount(ctx context.Context) error {
|
||||
log := d.deps.Log
|
||||
c, err := d.clientCache.GetDatabase(ctx)
|
||||
if err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
spec := d.apiObject.Spec
|
||||
coordinatorCount := spec.Coordinators.Count
|
||||
dbserverCount := spec.DBServers.Count
|
||||
if err := arangod.SetNumberOfServers(ctx, c.Connection(), coordinatorCount, dbserverCount); err != nil {
|
||||
log.Debug().Err(err).Msg("Failed to set number of servers")
|
||||
return maskAny(err)
|
||||
}
|
||||
d.lastNumberOfServers.mutex.Lock()
|
||||
defer d.lastNumberOfServers.mutex.Unlock()
|
||||
d.lastNumberOfServers.Coordinators = &coordinatorCount
|
||||
d.lastNumberOfServers.DBServers = &dbserverCount
|
||||
return nil
|
||||
}
|
|
@ -26,6 +26,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog"
|
||||
|
@ -37,6 +38,7 @@ import (
|
|||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
|
||||
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/retry"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/trigger"
|
||||
|
@ -89,8 +91,12 @@ type Deployment struct {
|
|||
|
||||
eventsCli corev1.EventInterface
|
||||
|
||||
inspectTrigger trigger.Trigger
|
||||
clientCache *clientCache
|
||||
inspectTrigger trigger.Trigger
|
||||
clientCache *clientCache
|
||||
lastNumberOfServers struct {
|
||||
arangod.NumberOfServers
|
||||
mutex sync.Mutex
|
||||
}
|
||||
}
|
||||
|
||||
// New creates a new Deployment from the given API object.
|
||||
|
@ -111,6 +117,9 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
|
|||
|
||||
go d.run()
|
||||
go d.listenForPodEvents()
|
||||
if apiObject.Spec.Mode == api.DeploymentModeCluster {
|
||||
go d.listenForClusterEvents(d.stopCh)
|
||||
}
|
||||
|
||||
return d, nil
|
||||
}
|
||||
|
@ -312,6 +321,14 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent)
|
|||
return maskAny(fmt.Errorf("failed to update ArangoDeployment spec: %v", err))
|
||||
}
|
||||
|
||||
// Notify cluster of desired server count
|
||||
if d.apiObject.Spec.Mode == api.DeploymentModeCluster {
|
||||
ctx := context.Background()
|
||||
if err := d.updateClusterServerCount(ctx); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to update desired server count in cluster")
|
||||
}
|
||||
}
|
||||
|
||||
// Trigger inspect
|
||||
d.inspectTrigger.Trigger()
|
||||
|
||||
|
|
|
@ -34,6 +34,22 @@ type NumberOfServers struct {
|
|||
DBServers *int `json:"numberOfDBServers,omitempty"`
|
||||
}
|
||||
|
||||
// GetCoordinators returns Coordinators if not nil, otherwise 0.
|
||||
func (n NumberOfServers) GetCoordinators() int {
|
||||
if n.Coordinators != nil {
|
||||
return *n.Coordinators
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// GetDBServers returns DBServers if not nil, otherwise 0.
|
||||
func (n NumberOfServers) GetDBServers() int {
|
||||
if n.DBServers != nil {
|
||||
return *n.DBServers
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
// GetNumberOfServers fetches the number of servers the cluster wants to have.
|
||||
func GetNumberOfServers(ctx context.Context, conn driver.Connection) (NumberOfServers, error) {
|
||||
req, err := conn.NewRequest("GET", "_admin/cluster/numberOfServers")
|
||||
|
|
Loading…
Reference in a new issue