
[Bugfix] Fix ClusterScaling Integration (#1091)

Adam Janikowski, 2022-08-29 21:51:23 +02:00, committed by GitHub
parent 62d4244155
commit de360d97fa
5 changed files with 30 additions and 25 deletions

CHANGELOG.md

@@ -19,6 +19,7 @@
 - (Bugfix) Always recreate DBServers if they have a leader on it.
 - (Feature) Immutable spec
 - (Bugfix) Proper agent cleanout
+- (Bugfix) Fix ClusterScaling integration

 ## [1.2.15](https://github.com/arangodb/kube-arangodb/tree/1.2.15) (2022-07-20)
 - (Bugfix) Ensure pod names not too long


@@ -79,7 +79,7 @@ func newClusterScalingIntegration(depl *Deployment) *clusterScalingIntegration {
 func (ci *clusterScalingIntegration) SendUpdateToCluster(spec api.DeploymentSpec) {
     ci.pendingUpdate.mutex.Lock()
     defer ci.pendingUpdate.mutex.Unlock()
-    ci.pendingUpdate.spec = &spec
+    ci.pendingUpdate.spec = spec.DeepCopy()
 }

 // checkScalingCluster checks if inspection
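Why the DeepCopy matters: api.DeploymentSpec is a large struct with nested pointers, slices and maps (hence its likely generated DeepCopy), so passing it by value copies only the top level, and storing &spec keeps those nested references shared with the caller. A minimal, runnable sketch of the aliasing problem, with a hypothetical simplified spec type rather than the operator's real one:

package main

import "fmt"

// spec is a stand-in for api.DeploymentSpec: one nested pointer is
// enough to show the aliasing problem.
type spec struct {
    DBServers *int
}

// DeepCopy clones the struct and every nested reference.
func (s *spec) DeepCopy() *spec {
    c := *s
    if s.DBServers != nil {
        v := *s.DBServers
        c.DBServers = &v
    }
    return &c
}

var pending *spec

// sendBuggy mirrors the old code: it stores a pointer to the by-value
// copy, whose nested fields are still shared with the caller.
func sendBuggy(s spec) { pending = &s }

// sendFixed mirrors the new code: it stores an isolated deep copy.
func sendFixed(s spec) { pending = s.DeepCopy() }

func main() {
    n := 3
    s := spec{DBServers: &n}

    sendBuggy(s)
    *s.DBServers = 5
    fmt.Println(*pending.DBServers) // prints 5: the caller's mutation leaked in

    n = 3
    sendFixed(s)
    *s.DBServers = 5
    fmt.Println(*pending.DBServers) // prints 3: the snapshot is isolated
}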
@@ -112,10 +112,6 @@ func (ci *clusterScalingIntegration) checkScalingCluster(ctx context.Context, ex
         return false
     }

-    if !status.Conditions.IsTrue(api.ConditionTypeUpToDate) {
-        return false
-    }
-
     // Update cluster with our state
     safeToAskCluster, err := ci.updateClusterServerCount(ctx, expectSuccess)
     if err != nil {
@@ -135,7 +131,7 @@
         return false
     }

-// listenForClusterEvents keep listening for changes entered in the UI of the cluster.
+// ListenForClusterEvents keep listening for changes entered in the UI of the cluster.
 func (ci *clusterScalingIntegration) ListenForClusterEvents(stopCh <-chan struct{}) {
     start := time.Now()
     goodInspections := 0
@@ -176,6 +172,7 @@ func (ci *clusterScalingIntegration) cleanClusterServers(ctx context.Context) er
     }

     if req.Coordinators != nil || req.DBServers != nil {
+        log.Debug("Clean number of servers")
         if err := arangod.CleanNumberOfServers(ctx, c.Connection()); err != nil {
             log.Err(err).Debug("Failed to clean number of servers")
             return errors.WithStack(err)
@@ -261,6 +258,7 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
     ci.pendingUpdate.mutex.Lock()
     spec := ci.pendingUpdate.spec
     ci.pendingUpdate.mutex.Unlock()
+
     if spec == nil {
         // Nothing pending
         return true, nil
@@ -288,8 +286,8 @@
     // This is to prevent unneseccary updates that may override some values written by the WebUI (in the case of a update loop)
     if coordinatorCount != lastNumberOfServers.GetCoordinators() || dbserverCount != lastNumberOfServers.GetDBServers() {
-        if err := ci.depl.SetNumberOfServers(ctx, coordinatorCountPtr, dbserverCountPtr); err != nil {
         log.Debug("Setting number of servers %d/%d", coordinatorCount, dbserverCount)
+        if err := ci.depl.SetNumberOfServers(ctx, coordinatorCountPtr, dbserverCountPtr); err != nil {
             if expectSuccess {
                 log.Err(err).Debug("Failed to set number of servers")
             }


@@ -354,14 +354,7 @@ func (d *Deployment) run() {
     inspectionInterval := d.inspectDeployment(minInspectionInterval)
     log.Str("interval", inspectionInterval.String()).Debug("...deployment inspect started")

-    if ci := d.clusterScalingIntegration; ci != nil {
-        if c := d.currentObjectStatus; c != nil {
-            if a := c.AcceptedSpec; a != nil {
-                log.Debug("Send initial CI update")
-                ci.SendUpdateToCluster(*a)
-            }
-        }
-    }
+    d.sendCIUpdate()

     for {
         select {


@@ -106,16 +106,6 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval
         }
     } else if changed {
         d.log.Info("Accepted new spec")
-        // Notify cluster of desired server count
-        if ci := d.clusterScalingIntegration; ci != nil {
-            if c := d.currentObjectStatus; c != nil {
-                if a := c.AcceptedSpec; a != nil {
-                    if c.Conditions.IsTrue(api.ConditionTypeUpToDate) {
-                        ci.SendUpdateToCluster(*a)
-                    }
-                }
-            }
-        }
         return minInspectionInterval // Retry ASAP
     } else if !canProceed {
         d.log.Err(err).Error("Cannot proceed with reconciliation")
@@ -375,6 +365,7 @@ func (d *Deployment) inspectDeploymentWithError(ctx context.Context, lastInterva
     }

     if isUpToDate && !status.Conditions.IsTrue(api.ConditionTypeUpToDate) {
+        d.sendCIUpdate()
         if err = d.updateConditionWithHash(ctx, api.ConditionTypeUpToDate, true, "Spec is Up To Date", "Spec is Up To Date", *v); err != nil {
             return minInspectionInterval, errors.Wrapf(err, "Unable to update UpToDate condition")
         }
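With this hunk the notification becomes edge-triggered: the CI update fires once, when the UpToDate condition flips from false to true, instead of being gated on every pass of checkScalingCluster (the check removed above). A small runnable sketch of that pattern, with hypothetical simplified types:

package main

import "fmt"

// deployment tracks the last recorded state of a single condition.
type deployment struct {
    upToDate bool
}

// inspect notifies only on the false-to-true transition of the condition,
// mirroring how the commit sends the CI update when UpToDate is first set.
func (d *deployment) inspect(isUpToDate bool, notify func()) {
    if isUpToDate && !d.upToDate {
        notify()
    }
    d.upToDate = isUpToDate
}

func main() {
    d := &deployment{}
    notify := func() { fmt.Println("send CI update") }

    d.inspect(false, notify) // no output: not up to date yet
    d.inspect(true, notify)  // prints once: condition just became true
    d.inspect(true, notify)  // no output: already up to date
}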
@@ -414,6 +405,16 @@
     return
 }

+func (d *Deployment) sendCIUpdate() {
+    if ci := d.clusterScalingIntegration; ci != nil {
+        if c := d.currentObjectStatus; c != nil {
+            if a := c.AcceptedSpec; a != nil {
+                ci.SendUpdateToCluster(*a)
+            }
+        }
+    }
+}
+
 func (d *Deployment) isUpToDateStatus(status api.DeploymentStatus) (upToDate bool, reason string) {
     if !status.IsPlanEmpty() {
         return false, "Plan is not empty"


@@ -32,6 +32,18 @@ var ephemeralVolumes = &feature{
     enabledByDefault: false,
 }

+var sensitiveInformationProtection = &feature{
+    name:               "sensitive-information-protection",
+    description:        "Hide sensitive information from metrics and logs",
+    version:            "3.7.0",
+    enterpriseRequired: false,
+    enabledByDefault:   false,
+}
+
 func EphemeralVolumes() Feature {
     return ephemeralVolumes
 }
+
+func SensitiveInformationProtection() Feature {
+    return sensitiveInformationProtection
+}
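Downstream code would consult the new gate before redacting anything. A self-contained sketch of the feature-gate pattern registered above; the feature type is a simplified stand-in, and the enabled-from-CLI-flag wiring is an assumption not shown in this diff:

package main

import "fmt"

// feature is a stand-in for the operator's feature type shown above.
type feature struct {
    name             string
    description      string
    version          string
    enabledByDefault bool
    enabled          *bool // in the real operator this would be set from a CLI flag (assumption)
}

// Enabled reports the explicit setting if present, else the default.
func (f *feature) Enabled() bool {
    if f.enabled != nil {
        return *f.enabled
    }
    return f.enabledByDefault
}

var sensitiveInformationProtection = &feature{
    name:        "sensitive-information-protection",
    description: "Hide sensitive information from metrics and logs",
    version:     "3.7.0",
}

func main() {
    msg := "connecting with password=hunter2"
    if sensitiveInformationProtection.Enabled() {
        msg = "connecting with password=***" // redact before it reaches logs
    }
    fmt.Println(msg) // the gate is off by default, so the raw message prints
}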