diff --git a/CHANGELOG.md b/CHANGELOG.md index d67d382b1..66042d650 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ - (Bugfix) Fix V2Alpha1 Generator - (Feature) Create Internal Actions and move RebalancerGenerator - (Dependencies) Bump K8S Dependencies to 1.22.15 +- (Bugfix) Unlock broken inspectors ## [1.2.20](https://github.com/arangodb/kube-arangodb/tree/1.2.20) (2022-10-25) - (Feature) Add action progress diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index 03fe76c62..99ee60605 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -259,7 +259,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De localInventory.Add(d) - if !d.acs.CurrentClusterCache().Initialised() { + for !d.acs.CurrentClusterCache().Initialised() { d.log.Warn("ACS cache not yet initialised") err := d.acs.CurrentClusterCache().Refresh(context.Background()) if err != nil { @@ -275,8 +275,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De aInformer.Start(d.stopCh) kInformer.Start(d.stopCh) - kInformer.WaitForCacheSync(d.stopCh) - aInformer.WaitForCacheSync(d.stopCh) + k8sutil.WaitForInformers(d.stopCh, 5*time.Second, kInformer, aInformer) go d.run() if apiObject.GetAcceptedSpec().GetMode() == api.DeploymentModeCluster { diff --git a/pkg/deployment/resources/inspector/inspector.go b/pkg/deployment/resources/inspector/inspector.go index f2cb4d223..c3fa4b740 100644 --- a/pkg/deployment/resources/inspector/inspector.go +++ b/pkg/deployment/resources/inspector/inspector.go @@ -146,14 +146,30 @@ type inspectorState struct { } func (i *inspectorState) RegisterInformers(k8s informers.SharedInformerFactory, arango arangoInformer.SharedInformerFactory) { - k8s.Core().V1().Nodes().Informer().AddEventHandler(i.eventHandler(definitions.Node)) + // K8S k8s.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(i.eventHandler(definitions.PersistentVolumeClaim)) - k8s.Policy().V1().PodDisruptionBudgets().Informer().AddEventHandler(i.eventHandler(definitions.PodDisruptionBudget)) - k8s.Policy().V1beta1().PodDisruptionBudgets().Informer().AddEventHandler(i.eventHandler(definitions.PodDisruptionBudget)) + + if i.PodDisruptionBudget().Version().IsV1() { + k8s.Policy().V1().PodDisruptionBudgets().Informer().AddEventHandler(i.eventHandler(definitions.PodDisruptionBudget)) + } else { + k8s.Policy().V1beta1().PodDisruptionBudgets().Informer().AddEventHandler(i.eventHandler(definitions.PodDisruptionBudget)) + } + k8s.Core().V1().Secrets().Informer().AddEventHandler(i.eventHandler(definitions.Secret)) k8s.Core().V1().Services().Informer().AddEventHandler(i.eventHandler(definitions.Service)) k8s.Core().V1().ServiceAccounts().Informer().AddEventHandler(i.eventHandler(definitions.ServiceAccount)) k8s.Core().V1().Endpoints().Informer().AddEventHandler(i.eventHandler(definitions.Endpoints)) + + // Arango + arango.Database().V1().ArangoMembers().Informer().AddEventHandler(i.eventHandler(definitions.ArangoMember)) + + if _, err := i.ArangoTask().V1(); err != nil { + arango.Database().V1().ArangoTasks().Informer().AddEventHandler(i.eventHandler(definitions.ArangoTask)) + } + + if _, err := i.ArangoClusterSynchronization().V1(); err != nil { + arango.Database().V1().ArangoClusterSynchronizations().Informer().AddEventHandler(i.eventHandler(definitions.ArangoClusterSynchronization)) + } } func extractGVKFromOwnerReference(o meta.OwnerReference) schema.GroupVersionKind { diff --git a/pkg/util/k8sutil/informers.go b/pkg/util/k8sutil/informers.go new file mode 100644 index 000000000..be4ccf55a --- /dev/null +++ b/pkg/util/k8sutil/informers.go @@ -0,0 +1,67 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package k8sutil + +import ( + "reflect" + "sync" + "time" + + "github.com/arangodb/kube-arangodb/pkg/util/timer" +) + +type Informer interface { + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool +} + +func WaitForInformers(stop <-chan struct{}, timeout time.Duration, informers ...Informer) { + done := make(chan struct{}) + + go func() { + defer close(done) + + started := make(chan struct{}) + + go func() { + defer close(started) + + var wg sync.WaitGroup + + for id := range informers { + wg.Add(1) + + go func(id int) { + defer wg.Done() + informers[id].WaitForCacheSync(stop) + }(id) + } + + wg.Wait() + }() + + select { + case <-started: + case <-timer.After(timeout): + } + }() + + <-done +} diff --git a/pkg/util/k8sutil/informers_test.go b/pkg/util/k8sutil/informers_test.go new file mode 100644 index 000000000..1864c26a0 --- /dev/null +++ b/pkg/util/k8sutil/informers_test.go @@ -0,0 +1,93 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package k8sutil + +import ( + "reflect" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/arangodb/kube-arangodb/pkg/util/tests" + "github.com/arangodb/kube-arangodb/pkg/util/timer" +) + +type mockInformer struct { + delay time.Duration + done bool +} + +func (m *mockInformer) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool { + defer func() { + m.done = true + }() + c := timer.After(m.delay) + + select { + case <-c: + case <-stopCh: + } + + return nil +} + +func MockInformer(delay time.Duration) *mockInformer { + return &mockInformer{delay: delay} +} + +func Test_WaitForInformers(t *testing.T) { + t.Run("Delayed Sync", func(t *testing.T) { + a := MockInformer(time.Second) + + stop := make(chan struct{}) + + n := tests.DurationBetween() + WaitForInformers(stop, 5*time.Second, a) + + n(t, time.Second, 0.1) + require.True(t, a.done) + }) + + t.Run("Instant Sync", func(t *testing.T) { + a := MockInformer(time.Millisecond) + + stop := make(chan struct{}) + + n := tests.DurationBetween() + WaitForInformers(stop, 5*time.Second, a) + + n(t, time.Millisecond, 5) + require.True(t, a.done) + }) + + t.Run("Timeout Sync", func(t *testing.T) { + a := MockInformer(10 * time.Second) + + stop := make(chan struct{}) + + n := tests.DurationBetween() + WaitForInformers(stop, 5*time.Second, a) + + n(t, 5*time.Second, 0.05) + require.False(t, a.done) + }) +} diff --git a/pkg/util/tests/time.go b/pkg/util/tests/time.go new file mode 100644 index 000000000..330d07463 --- /dev/null +++ b/pkg/util/tests/time.go @@ -0,0 +1,41 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package tests + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func DurationBetween() func(t *testing.T, expected time.Duration, skew float64) { + start := time.Now() + return func(t *testing.T, expected time.Duration, skew float64) { + current := time.Since(start) + min := time.Duration(float64(expected) * (1 - skew)) + max := time.Duration(float64(expected) * (1 + skew)) + + if current > max || current < min { + require.Failf(t, "Skew is too big", "Expected %d, got %d", expected, current) + } + } +}