1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Improvement] Refactor Resource Informer (#1155)

This commit is contained in:
Adam Janikowski 2022-10-25 16:22:56 -04:00 committed by GitHub
parent ba061315ad
commit 2da7108d33
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 139 additions and 266 deletions

View file

@ -30,6 +30,7 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/record"
"github.com/arangodb/arangosync-client/client"
@ -49,6 +50,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/deployment/resilience"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"
arangoInformer "github.com/arangodb/kube-arangodb/pkg/generated/informers/externalversions"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/operator/scope"
"github.com/arangodb/kube-arangodb/pkg/util"
@ -118,7 +120,6 @@ type Deployment struct {
stopped int32
inspectTrigger trigger.Trigger
inspectCRDTrigger trigger.Trigger
updateDeploymentTrigger trigger.Trigger
clientCache deploymentClient.Cache
agencyCache agency.Cache
@ -265,12 +266,18 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
}
}
aInformer := arangoInformer.NewSharedInformerFactoryWithOptions(deps.Client.Arango(), 0, arangoInformer.WithNamespace(apiObject.GetNamespace()))
kInformer := informers.NewSharedInformerFactoryWithOptions(deps.Client.Kubernetes(), 0, informers.WithNamespace(apiObject.GetNamespace()))
i.RegisterInformers(kInformer, aInformer)
aInformer.Start(d.stopCh)
kInformer.Start(d.stopCh)
kInformer.WaitForCacheSync(d.stopCh)
aInformer.WaitForCacheSync(d.stopCh)
go d.run()
go d.listenForPodEvents(d.stopCh)
go d.listenForPVCEvents(d.stopCh)
go d.listenForSecretEvents(d.stopCh)
go d.listenForServiceEvents(d.stopCh)
go d.listenForCRDEvents(d.stopCh)
if apiObject.GetAcceptedSpec().GetMode() == api.DeploymentModeCluster {
ci := newClusterScalingIntegration(d)
d.clusterScalingIntegration = ci
@ -378,8 +385,6 @@ func (d *Deployment) run() {
inspectionInterval = d.inspectDeployment(inspectionInterval)
log.Str("interval", inspectionInterval.String()).Trace("...inspected deployment")
case <-d.inspectCRDTrigger.Done():
d.lookForServiceMonitorCRD()
case <-d.updateDeploymentTrigger.Done():
inspectionInterval = minInspectionInterval
d.handleArangoDeploymentUpdatedEvent()
@ -515,9 +520,12 @@ func (d *Deployment) isOwnerOf(obj meta.Object) bool {
// lookForServiceMonitorCRD checks if there is a CRD for the ServiceMonitor
// CR and sets the flag haveServiceMonitorCRD accordingly. This is called
// once at creation time of the deployment and then always if the CRD
// informer is triggered.
// once at creation time of the deployment.
func (d *Deployment) lookForServiceMonitorCRD() {
if d.haveServiceMonitorCRD {
return
}
var err error
if d.GetScope().IsNamespaced() {
_, err = d.acs.CurrentClusterCache().ServiceMonitor().V1()

View file

@ -555,11 +555,6 @@ func (d *Deployment) triggerInspection() {
d.inspectTrigger.Trigger()
}
// triggerCRDInspection ensures that an inspection is run soon.
func (d *Deployment) triggerCRDInspection() {
d.inspectCRDTrigger.Trigger()
}
func (d *Deployment) updateConditionWithHash(ctx context.Context, conditionType api.ConditionType, status bool, reason, message, hash string) error {
d.log.Str("condition", string(conditionType)).Bool("status", status).Str("reason", reason).Str("message", message).Str("hash", hash).Info("Updated condition")
if err := d.WithStatusUpdate(ctx, func(s *api.DeploymentStatus) bool {

View file

@ -1,225 +0,0 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package deployment
import (
core "k8s.io/api/core/v1"
crdv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/client-go/tools/cache"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// listenForPodEvents keep listening for changes in pod until the given channel is closed.
func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) {
getPod := func(obj interface{}) (*core.Pod, bool) {
pod, ok := obj.(*core.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
pod, ok = tombstone.Obj.(*core.Pod)
return pod, ok
}
return pod, true
}
rw := k8sutil.NewResourceWatcher(
d.deps.Client.Kubernetes().CoreV1().RESTClient(),
"pods",
d.currentObject.GetNamespace(),
&core.Pod{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
if p, ok := getPod(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().Pod().Invalidate()
if p, ok := getPod(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
})
rw.Run(stopCh)
}
// listenForPVCEvents keep listening for changes in PVC's until the given channel is closed.
func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) {
getPVC := func(obj interface{}) (*core.PersistentVolumeClaim, bool) {
pvc, ok := obj.(*core.PersistentVolumeClaim)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
pvc, ok = tombstone.Obj.(*core.PersistentVolumeClaim)
return pvc, ok
}
return pvc, true
}
rw := k8sutil.NewResourceWatcher(
d.deps.Client.Kubernetes().CoreV1().RESTClient(),
"persistentvolumeclaims",
d.currentObject.GetNamespace(),
&core.PersistentVolumeClaim{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
if p, ok := getPVC(newObj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().PersistentVolumeClaim().Invalidate()
if p, ok := getPVC(obj); ok && d.isOwnerOf(p) {
d.triggerInspection()
}
},
})
rw.Run(stopCh)
}
// listenForSecretEvents keep listening for changes in Secrets's until the given channel is closed.
func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) {
getSecret := func(obj interface{}) bool {
_, ok := obj.(*core.Secret)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return false
}
_, ok = tombstone.Obj.(*core.Secret)
return ok
}
return true
}
rw := k8sutil.NewResourceWatcher(
d.deps.Client.Kubernetes().CoreV1().RESTClient(),
"secrets",
d.currentObject.GetNamespace(),
&core.Secret{},
cache.ResourceEventHandlerFuncs{
// Note: For secrets we look at all of them because they do not have to be owned by this deployment.
AddFunc: func(obj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
if getSecret(obj) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
if getSecret(newObj) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().Secret().Invalidate()
if getSecret(obj) {
d.triggerInspection()
}
},
})
rw.Run(stopCh)
}
// listenForServiceEvents keep listening for changes in Service's until the given channel is closed.
func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) {
getService := func(obj interface{}) (*core.Service, bool) {
service, ok := obj.(*core.Service)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
service, ok = tombstone.Obj.(*core.Service)
return service, ok
}
return service, true
}
rw := k8sutil.NewResourceWatcher(
d.deps.Client.Kubernetes().CoreV1().RESTClient(),
"services",
d.currentObject.GetNamespace(),
&core.Service{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
if s, ok := getService(newObj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
DeleteFunc: func(obj interface{}) {
d.acs.CurrentClusterCache().GetThrottles().Service().Invalidate()
if s, ok := getService(obj); ok && d.isOwnerOf(s) {
d.triggerInspection()
}
},
})
rw.Run(stopCh)
}
// listenForCRDEvents keep listening for changes in CRDs until the given channel is closed.
func (d *Deployment) listenForCRDEvents(stopCh <-chan struct{}) {
rw := k8sutil.NewResourceWatcher(
d.deps.Client.KubernetesExtensions().ApiextensionsV1().RESTClient(),
"customresourcedefinitions",
"",
&crdv1.CustomResourceDefinition{},
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d.triggerCRDInspection()
},
DeleteFunc: func(obj interface{}) {
d.triggerCRDInspection()
},
})
rw.Run(stopCh)
}

View file

@ -29,10 +29,12 @@ import (
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
"github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
arangoInformer "github.com/arangodb/kube-arangodb/pkg/generated/informers/externalversions"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
@ -63,35 +65,9 @@ func init() {
}
var (
inspectorLoadersList inspectorLoaders
inspectorLoadersLock sync.Mutex
logger = logging.Global().Get("inspector")
)
func requireRegisterInspectorLoader(i inspectorLoader) {
if !registerInspectorLoader(i) {
panic("Unable to register inspector loader")
}
}
func registerInspectorLoader(i inspectorLoader) bool {
inspectorLoadersLock.Lock()
defer inspectorLoadersLock.Unlock()
n := i.Name()
if inspectorLoadersList.Get(n) != -1 {
return false
}
inspectorLoadersList = append(inspectorLoadersList, i)
return true
}
type inspectorLoaders []inspectorLoader
func (i inspectorLoaders) Get(name string) int {
for id, k := range i {
if k.Name() == name {
@ -168,6 +144,17 @@ type inspectorState struct {
initialised bool
}
func (i *inspectorState) RegisterInformers(k8s informers.SharedInformerFactory, arango arangoInformer.SharedInformerFactory) {
k8s.Core().V1().Nodes().Informer().AddEventHandler(i.eventHandler(throttle.Node))
k8s.Core().V1().PersistentVolumeClaims().Informer().AddEventHandler(i.eventHandler(throttle.PersistentVolumeClaim))
k8s.Policy().V1().PodDisruptionBudgets().Informer().AddEventHandler(i.eventHandler(throttle.PodDisruptionBudget))
k8s.Policy().V1beta1().PodDisruptionBudgets().Informer().AddEventHandler(i.eventHandler(throttle.PodDisruptionBudget))
k8s.Core().V1().Secrets().Informer().AddEventHandler(i.eventHandler(throttle.Secret))
k8s.Core().V1().Services().Informer().AddEventHandler(i.eventHandler(throttle.Service))
k8s.Core().V1().ServiceAccounts().Informer().AddEventHandler(i.eventHandler(throttle.ServiceAccount))
k8s.Core().V1().Endpoints().Informer().AddEventHandler(i.eventHandler(throttle.Endpoints))
}
func extractGVKFromOwnerReference(o meta.OwnerReference) schema.GroupVersionKind {
z := strings.SplitN(o.APIVersion, "/", 2)

View file

@ -0,0 +1,51 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package inspector
import (
"k8s.io/client-go/tools/cache"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/throttle"
)
func (i *inspectorState) eventHandler(component throttle.Component) cache.ResourceEventHandler {
return eventHandler{
i: i,
component: component,
}
}
type eventHandler struct {
i *inspectorState
component throttle.Component
}
func (e eventHandler) OnAdd(obj interface{}) {
e.i.throttles.Invalidate(e.component)
}
func (e eventHandler) OnUpdate(oldObj, newObj interface{}) {
e.i.throttles.Invalidate(e.component)
}
func (e eventHandler) OnDelete(obj interface{}) {
e.i.throttles.Invalidate(e.component)
}

View file

@ -0,0 +1,53 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package inspector
import (
"sync"
)
var (
inspectorLoadersList inspectorLoaders
inspectorLoadersLock sync.Mutex
)
func requireRegisterInspectorLoader(i inspectorLoader) {
if !registerInspectorLoader(i) {
panic("Unable to register inspector loader")
}
}
func registerInspectorLoader(i inspectorLoader) bool {
inspectorLoadersLock.Lock()
defer inspectorLoadersLock.Unlock()
n := i.Name()
if inspectorLoadersList.Get(n) != -1 {
return false
}
inspectorLoadersList = append(inspectorLoadersList, i)
return true
}
type inspectorLoaders []inspectorLoader

View file

@ -25,7 +25,9 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/informers"
arangoInformer "github.com/arangodb/kube-arangodb/pkg/generated/informers/externalversions"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/anonymous"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangoclustersynchronization"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangodeployment"
@ -88,4 +90,6 @@ type Inspector interface {
arangotask.Inspector
mods.Mods
RegisterInformers(k8s informers.SharedInformerFactory, arango arangoInformer.SharedInformerFactory)
}