Mirror of https://github.com/arangodb/kube-arangodb.git
[Bugfix] Add RF Toleration (#860)
parent 1cef3cf511
commit 3934e6e2d0
7 changed files with 63 additions and 42 deletions
CHANGELOG.md

@@ -9,6 +9,7 @@
 - Allow to disable ClusterScalingIntegration and add proper Scheduled label to pods
 - Add additional timeout parameters and kubernetes batch size
 - Limit parallel Backup uploads
+- Bugfix - Adjust Cluster Scaling Integration logic

 ## [1.2.5](https://github.com/arangodb/kube-arangodb/tree/1.2.5) (2021-10-25)
 - Split & Unify Lifecycle management functionality
main.go
@@ -168,7 +168,7 @@ func init() {
 	f.DurationVar(&operatorTimeouts.k8s, "timeout.k8s", globals.DefaultKubernetesTimeout, "The request timeout to the kubernetes")
 	f.DurationVar(&operatorTimeouts.arangoD, "timeout.arangod", globals.DefaultArangoDTimeout, "The request timeout to the ArangoDB")
 	f.DurationVar(&operatorTimeouts.reconciliation, "timeout.reconciliation", globals.DefaultReconciliationTimeout, "The reconciliation timeout to the ArangoDB CR")
-	f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", false, "Enable Scaling Integration")
+	f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", true, "Enable Scaling Integration")
 	f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read")
 	f.IntVar(&operatorBackup.concurrentUploads, "backup-concurrent-uploads", globals.DefaultBackupConcurrentUploads, "Number of concurrent uploads per deployment")
 	features.Init(&cmdMain)
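Editor's note: a minimal, self-contained sketch of how a boolean default like the one flipped above behaves with spf13/pflag (the flag library used by Cobra commands such as cmdMain, presumably where f comes from). The flag-set and variable names here are illustrative, not the operator's actual ones; with the default now true, the integration stays on unless the operator is started with --internal.scaling-integration=false.

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	// Illustrative flag set; mirrors the pattern f.BoolVar(&target, name, defaultValue, usage).
	var scalingIntegrationEnabled bool
	f := pflag.NewFlagSet("operator", pflag.ExitOnError)
	f.BoolVar(&scalingIntegrationEnabled, "internal.scaling-integration", true, "Enable Scaling Integration")

	// No flag passed: the new default (true) applies.
	_ = f.Parse([]string{})
	fmt.Println("default:", scalingIntegrationEnabled) // default: true

	// Explicitly disabling is still possible.
	_ = f.Parse([]string{"--internal.scaling-integration=false"})
	fmt.Println("overridden:", scalingIntegrationEnabled) // overridden: false
}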
@@ -22,17 +22,8 @@ package v1

 import "k8s.io/apimachinery/pkg/types"

-type TopologyMemberStatusInitPhase string
-
-const (
-	TopologyMemberStatusInitPhaseNone    TopologyMemberStatusInitPhase = ""
-	TopologyMemberStatusInitPhasePending TopologyMemberStatusInitPhase = "pending"
-	TopologyMemberStatusInitPhaseOK      TopologyMemberStatusInitPhase = "ok"
-)
-
 type TopologyMemberStatus struct {
 	ID        types.UID                     `json:"id"`
 	Zone      int                           `json:"rack"`
 	Label     string                        `json:"label,omitempty"`
-	InitPhase TopologyMemberStatusInitPhase `json:"init_phase,omitempty"`
 }
@@ -22,17 +22,8 @@ package v2alpha1

 import "k8s.io/apimachinery/pkg/types"

-type TopologyMemberStatusInitPhase string
-
-const (
-	TopologyMemberStatusInitPhaseNone    TopologyMemberStatusInitPhase = ""
-	TopologyMemberStatusInitPhasePending TopologyMemberStatusInitPhase = "pending"
-	TopologyMemberStatusInitPhaseOK      TopologyMemberStatusInitPhase = "ok"
-)
-
 type TopologyMemberStatus struct {
 	ID        types.UID                     `json:"id"`
 	Zone      int                           `json:"rack"`
 	Label     string                        `json:"label,omitempty"`
-	InitPhase TopologyMemberStatusInitPhase `json:"init_phase,omitempty"`
 }
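Editor's note: the same InitPhase removal is applied to both the v1 and v2alpha1 API types. A quick sketch of how the trimmed struct serializes, using a local UID stand-in instead of importing k8s.io/apimachinery (whose types.UID is a string type); the member ID below is made up for illustration. Note the Zone field is stored under the JSON key "rack", and Label is dropped when empty.

package main

import (
	"encoding/json"
	"fmt"
)

// UID is a local stand-in for k8s.io/apimachinery/pkg/types.UID,
// used only to keep this sketch dependency-free.
type UID string

// TopologyMemberStatus mirrors the trimmed struct from the diff above.
type TopologyMemberStatus struct {
	ID    UID    `json:"id"`
	Zone  int    `json:"rack"`
	Label string `json:"label,omitempty"`
}

func main() {
	s := TopologyMemberStatus{ID: "member-123", Zone: 2}
	out, _ := json.Marshal(s)
	fmt.Println(string(out)) // {"id":"member-123","rack":2}
}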
@@ -252,8 +252,7 @@ func (ci *clusterScalingIntegration) updateClusterServerCount(ctx context.Contex
 	var coordinatorCountPtr *int
 	var dbserverCountPtr *int

-	coordinatorCount := spec.Coordinators.GetCount()
-	dbserverCount := spec.DBServers.GetCount()
+	coordinatorCount, dbserverCount := ci.getNumbersOfServers()

 	if spec.Coordinators.GetMaxCount() == spec.Coordinators.GetMinCount() {
 		coordinatorCountPtr = nil
@@ -335,8 +334,11 @@ func (ci *clusterScalingIntegration) EnableScalingCluster(ctx context.Context) e
 }

 func (ci *clusterScalingIntegration) setNumberOfServers(ctx context.Context) error {
-	spec := ci.depl.GetSpec()
-	numOfCoordinators := spec.Coordinators.GetCount()
-	numOfDBServers := spec.DBServers.GetCount()
+	numOfCoordinators, numOfDBServers := ci.getNumbersOfServers()
 	return ci.depl.SetNumberOfServers(ctx, &numOfCoordinators, &numOfDBServers)
 }
+
+func (ci *clusterScalingIntegration) getNumbersOfServers() (int, int) {
+	status, _ := ci.depl.getStatus()
+	return len(status.Members.Coordinators), len(status.Members.DBServers)
+}
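Editor's note: the shape of this change — taking the server counts from the observed member status instead of from spec.GetCount() — can be illustrated with a small self-contained sketch. The types and names below are simplified stand-ins, not the operator's real deployment status types.

package main

import "fmt"

// Simplified stand-ins for the deployment status types referenced above.
type MemberStatus struct{ ID string }

type DeploymentStatusMembers struct {
	Coordinators []MemberStatus
	DBServers    []MemberStatus
}

type DeploymentStatus struct{ Members DeploymentStatusMembers }

// getNumbersOfServers mirrors the helper added in the diff: the counts come
// from the members actually present in the status, not from the spec.
func getNumbersOfServers(status DeploymentStatus) (coordinators, dbservers int) {
	return len(status.Members.Coordinators), len(status.Members.DBServers)
}

func main() {
	status := DeploymentStatus{
		Members: DeploymentStatusMembers{
			Coordinators: []MemberStatus{{ID: "crdn-1"}, {ID: "crdn-2"}, {ID: "crdn-3"}},
			DBServers:    []MemberStatus{{ID: "prmr-1"}, {ID: "prmr-2"}},
		},
	}
	c, d := getNumbersOfServers(status)
	fmt.Printf("coordinators=%d dbservers=%d\n", c, d) // coordinators=3 dbservers=2
}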
@@ -561,9 +561,6 @@ func (r *Resources) createPodForMember(ctx context.Context, cachedStatus inspect
 		} else {
 			m.Conditions.Update(api.ConditionTypeTopologyAware, false, "Topology Aware", "Topology invalid")
 		}
-		if m.Topology.InitPhase == api.TopologyMemberStatusInitPhaseNone {
-			m.Topology.InitPhase = api.TopologyMemberStatusInitPhasePending
-		}
 	} else {
 		m.Conditions.Update(api.ConditionTypeTopologyAware, false, "Topology spec missing", "Topology spec missing")
 	}
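Editor's note: the Conditions.Update calls here (and the `if memberStatus.Conditions.Update(...)` pattern further down) rely on the condition list reporting whether anything actually changed. The real api.ConditionList in kube-arangodb carries more state (timestamps, last-transition info, etc.); the sketch below is only a minimal stand-in, under that assumption, to illustrate the update-and-report-change pattern.

package main

import "fmt"

// Condition and ConditionList are simplified stand-ins for the operator's API types.
type Condition struct {
	Type    string
	Status  bool
	Reason  string
	Message string
}

type ConditionList []Condition

// Update stores the condition and reports whether anything actually changed.
func (l *ConditionList) Update(t string, status bool, reason, message string) bool {
	for i := range *l {
		if (*l)[i].Type == t {
			if (*l)[i].Status == status && (*l)[i].Reason == reason && (*l)[i].Message == message {
				return false // nothing changed
			}
			(*l)[i] = Condition{Type: t, Status: status, Reason: reason, Message: message}
			return true
		}
	}
	*l = append(*l, Condition{Type: t, Status: status, Reason: reason, Message: message})
	return true
}

func main() {
	var conds ConditionList
	fmt.Println(conds.Update("TopologyAware", false, "Topology spec missing", "Topology spec missing")) // true
	fmt.Println(conds.Update("TopologyAware", false, "Topology spec missing", "Topology spec missing")) // false
}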
@@ -189,17 +189,39 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
 		if k8sutil.IsPodScheduled(pod) {
 			if _, ok := pod.Labels[k8sutil.LabelKeyArangoScheduled]; !ok {
 				// Adding scheduled label to the pod
-				l := pod.Labels
-				if l == nil {
-					l = map[string]string{}
-				}
-				l[k8sutil.LabelKeyArangoScheduled] = "1"
+				l := addLabel(pod.Labels, k8sutil.LabelKeyArangoScheduled, "1")

 				if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), l)); err != nil {
 					log.Error().Err(err).Msgf("Unable to update scheduled labels")
 				}
 			}
 		}
+
+		// Topology labels
+		tv, tok := pod.Labels[k8sutil.LabelKeyArangoTopology]
+		zv, zok := pod.Labels[k8sutil.LabelKeyArangoZone]
+
+		if t, ts := status.Topology, memberStatus.Topology; t.Enabled() && t.IsTopologyOwned(ts) {
+			if tid, tz := string(t.ID), fmt.Sprintf("%d", ts.Zone); !tok || !zok || tv != tid || zv != tz {
+				l := addLabel(pod.Labels, k8sutil.LabelKeyArangoTopology, tid)
+				l = addLabel(l, k8sutil.LabelKeyArangoZone, tz)
+
+				if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), l)); err != nil {
+					log.Error().Err(err).Msgf("Unable to update topology labels")
+				}
+			}
+		} else {
+			if tok || zok {
+				l := removeLabel(pod.Labels, k8sutil.LabelKeyArangoTopology)
+				l = removeLabel(l, k8sutil.LabelKeyArangoZone)
+
+				if err := r.context.ApplyPatchOnPod(ctx, pod, patch.ItemReplace(patch.NewPath("metadata", "labels"), l)); err != nil {
+					log.Error().Err(err).Msgf("Unable to remove topology labels")
+				}
+			}
+		}
+		// End of Topology labels
+
 		if k8sutil.IsContainerReady(pod, k8sutil.ServerContainerName) {
 			// Pod is now ready
 			if memberStatus.Conditions.Update(api.ConditionTypeReady, true, "Pod Ready", "") {
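Editor's note: the label updates above go through the operator's internal patch helpers (patch.ItemReplace / patch.NewPath), which are not reproduced here. As a rough illustration of what a "replace metadata/labels" operation amounts to — assuming an RFC 6902 style JSON patch, which is an assumption on my part, not something stated in this diff — the sketch below builds such a patch with only the standard library; the label key used is hypothetical.

package main

import (
	"encoding/json"
	"fmt"
)

// patchOp is a minimal RFC 6902 (JSON Patch) operation, used purely for illustration.
type patchOp struct {
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value"`
}

func main() {
	labels := map[string]string{
		"deployment.arangodb.com/scheduled": "1", // hypothetical label key for illustration
	}
	patch := []patchOp{{Op: "replace", Path: "/metadata/labels", Value: labels}}

	body, _ := json.Marshal(patch)
	fmt.Println(string(body))
	// [{"op":"replace","path":"/metadata/labels","value":{"deployment.arangodb.com/scheduled":"1"}}]
}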
@@ -215,10 +237,6 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
 					memberStatus.Topology.Label = label
 				}
 			}
-
-			if memberStatus.Topology.InitPhase == api.TopologyMemberStatusInitPhasePending {
-				memberStatus.Topology.InitPhase = api.TopologyMemberStatusInitPhaseOK
-			}
 		}
 	}

@@ -366,3 +384,24 @@ func (r *Resources) InspectPods(ctx context.Context, cachedStatus inspectorInter
 	}
 	return nextInterval, nil
 }
+
+func addLabel(labels map[string]string, key, value string) map[string]string {
+	if labels != nil {
+		labels[key] = value
+		return labels
+	}
+
+	return map[string]string{
+		key: value,
+	}
+}
+
+func removeLabel(labels map[string]string, key string) map[string]string {
+	if labels == nil {
+		return map[string]string{}
+	}
+
+	delete(labels, key)
+
+	return labels
+}
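Editor's note: a quick, self-contained usage sketch of the two helpers added above, showing the nil-map handling that the previous inline code spelled out by hand. The label keys in main are made up for illustration; note that when the input map is non-nil, the helpers mutate and return that same map.

package main

import "fmt"

func addLabel(labels map[string]string, key, value string) map[string]string {
	if labels != nil {
		labels[key] = value
		return labels
	}

	return map[string]string{
		key: value,
	}
}

func removeLabel(labels map[string]string, key string) map[string]string {
	if labels == nil {
		return map[string]string{}
	}

	delete(labels, key)

	return labels
}

func main() {
	// Nil input: a fresh map is returned instead of writing to a nil map (which would panic).
	l := addLabel(nil, "example.com/scheduled", "1")
	fmt.Println(l)

	// Non-nil input: the same map is updated in place and returned.
	l = addLabel(l, "example.com/zone", "2")
	l = removeLabel(l, "example.com/zone")
	fmt.Println(l)
}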