1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Endpoints inspector (#971)

This commit is contained in:
Adam Janikowski 2022-05-02 07:28:13 +02:00 committed by GitHub
parent a6986df87c
commit b7f720492a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 490 additions and 5 deletions

View file

@ -1,6 +1,7 @@
# Change Log
## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A)
- (Feature) Add CoreV1 Endpoints Inspector
## [1.2.11](https://github.com/arangodb/kube-arangodb/tree/1.2.11) (2022-04-30)
- (Bugfix) Orphan PVC are not removed

View file

@ -226,7 +226,8 @@ func newDeploymentThrottle() throttle.Components {
10*time.Second, // Secret
10*time.Second, // Service
30*time.Second, // SA
30*time.Second) // ServiceMonitor
30*time.Second, // ServiceMonitor
15*time.Second) // Endpoints
}
// New creates a new Deployment from the given API object.

View file

@ -0,0 +1,185 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package inspector
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/throttle"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func init() {
requireRegisterInspectorLoader(endpointsInspectorLoaderObj)
}
var endpointsInspectorLoaderObj = endpointsInspectorLoader{}
type endpointsInspectorLoader struct {
}
func (p endpointsInspectorLoader) Component() throttle.Component {
return throttle.Endpoints
}
func (p endpointsInspectorLoader) Load(ctx context.Context, i *inspectorState) {
var q endpointsInspector
p.loadV1(ctx, i, &q)
i.endpoints = &q
q.state = i
q.last = time.Now()
}
func (p endpointsInspectorLoader) loadV1(ctx context.Context, i *inspectorState, q *endpointsInspector) {
var z endpointsInspectorV1
z.endpointsInspector = q
z.endpoints, z.err = p.getV1Endpoints(ctx, i)
q.v1 = &z
}
func (p endpointsInspectorLoader) getV1Endpoints(ctx context.Context, i *inspectorState) (map[string]*core.Endpoints, error) {
objs, err := p.getV1EndpointsList(ctx, i)
if err != nil {
return nil, err
}
r := make(map[string]*core.Endpoints, len(objs))
for id := range objs {
r[objs[id].GetName()] = objs[id]
}
return r, nil
}
func (p endpointsInspectorLoader) getV1EndpointsList(ctx context.Context, i *inspectorState) ([]*core.Endpoints, error) {
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
obj, err := i.client.Kubernetes().CoreV1().Endpoints(i.namespace).List(ctxChild, meta.ListOptions{
Limit: globals.GetGlobals().Kubernetes().RequestBatchSize().Get(),
})
if err != nil {
return nil, err
}
items := obj.Items
cont := obj.Continue
var s = int64(len(items))
if z := obj.RemainingItemCount; z != nil {
s += *z
}
ptrs := make([]*core.Endpoints, 0, s)
for {
for id := range items {
ptrs = append(ptrs, &items[id])
}
if cont == "" {
break
}
items, cont, err = p.getV1EndpointsListRequest(ctx, i, cont)
if err != nil {
return nil, err
}
}
return ptrs, nil
}
func (p endpointsInspectorLoader) getV1EndpointsListRequest(ctx context.Context, i *inspectorState, cont string) ([]core.Endpoints, string, error) {
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
obj, err := i.client.Kubernetes().CoreV1().Endpoints(i.namespace).List(ctxChild, meta.ListOptions{
Limit: globals.GetGlobals().Kubernetes().RequestBatchSize().Get(),
Continue: cont,
})
if err != nil {
return nil, "", err
}
return obj.Items, obj.Continue, err
}
func (p endpointsInspectorLoader) Verify(i *inspectorState) error {
return nil
}
func (p endpointsInspectorLoader) Copy(from, to *inspectorState, override bool) {
if to.endpoints != nil {
if !override {
return
}
}
to.endpoints = from.endpoints
to.endpoints.state = to
}
func (p endpointsInspectorLoader) Name() string {
return "endpoints"
}
type endpointsInspector struct {
state *inspectorState
last time.Time
v1 *endpointsInspectorV1
}
func (p *endpointsInspector) LastRefresh() time.Time {
return p.last
}
func (p *endpointsInspector) Refresh(ctx context.Context) error {
p.Throttle(p.state.throttles).Invalidate()
return p.state.refresh(ctx, endpointsInspectorLoaderObj)
}
func (p endpointsInspector) Throttle(c throttle.Components) throttle.Throttle {
return c.Endpoints()
}
func (p *endpointsInspector) validate() error {
if p == nil {
return errors.Newf("EndpointsInspector is nil")
}
if p.state == nil {
return errors.Newf("Parent is nil")
}
return p.v1.validate()
}

View file

@ -0,0 +1,140 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package inspector
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
ins "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/endpoints/v1"
core "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func (p *endpointsInspector) V1() (ins.Inspector, error) {
if p.v1.err != nil {
return nil, p.v1.err
}
return p.v1, nil
}
type endpointsInspectorV1 struct {
endpointsInspector *endpointsInspector
endpoints map[string]*core.Endpoints
err error
}
func (p *endpointsInspectorV1) Filter(filters ...ins.Filter) []*core.Endpoints {
z := p.ListSimple()
r := make([]*core.Endpoints, 0, len(z))
for _, o := range z {
if !ins.FilterObject(o, filters...) {
continue
}
r = append(r, o)
}
return r
}
func (p *endpointsInspectorV1) validate() error {
if p == nil {
return errors.Newf("EndpointsV1Inspector is nil")
}
if p.endpointsInspector == nil {
return errors.Newf("Parent is nil")
}
if p.endpoints == nil && p.err == nil {
return errors.Newf("Endpoints or err should be not nil")
}
if p.endpoints != nil && p.err != nil {
return errors.Newf("Endpoints or err cannot be not nil together")
}
return nil
}
func (p *endpointsInspectorV1) ListSimple() []*core.Endpoints {
var r []*core.Endpoints
for _, endpoints := range p.endpoints {
r = append(r, endpoints)
}
return r
}
func (p *endpointsInspectorV1) GetSimple(name string) (*core.Endpoints, bool) {
endpoints, ok := p.endpoints[name]
if !ok {
return nil, false
}
return endpoints, true
}
func (p *endpointsInspectorV1) Iterate(action ins.Action, filters ...ins.Filter) error {
for _, endpoints := range p.endpoints {
if err := p.iterateEndpoints(endpoints, action, filters...); err != nil {
return err
}
}
return nil
}
func (p *endpointsInspectorV1) iterateEndpoints(endpoints *core.Endpoints, action ins.Action, filters ...ins.Filter) error {
for _, f := range filters {
if f == nil {
continue
}
if !f(endpoints) {
return nil
}
}
return action(endpoints)
}
func (p *endpointsInspectorV1) Read() ins.ReadInterface {
return p
}
func (p *endpointsInspectorV1) Get(ctx context.Context, name string, opts metav1.GetOptions) (*core.Endpoints, error) {
if s, ok := p.GetSimple(name); !ok {
return nil, apiErrors.NewNotFound(schema.GroupResource{
Group: core.GroupName,
Resource: "endpoints",
}, name)
} else {
return s, nil
}
}

View file

@ -33,6 +33,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangoclustersynchronization"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangotask"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/endpoints"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/node"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/persistentvolumeclaim"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/pod"
@ -134,6 +135,7 @@ type inspectorState struct {
arangoMembers *arangoMembersInspector
arangoTasks *arangoTasksInspector
arangoClusterSynchronizations *arangoClusterSynchronizationsInspector
endpoints *endpointsInspector
throttles throttle.Components
@ -142,6 +144,10 @@ type inspectorState struct {
initialised bool
}
func (i *inspectorState) Endpoints() endpoints.Definition {
return i.endpoints
}
func (i *inspectorState) Initialised() bool {
if i == nil {
return false
@ -345,6 +351,10 @@ func (i *inspectorState) validate() error {
return err
}
if err := i.endpoints.validate(); err != nil {
return err
}
return nil
}
@ -365,6 +375,7 @@ func (i *inspectorState) copyCore() *inspectorState {
arangoClusterSynchronizations: i.arangoClusterSynchronizations,
throttles: i.throttles.Copy(),
versionInfo: i.versionInfo,
endpoints: i.endpoints,
logger: i.logger,
}
}

View file

@ -133,7 +133,7 @@ func getAllTypes() []string {
func Test_Inspector_RefreshMatrix(t *testing.T) {
c := kclient.NewFakeClient()
tc := throttle.NewThrottleComponents(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour)
tc := throttle.NewThrottleComponents(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour)
i := NewInspector(tc, c, "test")
@ -293,7 +293,7 @@ func Test_Inspector_Load(t *testing.T) {
func Test_Inspector_Invalidate(t *testing.T) {
c := kclient.NewFakeClient()
tc := throttle.NewThrottleComponents(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour)
tc := throttle.NewThrottleComponents(time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour, time.Hour)
i := NewInspector(tc, c, "test")

View file

@ -0,0 +1,36 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package endpoints
import (
v1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/endpoints/v1"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/refresh"
)
type Inspector interface {
Endpoints() Definition
}
type Definition interface {
refresh.Inspector
V1() (v1.Inspector, error)
}

View file

@ -0,0 +1,48 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1
import core "k8s.io/api/core/v1"
type Inspector interface {
ListSimple() []*core.Endpoints
GetSimple(name string) (*core.Endpoints, bool)
Filter(filters ...Filter) []*core.Endpoints
Iterate(action Action, filters ...Filter) error
Read() ReadInterface
}
type Filter func(at *core.Endpoints) bool
type Action func(at *core.Endpoints) error
func FilterObject(at *core.Endpoints, filters ...Filter) bool {
for _, f := range filters {
if f == nil {
continue
}
if !f(at) {
return false
}
}
return true
}

View file

@ -0,0 +1,49 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1
import (
"context"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
// ModInterface has methods to work with Endpoints resources only for creation
type ModInterface interface {
Create(ctx context.Context, endpoints *core.Endpoints, opts meta.CreateOptions) (*core.Endpoints, error)
Update(ctx context.Context, endpoints *core.Endpoints, opts meta.UpdateOptions) (*core.Endpoints, error)
UpdateStatus(ctx context.Context, endpoints *core.Endpoints, opts meta.UpdateOptions) (*core.Endpoints, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts meta.PatchOptions, subresources ...string) (result *core.Endpoints, err error)
Delete(ctx context.Context, name string, opts meta.DeleteOptions) error
}
// Interface has methods to work with Endpoints resources.
type Interface interface {
ModInterface
ReadInterface
}
// ReadInterface has methods to work with Endpoints resources with ReadOnly mode.
type ReadInterface interface {
Get(ctx context.Context, name string, opts meta.GetOptions) (*core.Endpoints, error)
}

View file

@ -28,6 +28,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangoclustersynchronization"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangotask"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/endpoints"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/persistentvolumeclaim"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/pod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget"
@ -57,6 +58,7 @@ type Inspector interface {
serviceaccount.Inspector
arangomember.Inspector
server.Inspector
endpoints.Inspector
node.Inspector
arangoclustersynchronization.Inspector

View file

@ -30,10 +30,10 @@ type Inspector interface {
}
func NewAlwaysThrottleComponents() Components {
return NewThrottleComponents(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
return NewThrottleComponents(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
}
func NewThrottleComponents(acs, am, at, node, pvc, pod, pdb, secret, service, serviceAccount, sm time.Duration) Components {
func NewThrottleComponents(acs, am, at, node, pvc, pod, pdb, secret, service, serviceAccount, sm, endpoints time.Duration) Components {
return &throttleComponents{
arangoClusterSynchronization: NewThrottle(acs),
arangoMember: NewThrottle(am),
@ -46,6 +46,7 @@ func NewThrottleComponents(acs, am, at, node, pvc, pod, pdb, secret, service, se
service: NewThrottle(service),
serviceAccount: NewThrottle(serviceAccount),
serviceMonitor: NewThrottle(sm),
endpoints: NewThrottle(endpoints),
}
}
@ -65,6 +66,7 @@ const (
Service Component = "Service"
ServiceAccount Component = "ServiceAccount"
ServiceMonitor Component = "ServiceMonitor"
Endpoints Component = "Endpoints"
)
func AllComponents() []Component {
@ -80,6 +82,7 @@ func AllComponents() []Component {
Service,
ServiceAccount,
ServiceMonitor,
Endpoints,
}
}
@ -95,6 +98,7 @@ type Components interface {
Service() Throttle
ServiceAccount() Throttle
ServiceMonitor() Throttle
Endpoints() Throttle
Get(c Component) Throttle
Invalidate(components ...Component)
@ -115,6 +119,11 @@ type throttleComponents struct {
service Throttle
serviceAccount Throttle
serviceMonitor Throttle
endpoints Throttle
}
func (t *throttleComponents) Endpoints() Throttle {
return t.endpoints
}
func (t *throttleComponents) Counts() ComponentCount {
@ -160,6 +169,8 @@ func (t *throttleComponents) Get(c Component) Throttle {
return t.serviceAccount
case ServiceMonitor:
return t.serviceMonitor
case Endpoints:
return t.endpoints
default:
return NewAlwaysThrottle()
}
@ -178,6 +189,7 @@ func (t *throttleComponents) Copy() Components {
service: t.service.Copy(),
serviceAccount: t.serviceAccount.Copy(),
serviceMonitor: t.serviceMonitor.Copy(),
endpoints: t.endpoints.Copy(),
}
}