diff --git a/CHANGELOG.md b/CHANGELOG.md index 5abb19baf..3c8beced5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Change Log ## [master](https://github.com/arangodb/kube-arangodb/tree/master) (N/A) +- (Feature) Improve Kubernetes clientsets management ## [1.2.8](https://github.com/arangodb/kube-arangodb/tree/1.2.8) (2022-02-24) - Do not check License V2 on Community images diff --git a/cmd/admin.go b/cmd/admin.go index 074f8b4e0..add423f62 100644 --- a/cmd/admin.go +++ b/cmd/admin.go @@ -35,7 +35,6 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/globals" - "github.com/pkg/errors" "github.com/spf13/cobra" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -43,11 +42,12 @@ import ( "github.com/arangodb-helper/go-certificates" "github.com/arangodb/go-driver/jwt" "github.com/arangodb/go-driver/v2/connection" - v12 "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" - extclient "github.com/arangodb/kube-arangodb/pkg/client" + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/util/constants" + "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/secret" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" ) const ArgDeploymentName = "deployment-name" @@ -109,12 +109,12 @@ func cmdGetAgencyState(cmd *cobra.Command, _ []string) { cliLog.Fatal().Err(err).Msg("failed to create basic data for the connection") } - if d.Spec.GetMode() != v12.DeploymentModeCluster { + if d.Spec.GetMode() != api.DeploymentModeCluster { cliLog.Fatal().Msgf("agency state does not work for the \"%s\" deployment \"%s\"", d.Spec.GetMode(), d.GetName()) } - dnsName := k8sutil.CreatePodDNSName(d.GetObjectMeta(), v12.ServerGroupAgents.AsRole(), d.Status.Members.Agents[0].ID) + dnsName := k8sutil.CreatePodDNSName(d.GetObjectMeta(), api.ServerGroupAgents.AsRole(), d.Status.Members.Agents[0].ID) endpoint := getArangoEndpoint(d.Spec.IsSecure(), dnsName) conn := createClient([]string{endpoint}, certCA, auth, connection.ApplicationJSON) leaderID, err := getAgencyLeader(ctx, conn) @@ -122,7 +122,7 @@ func cmdGetAgencyState(cmd *cobra.Command, _ []string) { cliLog.Fatal().Err(err).Msg("failed to get leader ID") } - dnsLeaderName := k8sutil.CreatePodDNSName(d.GetObjectMeta(), v12.ServerGroupAgents.AsRole(), leaderID) + dnsLeaderName := k8sutil.CreatePodDNSName(d.GetObjectMeta(), api.ServerGroupAgents.AsRole(), leaderID) leaderEndpoint := getArangoEndpoint(d.Spec.IsSecure(), dnsLeaderName) conn = createClient([]string{leaderEndpoint}, certCA, auth, connection.PlainText) body, err := getAgencyState(ctx, conn) @@ -145,7 +145,7 @@ func cmdGetAgencyDump(cmd *cobra.Command, _ []string) { cliLog.Fatal().Err(err).Msg("failed to create basic data for the connection") } - if d.Spec.GetMode() != v12.DeploymentModeCluster { + if d.Spec.GetMode() != api.DeploymentModeCluster { cliLog.Fatal().Msgf("agency dump does not work for the \"%s\" deployment \"%s\"", d.Spec.GetMode(), d.GetName()) } @@ -181,7 +181,7 @@ func getAgencyState(ctx context.Context, conn connection.Connection) (io.ReadClo // getDeploymentAndCredentials returns deployment and necessary credentials to communicate with ArangoDB pods. func getDeploymentAndCredentials(ctx context.Context, - deploymentName string) (d v12.ArangoDeployment, certCA *x509.CertPool, auth connection.Authentication, err error) { + deploymentName string) (d api.ArangoDeployment, certCA *x509.CertPool, auth connection.Authentication, err error) { namespace := os.Getenv(constants.EnvOperatorPodNamespace) if len(namespace) == 0 { @@ -189,12 +189,14 @@ func getDeploymentAndCredentials(ctx context.Context, return } - kubeCli, err := k8sutil.NewKubeClient() - if err != nil { - err = errors.WithMessage(err, "failed to create Kubernetes client") + client, ok := kclient.GetDefaultFactory().Client() + if !ok { + err = errors.Newf("Client not initialised") return } + kubeCli := client.Kubernetes() + d, err = getDeployment(ctx, namespace, deploymentName) if err != nil { err = errors.WithMessage(err, "failed to get deployment") @@ -327,25 +329,27 @@ func getCACertificate(ctx context.Context, secrets secret.ReadInterface, name st // getDeployment returns ArangoDeployment within the provided namespace. // If there are more than two deployments within one namespace then // deployment name must be provided, otherwise error is returned. -func getDeployment(ctx context.Context, namespace, deplName string) (v12.ArangoDeployment, error) { - extCli, err := extclient.NewClient() - if err != nil { - return v12.ArangoDeployment{}, errors.WithMessage(err, "failed to create Arango extension client") +func getDeployment(ctx context.Context, namespace, deplName string) (api.ArangoDeployment, error) { + client, ok := kclient.GetDefaultFactory().Client() + if !ok { + return api.ArangoDeployment{}, errors.Newf("Client not initialised") } + extCli := client.Arango() + ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() deployments, err := extCli.DatabaseV1().ArangoDeployments(namespace).List(ctxChild, metav1.ListOptions{}) if err != nil { - if v12.IsNotFound(err) { - return v12.ArangoDeployment{}, errors.WithMessage(err, "there are no deployments") + if api.IsNotFound(err) { + return api.ArangoDeployment{}, errors.WithMessage(err, "there are no deployments") } - return v12.ArangoDeployment{}, errors.WithMessage(err, "failed to get deployments") + return api.ArangoDeployment{}, errors.WithMessage(err, "failed to get deployments") } if len(deployments.Items) == 0 { - return v12.ArangoDeployment{}, errors.WithMessage(err, "there are no deployments") + return api.ArangoDeployment{}, errors.WithMessage(err, "there are no deployments") } if len(deplName) > 0 { @@ -356,7 +360,7 @@ func getDeployment(ctx context.Context, namespace, deplName string) (v12.ArangoD } } - return v12.ArangoDeployment{}, errors.New( + return api.ArangoDeployment{}, errors.New( fmt.Sprintf("the deployment \"%s\" does not exist in the namespace \"%s\"", deplName, namespace)) } @@ -370,7 +374,7 @@ func getDeployment(ctx context.Context, namespace, deplName string) (v12.ArangoD message += fmt.Sprintf(" %s", item.GetName()) } - return v12.ArangoDeployment{}, errors.New(message) + return api.ArangoDeployment{}, errors.New(message) } // getInterruptionContext returns context which will be cancelled when the process is interrupted. diff --git a/cmd/lifecycle.go b/cmd/lifecycle.go index e2604220e..5928435b5 100644 --- a/cmd/lifecycle.go +++ b/cmd/lifecycle.go @@ -40,6 +40,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/constants" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" ) var ( @@ -110,12 +111,12 @@ func cmdLifecyclePreStopRunFinalizer(cmd *cobra.Command, args []string) { } // Create kubernetes client - kubecli, err := k8sutil.NewKubeClient() - if err != nil { - cliLog.Fatal().Err(err).Msg("Failed to create Kubernetes client") + client, ok := kclient.GetDefaultFactory().Client() + if !ok { + cliLog.Fatal().Msg("Client not initialised") } - pods := kubecli.CoreV1().Pods(namespace) + pods := client.Kubernetes().CoreV1().Pods(namespace) recentErrors := 0 for { p, err := pods.Get(context.Background(), name, metav1.GetOptions{}) @@ -199,12 +200,12 @@ func (c *cmdLifecyclePreStopRunPort) run(cmd *cobra.Command, args []string) erro } // Create kubernetes client - kubecli, err := k8sutil.NewKubeClient() - if err != nil { - cliLog.Fatal().Err(err).Msg("Failed to create Kubernetes client") + client, ok := kclient.GetDefaultFactory().Client() + if !ok { + cliLog.Fatal().Msg("Client not initialised") } - pods := kubecli.CoreV1().Pods(namespace) + pods := client.Kubernetes().CoreV1().Pods(namespace) recentErrors := 0 diff --git a/cmd/main.go b/cmd/main.go index 3863f0d3e..876d075a5 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -61,13 +61,13 @@ import ( v1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" - "github.com/arangodb/kube-arangodb/pkg/client" "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned/scheme" "github.com/arangodb/kube-arangodb/pkg/logging" "github.com/arangodb/kube-arangodb/pkg/operator" "github.com/arangodb/kube-arangodb/pkg/server" "github.com/arangodb/kube-arangodb/pkg/util/constants" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" "github.com/arangodb/kube-arangodb/pkg/util/probe" "github.com/arangodb/kube-arangodb/pkg/util/retry" v1 "k8s.io/api/core/v1" @@ -89,8 +89,6 @@ const ( ) var ( - maskAny = errors.WithStack - cmdMain = cobra.Command{ Use: "arangodb_operator", Run: executeMain, @@ -124,6 +122,9 @@ var ( } operatorKubernetesOptions struct { maxBatchSize int64 + + qps float32 + burst int } operatorBackup struct { concurrentUploads int @@ -175,6 +176,8 @@ func init() { f.DurationVar(&operatorTimeouts.reconciliation, "timeout.reconciliation", globals.DefaultReconciliationTimeout, "The reconciliation timeout to the ArangoDB CR") f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", true, "Enable Scaling Integration") f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read") + f.Float32Var(&operatorKubernetesOptions.qps, "kubernetes.qps", kclient.DefaultQPS, "Number of queries per second for k8s API") + f.IntVar(&operatorKubernetesOptions.burst, "kubernetes.burst", kclient.DefaultBurst, "Burst for the k8s API") f.IntVar(&operatorBackup.concurrentUploads, "backup-concurrent-uploads", globals.DefaultBackupConcurrentUploads, "Number of concurrent uploads per deployment") features.Init(&cmdMain) } @@ -213,6 +216,9 @@ func executeMain(cmd *cobra.Command, args []string) { globals.GetGlobals().Kubernetes().RequestBatchSize().Set(operatorKubernetesOptions.maxBatchSize) globals.GetGlobals().Backup().ConcurrentUploads().Set(operatorBackup.concurrentUploads) + kclient.SetDefaultQPS(operatorKubernetesOptions.qps) + kclient.SetDefaultBurst(operatorKubernetesOptions.burst) + // Prepare log service var err error logService, err = logging.NewService(defaultLogLevel, logLevels) @@ -264,12 +270,12 @@ func executeMain(cmd *cobra.Command, args []string) { cliLog.Fatal().Err(err).Msg("Failed to get hostname") } - // Create kubernetes client - kubecli, err := k8sutil.NewKubeClient() - if err != nil { - cliLog.Fatal().Err(err).Msg("Failed to create Kubernetes client") + client, ok := kclient.GetDefaultFactory().Client() + if !ok { + cliLog.Fatal().Msg("Failed to get client") } - secrets := kubecli.CoreV1().Secrets(namespace) + + secrets := client.Kubernetes().CoreV1().Secrets(namespace) // Create operator cfg, deps, err := newOperatorConfigAndDeps(id+"-"+name, namespace, name) @@ -282,7 +288,7 @@ func executeMain(cmd *cobra.Command, args []string) { } listenAddr := net.JoinHostPort(serverOptions.host, strconv.Itoa(serverOptions.port)) - if svr, err := server.NewServer(kubecli.CoreV1(), server.Config{ + if svr, err := server.NewServer(client.Kubernetes().CoreV1(), server.Config{ Namespace: namespace, Address: listenAddr, TLSSecretName: serverOptions.tlsSecretName, @@ -364,34 +370,21 @@ func startVersionProcess() error { // newOperatorConfigAndDeps creates operator config & dependencies. func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, operator.Dependencies, error) { - kubecli, err := k8sutil.NewKubeClient() - if err != nil { - return operator.Config{}, operator.Dependencies{}, maskAny(err) + client, ok := kclient.GetDefaultFactory().Client() + if !ok { + return operator.Config{}, operator.Dependencies{}, errors.Errorf("Failed to get client") } - kubeMonCli, err := k8sutil.NewKubeMonitoringV1Client() + image, serviceAccount, err := getMyPodInfo(client.Kubernetes(), namespace, name) if err != nil { - return operator.Config{}, operator.Dependencies{}, maskAny(err) + return operator.Config{}, operator.Dependencies{}, errors.WithStack(fmt.Errorf("Failed to get my pod's service account: %s", err)) } - image, serviceAccount, err := getMyPodInfo(kubecli, namespace, name) - if err != nil { - return operator.Config{}, operator.Dependencies{}, maskAny(fmt.Errorf("Failed to get my pod's service account: %s", err)) - } - - kubeExtCli, err := k8sutil.NewKubeExtClient() - if err != nil { - return operator.Config{}, operator.Dependencies{}, maskAny(fmt.Errorf("Failed to create k8b api extensions client: %s", err)) - } - crCli, err := client.NewClient() - if err != nil { - return operator.Config{}, operator.Dependencies{}, maskAny(fmt.Errorf("Failed to created versioned client: %s", err)) - } - eventRecorder := createRecorder(cliLog, kubecli, name, namespace) + eventRecorder := createRecorder(cliLog, client.Kubernetes(), name, namespace) scope, ok := scope.AsScope(operatorOptions.scope) if !ok { - return operator.Config{}, operator.Dependencies{}, maskAny(fmt.Errorf("Scope %s is not known by Operator", operatorOptions.scope)) + return operator.Config{}, operator.Dependencies{}, errors.WithStack(fmt.Errorf("Scope %s is not known by Operator", operatorOptions.scope)) } cfg := operator.Config{ @@ -414,10 +407,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper } deps := operator.Dependencies{ LogService: logService, - KubeCli: kubecli, - KubeExtCli: kubeExtCli, - KubeMonitoringCli: kubeMonCli, - CRCli: crCli, + Client: client, EventRecorder: eventRecorder, LivenessProbe: &livenessProbe, DeploymentProbe: &deploymentProbe, @@ -442,7 +432,7 @@ func getMyPodInfo(kubecli kubernetes.Interface, namespace, name string) (string, Err(err). Str("name", name). Msg("Failed to get operator pod") - return maskAny(err) + return errors.WithStack(err) } sa = pod.Spec.ServiceAccountName if image, err = k8sutil.GetArangoDBImageIDFromPod(pod); err != nil { @@ -455,7 +445,7 @@ func getMyPodInfo(kubecli kubernetes.Interface, namespace, name string) (string, return nil } if err := retry.Retry(op, time.Minute*5); err != nil { - return "", "", maskAny(err) + return "", "", errors.WithStack(err) } return image, sa, nil } diff --git a/cmd/reboot.go b/cmd/reboot.go index 3664bbb70..79d158b11 100644 --- a/cmd/reboot.go +++ b/cmd/reboot.go @@ -34,11 +34,11 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" deplv1 "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" - extclient "github.com/arangodb/kube-arangodb/pkg/client" acli "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" "github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util/constants" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" "github.com/pkg/errors" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" @@ -381,15 +381,14 @@ func cmdRebootRun(cmd *cobra.Command, args []string) { podname := os.Getenv(constants.EnvOperatorPodName) // Create kubernetes client - kubecli, err := k8sutil.NewKubeClient() - if err != nil { - cliLog.Fatal().Err(err).Msg("Failed to create Kubernetes client") + client, ok := kclient.GetDefaultFactory().Client() + if !ok { + cliLog.Fatal().Msg("Failed to get client") } - extcli, err := extclient.NewClient() - if err != nil { - cliLog.Fatal().Err(err).Msg("failed to create arango extension client") - } + kubecli := client.Kubernetes() + + extcli := client.Arango() image, err := getMyImage(kubecli, namespace, podname) if err != nil { diff --git a/go.mod b/go.mod index fdf7ddb3f..268ca89a4 100644 --- a/go.mod +++ b/go.mod @@ -55,6 +55,8 @@ require ( k8s.io/klog v1.0.0 ) +require golang.org/x/time v0.0.0-20191024005414-555d28b269f0 + require ( github.com/arangodb/go-velocypack v0.0.0-20200318135517-5af53c29c67e // indirect github.com/beorn7/perks v1.0.1 // indirect @@ -97,7 +99,6 @@ require ( golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 // indirect golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 // indirect golang.org/x/text v0.3.6 // indirect - golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/protobuf v1.27.1 // indirect diff --git a/pkg/client/client.go b/pkg/client/client.go deleted file mode 100644 index 7df48a93e..000000000 --- a/pkg/client/client.go +++ /dev/null @@ -1,74 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// - -package client - -import ( - "github.com/arangodb/kube-arangodb/pkg/util/errors" - - "k8s.io/client-go/rest" - - "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" - "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" -) - -// MustNewClient creates an client, or panics -// when a failure is detected. -func MustNewClient() versioned.Interface { - cli, err := NewClient() - if err != nil { - panic(err) - } - return cli -} - -// MustNew creates a client with given config, or panics -// when a failure is detected. -func MustNew(cfg *rest.Config) versioned.Interface { - cli, err := New(cfg) - if err != nil { - panic(err) - } - return cli -} - -// NewClient creates an client, or returns an error -// when a failure is detected. -func NewClient() (versioned.Interface, error) { - cfg, err := k8sutil.NewKubeConfig() - if err != nil { - return nil, errors.WithStack(err) - } - cli, err := New(cfg) - if err != nil { - return nil, errors.WithStack(err) - } - return cli, nil -} - -// New creates a client with given config, or returns an error -// when a failure is detected. -func New(cfg *rest.Config) (versioned.Interface, error) { - cli, err := versioned.NewForConfig(cfg) - if err != nil { - return nil, errors.WithStack(err) - } - return cli, nil -} diff --git a/pkg/deployment/access_package.go b/pkg/deployment/access_package.go index 4d23342d0..8fd85c652 100644 --- a/pkg/deployment/access_package.go +++ b/pkg/deployment/access_package.go @@ -49,7 +49,7 @@ const ( func (d *Deployment) createAccessPackages(ctx context.Context) error { log := d.deps.Log spec := d.apiObject.Spec - secrets := d.deps.KubeCli.CoreV1().Secrets(d.GetNamespace()) + secrets := d.deps.Client.Kubernetes().CoreV1().Secrets(d.GetNamespace()) if !spec.Sync.IsEnabled() { // We're only relevant when sync is enabled @@ -106,7 +106,7 @@ func (d *Deployment) createAccessPackages(ctx context.Context) error { func (d *Deployment) ensureAccessPackage(ctx context.Context, apSecretName string) error { log := d.deps.Log ns := d.GetNamespace() - secrets := d.deps.KubeCli.CoreV1().Secrets(ns) + secrets := d.deps.Client.Kubernetes().CoreV1().Secrets(ns) spec := d.apiObject.Spec err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error { diff --git a/pkg/deployment/cluster_scaling_integration.go b/pkg/deployment/cluster_scaling_integration.go index 6b02682bf..440225cfd 100644 --- a/pkg/deployment/cluster_scaling_integration.go +++ b/pkg/deployment/cluster_scaling_integration.go @@ -205,7 +205,7 @@ func (ci *clusterScalingIntegration) inspectCluster(ctx context.Context, expectS apiObject := ci.depl.apiObject ctxChild, cancel = globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() - current, err := ci.depl.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(apiObject.Namespace).Get(ctxChild, apiObject.Name, metav1.GetOptions{}) + current, err := ci.depl.deps.Client.Arango().DatabaseV1().ArangoDeployments(apiObject.Namespace).Get(ctxChild, apiObject.Name, metav1.GetOptions{}) if err != nil { return errors.WithStack(err) } diff --git a/pkg/deployment/context_impl.go b/pkg/deployment/context_impl.go index 8730160ac..ffddfbc75 100644 --- a/pkg/deployment/context_impl.go +++ b/pkg/deployment/context_impl.go @@ -45,7 +45,6 @@ import ( "github.com/arangodb/kube-arangodb/pkg/deployment/reconcile" - "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/secret" @@ -55,8 +54,6 @@ import ( "github.com/arangodb/kube-arangodb/pkg/operator/scope" - monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" - "github.com/arangodb/kube-arangodb/pkg/deployment/features" "github.com/arangodb/go-driver/http" @@ -70,16 +67,15 @@ import ( "github.com/arangodb/arangosync-client/tasks" driver "github.com/arangodb/go-driver" "github.com/arangodb/go-driver/agency" - "github.com/rs/zerolog/log" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" "github.com/arangodb/kube-arangodb/pkg/deployment/reconciler" "github.com/arangodb/kube-arangodb/pkg/deployment/resources" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/rs/zerolog/log" core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) var _ resources.Context = &Deployment{} @@ -89,7 +85,7 @@ func (d *Deployment) GetBackup(ctx context.Context, backup string) (*backupApi.A ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() - return d.deps.DatabaseCRCli.BackupV1().ArangoBackups(d.Namespace()).Get(ctxChild, backup, meta.GetOptions{}) + return d.deps.Client.Arango().BackupV1().ArangoBackups(d.Namespace()).Get(ctxChild, backup, meta.GetOptions{}) } // GetAPIObject returns the deployment as k8s object. @@ -102,18 +98,6 @@ func (d *Deployment) GetServerGroupIterator() reconciler.ServerGroupIterator { return d.apiObject } -func (d *Deployment) getKubeCli() kubernetes.Interface { - return d.deps.KubeCli -} - -func (d *Deployment) getMonitoringV1Cli() monitoringClient.MonitoringV1Interface { - return d.deps.KubeMonitoringCli -} - -func (d *Deployment) getArangoCli() versioned.Interface { - return d.deps.DatabaseCRCli -} - func (d *Deployment) GetScope() scope.Scope { return d.config.Scope } @@ -644,35 +628,35 @@ func (d *Deployment) WithStatusUpdate(ctx context.Context, action reconciler.Dep } func (d *Deployment) SecretsModInterface() secret.ModInterface { - return d.getKubeCli().CoreV1().Secrets(d.GetNamespace()) + return kclient.NewModInterface(d.deps.Client, d.namespace).Secrets() } func (d *Deployment) PodsModInterface() podMod.ModInterface { - return d.getKubeCli().CoreV1().Pods(d.GetNamespace()) + return kclient.NewModInterface(d.deps.Client, d.namespace).Pods() } func (d *Deployment) ServiceAccountsModInterface() serviceaccount.ModInterface { - return d.getKubeCli().CoreV1().ServiceAccounts(d.GetNamespace()) + return kclient.NewModInterface(d.deps.Client, d.namespace).ServiceAccounts() } func (d *Deployment) ServicesModInterface() service.ModInterface { - return d.getKubeCli().CoreV1().Services(d.GetNamespace()) + return kclient.NewModInterface(d.deps.Client, d.namespace).Services() } func (d *Deployment) PersistentVolumeClaimsModInterface() persistentvolumeclaim.ModInterface { - return d.getKubeCli().CoreV1().PersistentVolumeClaims(d.GetNamespace()) + return kclient.NewModInterface(d.deps.Client, d.namespace).PersistentVolumeClaims() } func (d *Deployment) PodDisruptionBudgetsModInterface() poddisruptionbudget.ModInterface { - return d.getKubeCli().PolicyV1beta1().PodDisruptionBudgets(d.GetNamespace()) + return kclient.NewModInterface(d.deps.Client, d.namespace).PodDisruptionBudgets() } func (d *Deployment) ServiceMonitorsModInterface() servicemonitor.ModInterface { - return d.getMonitoringV1Cli().ServiceMonitors(d.GetNamespace()) + return kclient.NewModInterface(d.deps.Client, d.namespace).ServiceMonitors() } func (d *Deployment) ArangoMembersModInterface() arangomember.ModInterface { - return d.getArangoCli().DatabaseV1().ArangoMembers(d.GetNamespace()) + return kclient.NewModInterface(d.deps.Client, d.namespace).ArangoMembers() } func (d *Deployment) GetName() string { @@ -708,13 +692,13 @@ func (d *Deployment) SetCachedStatus(i inspectorInterface.Inspector) { } func (d *Deployment) WithArangoMemberUpdate(ctx context.Context, namespace, name string, action reconciler.ArangoMemberUpdateFunc) error { - o, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoMembers(namespace).Get(ctx, name, meta.GetOptions{}) + o, err := d.deps.Client.Arango().DatabaseV1().ArangoMembers(namespace).Get(ctx, name, meta.GetOptions{}) if err != nil { return err } if action(o) { - if _, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoMembers(namespace).Update(ctx, o, meta.UpdateOptions{}); err != nil { + if _, err := d.deps.Client.Arango().DatabaseV1().ArangoMembers(namespace).Update(ctx, o, meta.UpdateOptions{}); err != nil { return err } } @@ -723,7 +707,7 @@ func (d *Deployment) WithArangoMemberUpdate(ctx context.Context, namespace, name } func (d *Deployment) WithArangoMemberStatusUpdate(ctx context.Context, namespace, name string, action reconciler.ArangoMemberStatusUpdateFunc) error { - o, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoMembers(namespace).Get(ctx, name, meta.GetOptions{}) + o, err := d.deps.Client.Arango().DatabaseV1().ArangoMembers(namespace).Get(ctx, name, meta.GetOptions{}) if err != nil { return err } @@ -732,7 +716,7 @@ func (d *Deployment) WithArangoMemberStatusUpdate(ctx context.Context, namespace if action(o, status) { o.Status = *status - if _, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoMembers(namespace).UpdateStatus(ctx, o, meta.UpdateOptions{}); err != nil { + if _, err := d.deps.Client.Arango().DatabaseV1().ArangoMembers(namespace).UpdateStatus(ctx, o, meta.UpdateOptions{}); err != nil { return err } } @@ -748,7 +732,7 @@ func (d *Deployment) ApplyPatchOnPod(ctx context.Context, pod *core.Pod, p ...pa return err } - c := d.deps.KubeCli.CoreV1().Pods(pod.GetNamespace()) + c := d.deps.Client.Kubernetes().CoreV1().Pods(pod.GetNamespace()) ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() diff --git a/pkg/deployment/deployment.go b/pkg/deployment/deployment.go index e38293d5a..b65883661 100644 --- a/pkg/deployment/deployment.go +++ b/pkg/deployment/deployment.go @@ -41,8 +41,6 @@ import ( "github.com/arangodb/kube-arangodb/pkg/operator/scope" - monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" - "github.com/arangodb/kube-arangodb/pkg/util/arangod/conn" "github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector" @@ -51,9 +49,7 @@ import ( "github.com/arangodb/arangosync-client/client" "github.com/rs/zerolog" - apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" @@ -62,9 +58,9 @@ import ( "github.com/arangodb/kube-arangodb/pkg/deployment/reconcile" "github.com/arangodb/kube-arangodb/pkg/deployment/resilience" "github.com/arangodb/kube-arangodb/pkg/deployment/resources" - "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" "github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" "github.com/arangodb/kube-arangodb/pkg/util/trigger" ) @@ -80,12 +76,10 @@ type Config struct { // Dependencies holds dependent services for a Deployment type Dependencies struct { - Log zerolog.Logger - KubeCli kubernetes.Interface - KubeExtCli apiextensionsclient.Interface - KubeMonitoringCli monitoringClient.MonitoringV1Interface - DatabaseCRCli versioned.Interface - EventRecorder record.EventRecorder + Log zerolog.Logger + EventRecorder record.EventRecorder + + Client kclient.Client } // deploymentEventType strongly typed type of event @@ -333,7 +327,7 @@ func (d *Deployment) run() { for { select { case <-d.stopCh: - cachedStatus, err := inspector.NewInspector(context.Background(), d.getKubeCli(), d.getMonitoringV1Cli(), d.getArangoCli(), d.GetNamespace()) + cachedStatus, err := inspector.NewInspector(context.Background(), d.deps.Client, d.GetNamespace()) if err != nil { log.Error().Err(err).Msg("Unable to get resources") } @@ -386,7 +380,8 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(ctx context.Context) err // Get the most recent version of the deployment from the API server ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() - current, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()).Get(ctxChild, d.apiObject.GetName(), meta.GetOptions{}) + + current, err := d.deps.Client.Arango().DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()).Get(ctxChild, d.apiObject.GetName(), meta.GetOptions{}) if err != nil { log.Debug().Err(err).Msg("Failed to get current version of deployment from API server") if k8sutil.IsNotFound(err) { @@ -470,7 +465,7 @@ func (d *Deployment) updateCRStatus(ctx context.Context, force ...bool) error { } // Send update to API server - depls := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.GetNamespace()) + depls := d.deps.Client.Arango().DatabaseV1().ArangoDeployments(d.GetNamespace()) attempt := 0 for { attempt++ @@ -532,7 +527,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe var newAPIObject *api.ArangoDeployment err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error { var err error - newAPIObject, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Update(ctxChild, update, meta.UpdateOptions{}) + newAPIObject, err = d.deps.Client.Arango().DatabaseV1().ArangoDeployments(ns).Update(ctxChild, update, meta.UpdateOptions{}) return err }) @@ -548,7 +543,7 @@ func (d *Deployment) updateCRSpec(ctx context.Context, newSpec api.DeploymentSpe err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error { var err error - current, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(ns).Get(ctxChild, update.GetName(), meta.GetOptions{}) + current, err = d.deps.Client.Arango().DatabaseV1().ArangoDeployments(ns).Get(ctxChild, update.GetName(), meta.GetOptions{}) return err }) @@ -580,9 +575,9 @@ func (d *Deployment) isOwnerOf(obj meta.Object) bool { func (d *Deployment) lookForServiceMonitorCRD() { var err error if d.GetScope().IsNamespaced() { - _, err = d.deps.KubeMonitoringCli.ServiceMonitors(d.GetNamespace()).List(context.Background(), meta.ListOptions{}) + _, err = d.deps.Client.Monitoring().MonitoringV1().ServiceMonitors(d.GetNamespace()).List(context.Background(), meta.ListOptions{}) } else { - _, err = d.deps.KubeExtCli.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "servicemonitors.monitoring.coreos.com", meta.GetOptions{}) + _, err = d.deps.Client.KubernetesExtensions().ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "servicemonitors.monitoring.coreos.com", meta.GetOptions{}) } log := d.deps.Log log.Debug().Msgf("Looking for ServiceMonitor CRD...") @@ -630,7 +625,7 @@ func (d *Deployment) ApplyPatch(ctx context.Context, p ...patch.Item) error { return err } - c := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()) + c := d.deps.Client.Arango().DatabaseV1().ArangoDeployments(d.apiObject.GetNamespace()) ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() diff --git a/pkg/deployment/deployment_finalizers.go b/pkg/deployment/deployment_finalizers.go index 28b1c8b02..bdff47b7b 100644 --- a/pkg/deployment/deployment_finalizers.go +++ b/pkg/deployment/deployment_finalizers.go @@ -57,7 +57,7 @@ func (d *Deployment) runDeploymentFinalizers(ctx context.Context, cachedStatus i log := d.deps.Log var removalList []string - depls := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.GetNamespace()) + depls := d.deps.Client.Arango().DatabaseV1().ArangoDeployments(d.GetNamespace()) ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() updated, err := depls.Get(ctxChild, d.apiObject.GetName(), metav1.GetOptions{}) @@ -79,7 +79,7 @@ func (d *Deployment) runDeploymentFinalizers(ctx context.Context, cachedStatus i } // Remove finalizers (if needed) if len(removalList) > 0 { - if err := removeDeploymentFinalizers(ctx, log, d.deps.DatabaseCRCli, updated, removalList); err != nil { + if err := removeDeploymentFinalizers(ctx, log, d.deps.Client.Arango(), updated, removalList); err != nil { log.Debug().Err(err).Msg("Failed to update ArangoDeployment (to remove finalizers)") return errors.WithStack(err) } diff --git a/pkg/deployment/deployment_inspector.go b/pkg/deployment/deployment_inspector.go index 437b66c32..cafb9a603 100644 --- a/pkg/deployment/deployment_inspector.go +++ b/pkg/deployment/deployment_inspector.go @@ -74,7 +74,7 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval deploymentName := d.GetName() defer metrics.SetDuration(inspectDeploymentDurationGauges.WithLabelValues(deploymentName), start) - cachedStatus, err := inspector.NewInspector(context.Background(), d.getKubeCli(), d.getMonitoringV1Cli(), d.getArangoCli(), d.GetNamespace()) + cachedStatus, err := inspector.NewInspector(context.Background(), d.deps.Client, d.GetNamespace()) if err != nil { log.Error().Err(err).Msg("Unable to get resources") return minInspectionInterval // Retry ASAP @@ -84,7 +84,7 @@ func (d *Deployment) inspectDeployment(lastInterval util.Interval) util.Interval var updated *api.ArangoDeployment err = globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctxReconciliation, func(ctxChild context.Context) error { var err error - updated, err = d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(d.GetNamespace()).Get(ctxChild, deploymentName, metav1.GetOptions{}) + updated, err = d.deps.Client.Arango().DatabaseV1().ArangoDeployments(d.GetNamespace()).Get(ctxChild, deploymentName, metav1.GetOptions{}) return err }) if k8sutil.IsNotFound(err) { diff --git a/pkg/deployment/deployment_run_test.go b/pkg/deployment/deployment_run_test.go index 81596d1ae..36c269e21 100644 --- a/pkg/deployment/deployment_run_test.go +++ b/pkg/deployment/deployment_run_test.go @@ -108,7 +108,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) { } // Create custom resource in the fake kubernetes API - _, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(testNamespace).Create(context.Background(), d.apiObject, metav1.CreateOptions{}) + _, err := d.deps.Client.Arango().DatabaseV1().ArangoDeployments(testNamespace).Create(context.Background(), d.apiObject, metav1.CreateOptions{}) require.NoError(t, err) if testCase.Resources != nil { @@ -172,7 +172,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) { return err } - cache, err := inspector.NewInspector(context.Background(), d.getKubeCli(), d.getMonitoringV1Cli(), d.getArangoCli(), d.GetNamespace()) + cache, err := inspector.NewInspector(context.Background(), d.deps.Client, d.GetNamespace()) require.NoError(t, err) groupSpec := d.apiObject.Spec.GetServerGroupSpec(group) @@ -225,7 +225,7 @@ func runTestCase(t *testing.T, testCase testCaseStruct) { } require.NoError(t, err) - pods, err := d.deps.KubeCli.CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{}) + pods, err := d.deps.Client.Kubernetes().CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{}) require.NoError(t, err) require.Len(t, pods.Items, 1) if util.BoolOrDefault(testCase.CompareChecksum, true) { diff --git a/pkg/deployment/deployment_suite_test.go b/pkg/deployment/deployment_suite_test.go index d4b824b56..ff261e43a 100644 --- a/pkg/deployment/deployment_suite_test.go +++ b/pkg/deployment/deployment_suite_test.go @@ -50,6 +50,8 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/constants" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/probes" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" + extfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" ) const ( @@ -450,7 +452,9 @@ func createTestDeployment(t *testing.T, config Config, arangoDeployment *api.Ara eventRecorder := recordfake.NewFakeRecorder(10) kubernetesClientSet := fake.NewSimpleClientset() + kubernetesExtClientSet := extfake.NewSimpleClientset() monitoringClientSet := monitoringFakeClient.NewSimpleClientset() + arangoClientSet := arangofake.NewSimpleClientset() arangoDeployment.ObjectMeta = metav1.ObjectMeta{ Name: testDeploymentName, @@ -469,11 +473,9 @@ func createTestDeployment(t *testing.T, config Config, arangoDeployment *api.Ara arangoDeployment.Status.CurrentImage = &arangoDeployment.Status.Images[0] deps := Dependencies{ - Log: zerolog.New(ioutil.Discard), - KubeCli: kubernetesClientSet, - KubeMonitoringCli: monitoringClientSet.MonitoringV1(), - DatabaseCRCli: arangofake.NewSimpleClientset(&api.ArangoDeployment{}), - EventRecorder: eventRecorder, + Log: zerolog.New(ioutil.Discard), + EventRecorder: eventRecorder, + Client: kclient.NewStaticClient(kubernetesClientSet, kubernetesExtClientSet, arangoClientSet, monitoringClientSet), } d := &Deployment{ @@ -487,7 +489,7 @@ func createTestDeployment(t *testing.T, config Config, arangoDeployment *api.Ara } d.clientCache = client.NewClientCache(d, conn.NewFactory(d.getAuth, d.getConnConfig)) - cachedStatus, err := inspector.NewInspector(context.Background(), d.getKubeCli(), d.getMonitoringV1Cli(), d.getArangoCli(), d.GetNamespace()) + cachedStatus, err := inspector.NewInspector(context.Background(), deps.Client, d.GetNamespace()) require.NoError(t, err) assert.NotEmpty(t, cachedStatus.GetVersionInfo(), "API server should not have returned empty version") d.SetCachedStatus(cachedStatus) diff --git a/pkg/deployment/images_test.go b/pkg/deployment/images_test.go index f3c6c66bc..c0fe85b38 100644 --- a/pkg/deployment/images_test.go +++ b/pkg/deployment/images_test.go @@ -323,7 +323,7 @@ func TestEnsureImages(t *testing.T) { } // Create custom resource in the fake kubernetes API - _, err := d.deps.DatabaseCRCli.DatabaseV1().ArangoDeployments(testNamespace).Create(context.Background(), d.apiObject, metav1.CreateOptions{}) + _, err := d.deps.Client.Arango().DatabaseV1().ArangoDeployments(testNamespace).Create(context.Background(), d.apiObject, metav1.CreateOptions{}) require.NoError(t, err) // Act @@ -339,7 +339,7 @@ func TestEnsureImages(t *testing.T) { require.NoError(t, err) if len(testCase.ExpectedPod.Spec.Containers) > 0 { - pods, err := d.deps.KubeCli.CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{}) + pods, err := d.deps.Client.Kubernetes().CoreV1().Pods(testNamespace).List(context.Background(), metav1.ListOptions{}) require.NoError(t, err) require.Len(t, pods.Items, 1) require.Equal(t, testCase.ExpectedPod.Spec, pods.Items[0].Spec) diff --git a/pkg/deployment/informers.go b/pkg/deployment/informers.go index 3ea1b948a..be8101cbd 100644 --- a/pkg/deployment/informers.go +++ b/pkg/deployment/informers.go @@ -45,7 +45,7 @@ func (d *Deployment) listenForPodEvents(stopCh <-chan struct{}) { rw := k8sutil.NewResourceWatcher( d.deps.Log, - d.deps.KubeCli.CoreV1().RESTClient(), + d.deps.Client.Kubernetes().CoreV1().RESTClient(), "pods", d.apiObject.GetNamespace(), &v1.Pod{}, @@ -87,7 +87,7 @@ func (d *Deployment) listenForPVCEvents(stopCh <-chan struct{}) { rw := k8sutil.NewResourceWatcher( d.deps.Log, - d.deps.KubeCli.CoreV1().RESTClient(), + d.deps.Client.Kubernetes().CoreV1().RESTClient(), "persistentvolumeclaims", d.apiObject.GetNamespace(), &v1.PersistentVolumeClaim{}, @@ -129,7 +129,7 @@ func (d *Deployment) listenForSecretEvents(stopCh <-chan struct{}) { rw := k8sutil.NewResourceWatcher( d.deps.Log, - d.deps.KubeCli.CoreV1().RESTClient(), + d.deps.Client.Kubernetes().CoreV1().RESTClient(), "secrets", d.apiObject.GetNamespace(), &v1.Secret{}, @@ -172,7 +172,7 @@ func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) { rw := k8sutil.NewResourceWatcher( d.deps.Log, - d.deps.KubeCli.CoreV1().RESTClient(), + d.deps.Client.Kubernetes().CoreV1().RESTClient(), "services", d.apiObject.GetNamespace(), &v1.Service{}, @@ -201,7 +201,7 @@ func (d *Deployment) listenForServiceEvents(stopCh <-chan struct{}) { func (d *Deployment) listenForCRDEvents(stopCh <-chan struct{}) { rw := k8sutil.NewResourceWatcher( d.deps.Log, - d.deps.KubeExtCli.ApiextensionsV1().RESTClient(), + d.deps.Client.KubernetesExtensions().ApiextensionsV1().RESTClient(), "customresourcedefinitions", "", &crdv1.CustomResourceDefinition{}, diff --git a/pkg/deployment/resources/inspector/inspector.go b/pkg/deployment/resources/inspector/inspector.go index 4c9db7b90..9f3c516fe 100644 --- a/pkg/deployment/resources/inspector/inspector.go +++ b/pkg/deployment/resources/inspector/inspector.go @@ -24,19 +24,16 @@ import ( "context" "sync" - monitoring "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" - core "k8s.io/api/core/v1" - policy "k8s.io/api/policy/v1beta1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" - "github.com/arangodb/go-driver" api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1" - "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" "github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util/errors" inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" + monitoring "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" + core "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) // SecretReadInterface has methods to work with Secret resources with ReadOnly mode. @@ -44,12 +41,10 @@ type SecretReadInterface interface { Get(ctx context.Context, name string, opts meta.GetOptions) (*core.Secret, error) } -func NewInspector(ctx context.Context, k kubernetes.Interface, m monitoringClient.MonitoringV1Interface, c versioned.Interface, namespace string) (inspectorInterface.Inspector, error) { +func NewInspector(ctx context.Context, client kclient.Client, namespace string) (inspectorInterface.Inspector, error) { i := &inspector{ namespace: namespace, - k: k, - m: m, - c: c, + client: client, } if err := i.Refresh(ctx); err != nil { @@ -59,26 +54,24 @@ func NewInspector(ctx context.Context, k kubernetes.Interface, m monitoringClien return i, nil } -func newInspector(ctx context.Context, k kubernetes.Interface, m monitoringClient.MonitoringV1Interface, c versioned.Interface, namespace string) (*inspector, error) { +func newInspector(ctx context.Context, client kclient.Client, namespace string) (*inspector, error) { var i inspector i.namespace = namespace - i.k = k - i.m = m - i.c = c + i.client = client if err := util.RunParallel(15, - getVersionInfo(ctx, &i, k, namespace), - podsToMap(ctx, &i, k, namespace), - secretsToMap(ctx, &i, k, namespace), - pvcsToMap(ctx, &i, k, namespace), - servicesToMap(ctx, &i, k, namespace), - serviceAccountsToMap(ctx, &i, k, namespace), - podDisruptionBudgetsToMap(ctx, &i, k, namespace), - serviceMonitorsToMap(ctx, &i, m, namespace), - arangoMembersToMap(ctx, &i, c, namespace), - nodesToMap(ctx, &i, k), - arangoClusterSynchronizationsToMap(ctx, &i, c, namespace), + getVersionInfo(ctx, &i, client.Kubernetes(), namespace), + podsToMap(ctx, &i, client.Kubernetes(), namespace), + secretsToMap(ctx, &i, client.Kubernetes(), namespace), + pvcsToMap(ctx, &i, client.Kubernetes(), namespace), + servicesToMap(ctx, &i, client.Kubernetes(), namespace), + serviceAccountsToMap(ctx, &i, client.Kubernetes(), namespace), + podDisruptionBudgetsToMap(ctx, &i, client.Kubernetes(), namespace), + serviceMonitorsToMap(ctx, &i, client.Monitoring(), namespace), + arangoMembersToMap(ctx, &i, client.Arango(), namespace), + nodesToMap(ctx, &i, client.Kubernetes()), + arangoClusterSynchronizationsToMap(ctx, &i, client.Arango(), namespace), ); err != nil { return nil, err } @@ -145,9 +138,7 @@ type inspector struct { namespace string - k kubernetes.Interface - m monitoringClient.MonitoringV1Interface - c versioned.Interface + client kclient.Client pods map[string]*core.Pod secrets map[string]*core.Secret @@ -174,7 +165,7 @@ func (i *inspector) Refresh(ctx context.Context) error { return errors.New("Inspector created from static data") } - new, err := newInspector(ctx, i.k, i.m, i.c, i.namespace) + new, err := newInspector(ctx, i.client, i.namespace) if err != nil { return err } diff --git a/pkg/deployment/resources/inspector/sms.go b/pkg/deployment/resources/inspector/sms.go index 66f8e05c1..f61a2db41 100644 --- a/pkg/deployment/resources/inspector/sms.go +++ b/pkg/deployment/resources/inspector/sms.go @@ -32,7 +32,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/servicemonitor" monitoringGroup "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring" monitoring "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" + monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned" meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -98,7 +98,7 @@ func (s serviceMonitorReadInterface) Get(ctx context.Context, name string, opts } } -func serviceMonitorsToMap(ctx context.Context, inspector *inspector, m monitoringClient.MonitoringV1Interface, namespace string) func() error { +func serviceMonitorsToMap(ctx context.Context, inspector *inspector, m monitoringClient.Interface, namespace string) func() error { return func() error { serviceMonitors := getServiceMonitors(ctx, m, namespace, "") @@ -119,10 +119,10 @@ func serviceMonitorsToMap(ctx context.Context, inspector *inspector, m monitorin } } -func getServiceMonitors(ctx context.Context, m monitoringClient.MonitoringV1Interface, namespace, cont string) []*monitoring.ServiceMonitor { +func getServiceMonitors(ctx context.Context, m monitoringClient.Interface, namespace, cont string) []*monitoring.ServiceMonitor { ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() - serviceMonitors, err := m.ServiceMonitors(namespace).List(ctxChild, meta.ListOptions{ + serviceMonitors, err := m.MonitoringV1().ServiceMonitors(namespace).List(ctxChild, meta.ListOptions{ Limit: globals.GetGlobals().Kubernetes().RequestBatchSize().Get(), Continue: cont, }) diff --git a/pkg/deployment/resources/resources.go b/pkg/deployment/resources/resources.go index 2b1d6471b..9827999f0 100644 --- a/pkg/deployment/resources/resources.go +++ b/pkg/deployment/resources/resources.go @@ -26,7 +26,6 @@ import ( driver "github.com/arangodb/go-driver" "github.com/arangodb/kube-arangodb/pkg/util/trigger" - clientv1 "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" "github.com/rs/zerolog" ) @@ -46,7 +45,6 @@ type Resources struct { mutex sync.Mutex triggerSyncInspection trigger.Trigger } - monitoringClient *clientv1.MonitoringV1Client } // NewResources creates a new Resources service, used to diff --git a/pkg/deployment/resources/servicemonitor.go b/pkg/deployment/resources/servicemonitor.go index 6455d3ab2..ac91c809f 100644 --- a/pkg/deployment/resources/servicemonitor.go +++ b/pkg/deployment/resources/servicemonitor.go @@ -34,10 +34,9 @@ import ( apiErrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" coreosv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1" - clientv1 "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/rest" ) func LabelsForExporterServiceMonitor(name string, obj deploymentApi.DeploymentSpec) map[string]string { @@ -60,27 +59,6 @@ func LabelsForExporterServiceMonitorSelector(name string) map[string]string { } } -// EnsureMonitoringClient returns a client for looking at ServiceMonitors -// and keeps it in the Resources. -func (r *Resources) EnsureMonitoringClient() (*clientv1.MonitoringV1Client, error) { - if r.monitoringClient != nil { - return r.monitoringClient, nil - } - - // Make a client: - var restConfig *rest.Config - restConfig, err := k8sutil.NewKubeConfig() - if err != nil { - return nil, errors.WithStack(err) - } - mClient, err := clientv1.NewForConfig(restConfig) - if err != nil { - return nil, errors.WithStack(err) - } - r.monitoringClient = mClient - return mClient, nil -} - func (r *Resources) makeEndpoint(isSecure bool) coreosv1.Endpoint { if isSecure { return coreosv1.Endpoint{ @@ -158,14 +136,16 @@ func (r *Resources) EnsureServiceMonitor(ctx context.Context) error { wantMetrics := spec.Metrics.IsEnabled() serviceMonitorName := k8sutil.CreateExporterClientServiceName(deploymentName) - mClient, err := r.EnsureMonitoringClient() - if err != nil { - log.Error().Err(err).Msgf("Cannot get a monitoring client.") - return errors.WithStack(err) + client, ok := kclient.GetDefaultFactory().Client() + if !ok { + log.Error().Msgf("Cannot get a monitoring client.") + return errors.Newf("Client not initialised") } + mClient := client.Monitoring() + // Check if ServiceMonitor already exists - serviceMonitors := mClient.ServiceMonitors(ns) + serviceMonitors := mClient.MonitoringV1().ServiceMonitors(ns) ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx) defer cancel() servMon, err := serviceMonitors.Get(ctxChild, serviceMonitorName, metav1.GetOptions{}) diff --git a/pkg/deployment/server_api.go b/pkg/deployment/server_api.go index 11eb701d4..44dd02e76 100644 --- a/pkg/deployment/server_api.go +++ b/pkg/deployment/server_api.go @@ -191,7 +191,7 @@ func (d *Deployment) StorageClasses() []string { func (d *Deployment) DatabaseURL() string { eaSvcName := k8sutil.CreateDatabaseExternalAccessServiceName(d.Name()) ns := d.apiObject.Namespace - svc, err := d.deps.KubeCli.CoreV1().Services(ns).Get(context.Background(), eaSvcName, metav1.GetOptions{}) + svc, err := d.deps.Client.Kubernetes().CoreV1().Services(ns).Get(context.Background(), eaSvcName, metav1.GetOptions{}) if err != nil { return "" } @@ -200,7 +200,7 @@ func (d *Deployment) DatabaseURL() string { scheme = "http" } nodeFetcher := func() (v1.NodeList, error) { - result, err := d.deps.KubeCli.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + result, err := d.deps.Client.Kubernetes().CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) if err != nil { return v1.NodeList{}, errors.WithStack(err) } diff --git a/pkg/deployment/server_member_api.go b/pkg/deployment/server_member_api.go index cbb4036fd..ece28b44f 100644 --- a/pkg/deployment/server_member_api.go +++ b/pkg/deployment/server_member_api.go @@ -61,7 +61,7 @@ func (m member) PVCName() string { func (m member) PVName() string { if status, found := m.status(); found && status.PersistentVolumeClaimName != "" { - pvcs := m.d.deps.KubeCli.CoreV1().PersistentVolumeClaims(m.d.Namespace()) + pvcs := m.d.deps.Client.Kubernetes().CoreV1().PersistentVolumeClaims(m.d.Namespace()) if pvc, err := pvcs.Get(context.Background(), status.PersistentVolumeClaimName, metav1.GetOptions{}); err == nil { return pvc.Spec.VolumeName } diff --git a/pkg/operator/crd.go b/pkg/operator/crd.go index 51a6dbc48..c26e363a4 100644 --- a/pkg/operator/crd.go +++ b/pkg/operator/crd.go @@ -38,7 +38,7 @@ func (o *Operator) waitForCRD(crdName string, checkFn func() error) { err = crd.WaitReady(checkFn) } } else { - err = crd.WaitCRDReady(o.KubeExtCli, crdName) + err = crd.WaitCRDReady(o.Client.KubernetesExtensions(), crdName) } if err == nil { diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 8cad420f2..cf491bb6b 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -26,10 +26,8 @@ import ( "math/rand" "time" - monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" "github.com/prometheus/client_golang/prometheus" "github.com/rs/zerolog" - apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" meta "k8s.io/apimachinery/pkg/apis/meta/v1" kwatch "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -44,7 +42,6 @@ import ( replapi "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1" lsapi "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha" "github.com/arangodb/kube-arangodb/pkg/deployment" - "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" arangoClientSet "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" arangoInformer "github.com/arangodb/kube-arangodb/pkg/generated/informers/externalversions" "github.com/arangodb/kube-arangodb/pkg/handlers/backup" @@ -58,6 +55,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/replication" "github.com/arangodb/kube-arangodb/pkg/storage" "github.com/arangodb/kube-arangodb/pkg/util/constants" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" "github.com/arangodb/kube-arangodb/pkg/util/probe" ) @@ -111,10 +109,7 @@ type Config struct { type Dependencies struct { LogService logging.Service - KubeCli kubernetes.Interface - KubeExtCli apiextensionsclient.Interface - KubeMonitoringCli monitoringClient.MonitoringV1Interface - CRCli versioned.Interface + Client kclient.Client EventRecorder record.EventRecorder LivenessProbe *probe.LivenessProbe DeploymentProbe *probe.ReadyProbe @@ -189,7 +184,7 @@ func (o *Operator) Run() { // onStartDeployment starts the deployment operator and run till given channel is closed. func (o *Operator) onStartDeployment(stop <-chan struct{}) { checkFn := func() error { - _, err := o.CRCli.DatabaseV1().ArangoDeployments(o.Namespace).List(context.Background(), meta.ListOptions{}) + _, err := o.Client.Arango().DatabaseV1().ArangoDeployments(o.Namespace).List(context.Background(), meta.ListOptions{}) return err } o.waitForCRD(depldef.ArangoDeploymentCRDName, checkFn) @@ -199,7 +194,7 @@ func (o *Operator) onStartDeployment(stop <-chan struct{}) { // onStartDeploymentReplication starts the deployment replication operator and run till given channel is closed. func (o *Operator) onStartDeploymentReplication(stop <-chan struct{}) { checkFn := func() error { - _, err := o.CRCli.DatabaseV1().ArangoDeployments(o.Namespace).List(context.Background(), meta.ListOptions{}) + _, err := o.Client.Arango().DatabaseV1().ArangoDeployments(o.Namespace).List(context.Background(), meta.ListOptions{}) return err } o.waitForCRD(repldef.ArangoDeploymentReplicationCRDName, checkFn) @@ -260,7 +255,7 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st switch operatorType { case appsOperator: checkFn := func() error { - _, err := o.CRCli.AppsV1().ArangoJobs(o.Namespace).List(context.Background(), meta.ListOptions{}) + _, err := o.Client.Arango().AppsV1().ArangoJobs(o.Namespace).List(context.Background(), meta.ListOptions{}) return err } o.waitForCRD(apps.ArangoJobCRDName, checkFn) @@ -270,7 +265,7 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st } case backupOperator: checkFn := func() error { - _, err := o.CRCli.BackupV1().ArangoBackups(o.Namespace).List(context.Background(), meta.ListOptions{}) + _, err := o.Client.Arango().BackupV1().ArangoBackups(o.Namespace).List(context.Background(), meta.ListOptions{}) return err } o.waitForCRD(backupdef.ArangoBackupCRDName, checkFn) @@ -280,7 +275,7 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st } checkFn = func() error { - _, err := o.CRCli.BackupV1().ArangoBackupPolicies(o.Namespace).List(context.Background(), meta.ListOptions{}) + _, err := o.Client.Arango().BackupV1().ArangoBackupPolicies(o.Namespace).List(context.Background(), meta.ListOptions{}) return err } o.waitForCRD(backupdef.ArangoBackupPolicyCRDName, checkFn) @@ -290,7 +285,7 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st } case k2KClusterSyncOperator: checkFn := func() error { - _, err := o.CRCli.DatabaseV1().ArangoClusterSynchronizations(o.Namespace).List(context.Background(), meta.ListOptions{}) + _, err := o.Client.Arango().DatabaseV1().ArangoClusterSynchronizations(o.Namespace).List(context.Background(), meta.ListOptions{}) return err } o.waitForCRD(depldef.ArangoClusterSynchronizationCRDName, checkFn) diff --git a/pkg/operator/operator_deployment.go b/pkg/operator/operator_deployment.go index edd2089bc..da61af658 100644 --- a/pkg/operator/operator_deployment.go +++ b/pkg/operator/operator_deployment.go @@ -47,7 +47,7 @@ var ( func (o *Operator) runDeployments(stop <-chan struct{}) { rw := k8sutil.NewResourceWatcher( o.log, - o.Dependencies.CRCli.DatabaseV1().RESTClient(), + o.Client.Arango().DatabaseV1().RESTClient(), deploymentType.ArangoDeploymentResourcePlural, o.Config.Namespace, &api.ArangoDeployment{}, @@ -212,11 +212,8 @@ func (o *Operator) makeDeploymentConfigAndDeps(apiObject *api.ArangoDeployment) Log: o.Dependencies.LogService.MustGetLogger(logging.LoggerNameDeployment).With(). Str("deployment", apiObject.GetName()). Logger(), - KubeCli: o.Dependencies.KubeCli, - KubeMonitoringCli: o.Dependencies.KubeMonitoringCli, - KubeExtCli: o.Dependencies.KubeExtCli, - DatabaseCRCli: o.Dependencies.CRCli, - EventRecorder: o.Dependencies.EventRecorder, + Client: o.Client, + EventRecorder: o.EventRecorder, } return cfg, deps } diff --git a/pkg/operator/operator_deployment_relication.go b/pkg/operator/operator_deployment_relication.go index 32abb4b50..7266a2ee1 100644 --- a/pkg/operator/operator_deployment_relication.go +++ b/pkg/operator/operator_deployment_relication.go @@ -47,7 +47,7 @@ var ( func (o *Operator) runDeploymentReplications(stop <-chan struct{}) { rw := k8sutil.NewResourceWatcher( o.log, - o.Dependencies.CRCli.ReplicationV1().RESTClient(), + o.Dependencies.Client.Arango().ReplicationV1().RESTClient(), replication2.ArangoDeploymentReplicationResourcePlural, o.Config.Namespace, &api.ArangoDeploymentReplication{}, @@ -207,8 +207,7 @@ func (o *Operator) makeDeploymentReplicationConfigAndDeps(apiObject *api.ArangoD Log: o.Dependencies.LogService.MustGetLogger(logging.LoggerNameDeploymentReplication).With(). Str("deployment-replication", apiObject.GetName()). Logger(), - KubeCli: o.Dependencies.KubeCli, - CRCli: o.Dependencies.CRCli, + Client: o.Client, EventRecorder: o.Dependencies.EventRecorder, } return cfg, deps diff --git a/pkg/operator/operator_leader.go b/pkg/operator/operator_leader.go index 029bca8f6..36827acdd 100644 --- a/pkg/operator/operator_leader.go +++ b/pkg/operator/operator_leader.go @@ -49,7 +49,7 @@ import ( // is detected. func (o *Operator) runLeaderElection(lockName, label string, onStart func(stop <-chan struct{}), readyProbe *probe.ReadyProbe) { namespace := o.Config.Namespace - kubecli := o.Dependencies.KubeCli + kubecli := o.Dependencies.Client.Kubernetes() log := o.log.With().Str("lock-name", lockName).Logger() eventTarget := o.getLeaderElectionEventTarget(log) recordEvent := func(reason, message string) { @@ -122,7 +122,7 @@ func (o *Operator) runWithoutLeaderElection(lockName, label string, onStart func // events will be added to. func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Object { ns := o.Config.Namespace - kubecli := o.Dependencies.KubeCli + kubecli := o.Dependencies.Client.Kubernetes() pods := kubecli.CoreV1().Pods(ns) log = log.With().Str("pod-name", o.Config.PodName).Logger() pod, err := pods.Get(context.Background(), o.Config.PodName, metav1.GetOptions{}) @@ -155,7 +155,7 @@ func (o *Operator) getLeaderElectionEventTarget(log zerolog.Logger) runtime.Obje // setRoleLabel sets a label with key `role` and given value in the pod metadata. func (o *Operator) setRoleLabel(log zerolog.Logger, label, role string) error { ns := o.Config.Namespace - kubecli := o.Dependencies.KubeCli + kubecli := o.Dependencies.Client.Kubernetes() pods := kubecli.CoreV1().Pods(ns) log = log.With().Str("pod-name", o.Config.PodName).Logger() op := func() error { diff --git a/pkg/operator/operator_local_storage.go b/pkg/operator/operator_local_storage.go index 424d50c7d..aee6c8479 100644 --- a/pkg/operator/operator_local_storage.go +++ b/pkg/operator/operator_local_storage.go @@ -45,7 +45,7 @@ var ( func (o *Operator) runLocalStorages(stop <-chan struct{}) { rw := k8sutil.NewResourceWatcher( o.log, - o.Dependencies.CRCli.StorageV1alpha().RESTClient(), + o.Dependencies.Client.Arango().StorageV1alpha().RESTClient(), api.ArangoLocalStorageResourcePlural, "", //o.Config.Namespace, &api.ArangoLocalStorage{}, @@ -207,8 +207,7 @@ func (o *Operator) makeLocalStorageConfigAndDeps(apiObject *api.ArangoLocalStora Log: o.Dependencies.LogService.MustGetLogger(logging.LoggerNameStorage).With(). Str("localStorage", apiObject.GetName()). Logger(), - KubeCli: o.Dependencies.KubeCli, - StorageCRCli: o.Dependencies.CRCli, + Client: o.Client, EventRecorder: o.Dependencies.EventRecorder, } return cfg, deps diff --git a/pkg/operator/server_discovery_api.go b/pkg/operator/server_discovery_api.go index 3bc3c5903..c11a04c6f 100644 --- a/pkg/operator/server_discovery_api.go +++ b/pkg/operator/server_discovery_api.go @@ -52,7 +52,7 @@ func (o *Operator) FindOtherOperators() []server.OperatorReference { log := o.log var result []server.OperatorReference - namespaces, err := o.Dependencies.KubeCli.CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) + namespaces, err := o.Dependencies.Client.Kubernetes().CoreV1().Namespaces().List(context.Background(), metav1.ListOptions{}) if err != nil { log.Warn().Err(err).Msg("Failed to list namespaces") } else { @@ -94,7 +94,7 @@ func (o *Operator) FindOtherOperators() []server.OperatorReference { func (o *Operator) findOtherOperatorsInNamespace(log zerolog.Logger, namespace string, typePred func(server.OperatorType) bool) []server.OperatorReference { log = log.With().Str("namespace", namespace).Logger() var result []server.OperatorReference - services, err := o.Dependencies.KubeCli.CoreV1().Services(namespace).List(context.Background(), metav1.ListOptions{}) + services, err := o.Dependencies.Client.Kubernetes().CoreV1().Services(namespace).List(context.Background(), metav1.ListOptions{}) if err != nil { log.Debug().Err(err).Msg("Failed to list services") return nil @@ -103,7 +103,7 @@ func (o *Operator) findOtherOperatorsInNamespace(log zerolog.Logger, namespace s if o.Scope.IsNamespaced() { return v1.NodeList{}, nil } - result, err := o.Dependencies.KubeCli.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) + result, err := o.Dependencies.Client.Kubernetes().CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) if err != nil { return v1.NodeList{}, errors.WithStack(err) } diff --git a/pkg/replication/deployment_replication.go b/pkg/replication/deployment_replication.go index 05ef85336..f255a7486 100644 --- a/pkg/replication/deployment_replication.go +++ b/pkg/replication/deployment_replication.go @@ -30,13 +30,12 @@ import ( "github.com/rs/zerolog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" "github.com/arangodb/arangosync-client/client" api "github.com/arangodb/kube-arangodb/pkg/apis/replication/v1" - "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" "github.com/arangodb/kube-arangodb/pkg/util/retry" "github.com/arangodb/kube-arangodb/pkg/util/trigger" ) @@ -49,8 +48,7 @@ type Config struct { // Dependencies holds dependent services for a DeploymentReplication type Dependencies struct { Log zerolog.Logger - KubeCli kubernetes.Interface - CRCli versioned.Interface + Client kclient.Client EventRecorder record.EventRecorder } @@ -190,7 +188,7 @@ func (dr *DeploymentReplication) run() { // handleArangoDeploymentReplicationUpdatedEvent is called when the deployment replication is updated by the user. func (dr *DeploymentReplication) handleArangoDeploymentReplicationUpdatedEvent(event *deploymentReplicationEvent) error { log := dr.deps.Log.With().Str("deployoment-replication", event.DeploymentReplication.GetName()).Logger() - repls := dr.deps.CRCli.ReplicationV1().ArangoDeploymentReplications(dr.apiObject.GetNamespace()) + repls := dr.deps.Client.Arango().ReplicationV1().ArangoDeploymentReplications(dr.apiObject.GetNamespace()) // Get the most recent version of the deployment replication from the API server current, err := repls.Get(context.Background(), dr.apiObject.GetName(), metav1.GetOptions{}) @@ -251,7 +249,7 @@ func (dr *DeploymentReplication) updateCRStatus() error { // Send update to API server log := dr.deps.Log - repls := dr.deps.CRCli.ReplicationV1().ArangoDeploymentReplications(dr.apiObject.GetNamespace()) + repls := dr.deps.Client.Arango().ReplicationV1().ArangoDeploymentReplications(dr.apiObject.GetNamespace()) update := dr.apiObject.DeepCopy() attempt := 0 for { @@ -285,7 +283,7 @@ func (dr *DeploymentReplication) updateCRStatus() error { // On success, d.apiObject is updated. func (dr *DeploymentReplication) updateCRSpec(newSpec api.DeploymentReplicationSpec) error { log := dr.deps.Log - repls := dr.deps.CRCli.ReplicationV1().ArangoDeploymentReplications(dr.apiObject.GetNamespace()) + repls := dr.deps.Client.Arango().ReplicationV1().ArangoDeploymentReplications(dr.apiObject.GetNamespace()) // Send update to API server update := dr.apiObject.DeepCopy() @@ -330,7 +328,7 @@ func (dr *DeploymentReplication) failOnError(err error, msg string) { func (dr *DeploymentReplication) reportFailedStatus() { log := dr.deps.Log log.Info().Msg("local storage failed. Reporting failed reason...") - repls := dr.deps.CRCli.ReplicationV1().ArangoDeploymentReplications(dr.apiObject.GetNamespace()) + repls := dr.deps.Client.Arango().ReplicationV1().ArangoDeploymentReplications(dr.apiObject.GetNamespace()) op := func() error { dr.status.Phase = api.DeploymentReplicationPhaseFailed diff --git a/pkg/replication/finalizers.go b/pkg/replication/finalizers.go index 5c8733973..d4e586948 100644 --- a/pkg/replication/finalizers.go +++ b/pkg/replication/finalizers.go @@ -78,7 +78,7 @@ func (dr *DeploymentReplication) runFinalizers(ctx context.Context, p *api.Arang // Remove finalizers (if needed) if len(removalList) > 0 { ignoreNotFound := false - if err := removeDeploymentReplicationFinalizers(log, dr.deps.CRCli, p, removalList, ignoreNotFound); err != nil { + if err := removeDeploymentReplicationFinalizers(log, dr.deps.Client.Arango(), p, removalList, ignoreNotFound); err != nil { log.Debug().Err(err).Msg("Failed to update deployment replication (to remove finalizers)") return errors.WithStack(err) } @@ -97,7 +97,7 @@ func (dr *DeploymentReplication) inspectFinalizerDeplReplStopSync(ctx context.Co // Inspect deployment deletion state in source abort := dr.status.CancelFailures > maxCancelFailures - depls := dr.deps.CRCli.DatabaseV1().ArangoDeployments(p.GetNamespace()) + depls := dr.deps.Client.Arango().DatabaseV1().ArangoDeployments(p.GetNamespace()) if name := p.Spec.Source.GetDeploymentName(); name != "" { depl, err := depls.Get(context.Background(), name, metav1.GetOptions{}) if k8sutil.IsNotFound(err) { diff --git a/pkg/replication/server_endpoint_api.go b/pkg/replication/server_endpoint_api.go index d4e6262c9..bf5e7e0f4 100644 --- a/pkg/replication/server_endpoint_api.go +++ b/pkg/replication/server_endpoint_api.go @@ -57,7 +57,7 @@ func (ep serverEndpoint) AuthUserSecretName() string { // TLSCACert returns a PEM encoded TLS CA certificate of the syncmaster at this endpoint func (ep serverEndpoint) TLSCACert() string { tlsCASecretName := ep.getSpec().TLS.GetCASecretName() - secrets := ep.dr.deps.KubeCli.CoreV1().Secrets(ep.dr.apiObject.GetNamespace()) + secrets := ep.dr.deps.Client.Kubernetes().CoreV1().Secrets(ep.dr.apiObject.GetNamespace()) caCert, err := k8sutil.GetCACertficateSecret(context.TODO(), secrets, tlsCASecretName) if err != nil { return "" diff --git a/pkg/replication/sync_client.go b/pkg/replication/sync_client.go index 4d07187cb..70be7cee0 100644 --- a/pkg/replication/sync_client.go +++ b/pkg/replication/sync_client.go @@ -47,7 +47,7 @@ func (dr *DeploymentReplication) createSyncMasterClient(epSpec api.EndpointSpec) } // Authentication - secrets := dr.deps.KubeCli.CoreV1().Secrets(dr.apiObject.GetNamespace()) + secrets := dr.deps.Client.Kubernetes().CoreV1().Secrets(dr.apiObject.GetNamespace()) insecureSkipVerify := true tlsAuth := tasks.TLSAuthentication{} clientAuthKeyfileSecretName, userSecretName, authJWTSecretName, tlsCASecretName, err := dr.getEndpointSecretNames(epSpec) @@ -106,7 +106,7 @@ func (dr *DeploymentReplication) createSyncMasterClient(epSpec api.EndpointSpec) func (dr *DeploymentReplication) createArangoSyncEndpoint(epSpec api.EndpointSpec) (client.Endpoint, error) { if epSpec.HasDeploymentName() { deploymentName := epSpec.GetDeploymentName() - depls := dr.deps.CRCli.DatabaseV1().ArangoDeployments(dr.apiObject.GetNamespace()) + depls := dr.deps.Client.Arango().DatabaseV1().ArangoDeployments(dr.apiObject.GetNamespace()) depl, err := depls.Get(context.Background(), deploymentName, metav1.GetOptions{}) if err != nil { dr.deps.Log.Debug().Err(err).Str("deployment", deploymentName).Msg("Failed to get deployment") @@ -128,7 +128,7 @@ func (dr *DeploymentReplication) createArangoSyncTLSAuthentication(spec api.Depl } // Fetch keyfile - secrets := dr.deps.KubeCli.CoreV1().Secrets(dr.apiObject.GetNamespace()) + secrets := dr.deps.Client.Kubernetes().CoreV1().Secrets(dr.apiObject.GetNamespace()) keyFileContent, err := k8sutil.GetTLSKeyfileSecret(secrets, clientAuthKeyfileSecretName) if err != nil { return client.TLSAuthentication{}, errors.WithStack(err) @@ -165,7 +165,7 @@ func (dr *DeploymentReplication) getEndpointSecretNames(epSpec api.EndpointSpec) userSecretName = epSpec.Authentication.GetUserSecretName() if epSpec.HasDeploymentName() { deploymentName := epSpec.GetDeploymentName() - depls := dr.deps.CRCli.DatabaseV1().ArangoDeployments(dr.apiObject.GetNamespace()) + depls := dr.deps.Client.Arango().DatabaseV1().ArangoDeployments(dr.apiObject.GetNamespace()) depl, err := depls.Get(context.Background(), deploymentName, metav1.GetOptions{}) if err != nil { dr.deps.Log.Debug().Err(err).Str("deployment", deploymentName).Msg("Failed to get deployment") diff --git a/pkg/storage/clients.go b/pkg/storage/clients.go index 59fe223f6..39d0e8512 100644 --- a/pkg/storage/clients.go +++ b/pkg/storage/clients.go @@ -37,7 +37,7 @@ func (ls *LocalStorage) createProvisionerClients() ([]provisioner.API, error) { // Find provisioner endpoints ns := ls.apiObject.GetNamespace() listOptions := k8sutil.LocalStorageListOpt(ls.apiObject.GetName(), roleProvisioner) - items, err := ls.deps.KubeCli.CoreV1().Endpoints(ns).List(context.Background(), listOptions) + items, err := ls.deps.Client.Kubernetes().CoreV1().Endpoints(ns).List(context.Background(), listOptions) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/storage/daemon_set.go b/pkg/storage/daemon_set.go index adb5150e9..fa7765c21 100644 --- a/pkg/storage/daemon_set.go +++ b/pkg/storage/daemon_set.go @@ -127,7 +127,7 @@ func (ls *LocalStorage) ensureDaemonSet(apiObject *api.ArangoLocalStorage) error // Attach DS to ArangoLocalStorage ds.SetOwnerReferences(append(ds.GetOwnerReferences(), apiObject.AsOwner())) // Create DS - if _, err := ls.deps.KubeCli.AppsV1().DaemonSets(ns).Create(context.Background(), ds, meta.CreateOptions{}); err != nil { + if _, err := ls.deps.Client.Kubernetes().AppsV1().DaemonSets(ns).Create(context.Background(), ds, meta.CreateOptions{}); err != nil { if k8sutil.IsAlreadyExists(err) { // Already exists, update it } else { @@ -145,14 +145,14 @@ func (ls *LocalStorage) ensureDaemonSet(apiObject *api.ArangoLocalStorage) error attempt++ // Load current DS - current, err := ls.deps.KubeCli.AppsV1().DaemonSets(ns).Get(context.Background(), ds.GetName(), meta.GetOptions{}) + current, err := ls.deps.Client.Kubernetes().AppsV1().DaemonSets(ns).Get(context.Background(), ds.GetName(), meta.GetOptions{}) if err != nil { return errors.WithStack(err) } // Update it current.Spec = dsSpec - if _, err := ls.deps.KubeCli.AppsV1().DaemonSets(ns).Update(context.Background(), current, meta.UpdateOptions{}); k8sutil.IsConflict(err) && attempt < 10 { + if _, err := ls.deps.Client.Kubernetes().AppsV1().DaemonSets(ns).Update(context.Background(), current, meta.UpdateOptions{}); k8sutil.IsConflict(err) && attempt < 10 { // Failed to update, try again continue } else if err != nil { diff --git a/pkg/storage/image.go b/pkg/storage/image.go index db5da7650..abd5463ad 100644 --- a/pkg/storage/image.go +++ b/pkg/storage/image.go @@ -33,7 +33,7 @@ func (l *LocalStorage) getMyImage() (string, v1.PullPolicy, error) { log := l.deps.Log ns := l.config.Namespace - p, err := l.deps.KubeCli.CoreV1().Pods(ns).Get(context.Background(), l.config.PodName, metav1.GetOptions{}) + p, err := l.deps.Client.Kubernetes().CoreV1().Pods(ns).Get(context.Background(), l.config.PodName, metav1.GetOptions{}) if err != nil { log.Debug().Err(err).Str("pod-name", l.config.PodName).Msg("Failed to get my own pod") return "", "", errors.WithStack(err) diff --git a/pkg/storage/local_storage.go b/pkg/storage/local_storage.go index 2d1ba3c27..e82621a76 100644 --- a/pkg/storage/local_storage.go +++ b/pkg/storage/local_storage.go @@ -32,12 +32,11 @@ import ( "github.com/rs/zerolog/log" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/record" api "github.com/arangodb/kube-arangodb/pkg/apis/storage/v1alpha" - "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" "github.com/arangodb/kube-arangodb/pkg/util/retry" "github.com/arangodb/kube-arangodb/pkg/util/trigger" ) @@ -52,8 +51,7 @@ type Config struct { // Dependencies holds dependent services for a LocalStorage type Dependencies struct { Log zerolog.Logger - KubeCli kubernetes.Interface - StorageCRCli versioned.Interface + Client kclient.Client EventRecorder record.EventRecorder } @@ -112,7 +110,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoLocalStorage) (* stopCh: make(chan struct{}), } - ls.pvCleaner = newPVCleaner(deps.Log, deps.KubeCli, ls.GetClientByNodeName) + ls.pvCleaner = newPVCleaner(deps.Log, deps.Client.Kubernetes(), ls.GetClientByNodeName) go ls.run() go ls.listenForPvcEvents() @@ -291,7 +289,7 @@ func (ls *LocalStorage) handleArangoLocalStorageUpdatedEvent(event *localStorage log := ls.deps.Log.With().Str("localStorage", event.LocalStorage.GetName()).Logger() // Get the most recent version of the local storage from the API server - current, err := ls.deps.StorageCRCli.StorageV1alpha().ArangoLocalStorages().Get(context.Background(), ls.apiObject.GetName(), metav1.GetOptions{}) + current, err := ls.deps.Client.Arango().StorageV1alpha().ArangoLocalStorages().Get(context.Background(), ls.apiObject.GetName(), metav1.GetOptions{}) if err != nil { log.Debug().Err(err).Msg("Failed to get current version of local storage from API server") if k8sutil.IsNotFound(err) { @@ -353,7 +351,7 @@ func (ls *LocalStorage) updateCRStatus() error { for { attempt++ update.Status = ls.status - newAPIObject, err := ls.deps.StorageCRCli.StorageV1alpha().ArangoLocalStorages().Update(context.Background(), update, metav1.UpdateOptions{}) + newAPIObject, err := ls.deps.Client.Arango().StorageV1alpha().ArangoLocalStorages().Update(context.Background(), update, metav1.UpdateOptions{}) if err == nil { // Update internal object ls.apiObject = newAPIObject @@ -363,7 +361,7 @@ func (ls *LocalStorage) updateCRStatus() error { // API object may have been changed already, // Reload api object and try again var current *api.ArangoLocalStorage - current, err = ls.deps.StorageCRCli.StorageV1alpha().ArangoLocalStorages().Get(context.Background(), update.GetName(), metav1.GetOptions{}) + current, err = ls.deps.Client.Arango().StorageV1alpha().ArangoLocalStorages().Get(context.Background(), update.GetName(), metav1.GetOptions{}) if err == nil { update = current.DeepCopy() continue @@ -387,7 +385,7 @@ func (ls *LocalStorage) updateCRSpec(newSpec api.LocalStorageSpec) error { attempt++ update.Spec = newSpec update.Status = ls.status - newAPIObject, err := ls.deps.StorageCRCli.StorageV1alpha().ArangoLocalStorages().Update(context.Background(), update, metav1.UpdateOptions{}) + newAPIObject, err := ls.deps.Client.Arango().StorageV1alpha().ArangoLocalStorages().Update(context.Background(), update, metav1.UpdateOptions{}) if err == nil { // Update internal object ls.apiObject = newAPIObject @@ -397,7 +395,7 @@ func (ls *LocalStorage) updateCRSpec(newSpec api.LocalStorageSpec) error { // API object may have been changed already, // Reload api object and try again var current *api.ArangoLocalStorage - current, err = ls.deps.StorageCRCli.StorageV1alpha().ArangoLocalStorages().Get(context.Background(), update.GetName(), metav1.GetOptions{}) + current, err = ls.deps.Client.Arango().StorageV1alpha().ArangoLocalStorages().Get(context.Background(), update.GetName(), metav1.GetOptions{}) if err == nil { update = current.DeepCopy() continue @@ -436,7 +434,7 @@ func (ls *LocalStorage) reportFailedStatus() { return errors.WithStack(err) } - depl, err := ls.deps.StorageCRCli.StorageV1alpha().ArangoLocalStorages().Get(context.Background(), ls.apiObject.Name, metav1.GetOptions{}) + depl, err := ls.deps.Client.Arango().StorageV1alpha().ArangoLocalStorages().Get(context.Background(), ls.apiObject.Name, metav1.GetOptions{}) if err != nil { // Update (PUT) will return conflict even if object is deleted since we have UID set in object. // Because it will check UID first and return something like: diff --git a/pkg/storage/pv_creator.go b/pkg/storage/pv_creator.go index f0f4a4ff1..238c7e2c6 100644 --- a/pkg/storage/pv_creator.go +++ b/pkg/storage/pv_creator.go @@ -186,7 +186,7 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal } // Attach PV to ArangoLocalStorage pv.SetOwnerReferences(append(pv.GetOwnerReferences(), apiObject.AsOwner())) - if _, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().Create(context.Background(), pv, metav1.CreateOptions{}); err != nil { + if _, err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().Create(context.Background(), pv, metav1.CreateOptions{}); err != nil { log.Error().Err(err).Msg("Failed to create PersistentVolume") continue } @@ -198,7 +198,7 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal // Bind claim to volume if err := ls.bindClaimToVolume(claim, pv.GetName()); err != nil { // Try to delete the PV now - if err := ls.deps.KubeCli.CoreV1().PersistentVolumes().Delete(context.Background(), pv.GetName(), metav1.DeleteOptions{}); err != nil { + if err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().Delete(context.Background(), pv.GetName(), metav1.DeleteOptions{}); err != nil { log.Error().Err(err).Msg("Failed to delete PV after binding PVC failed") } return errors.WithStack(err) @@ -270,7 +270,7 @@ func getDeploymentInfo(pvc v1.PersistentVolumeClaim) (string, string, bool) { // filterAllowedNodes returns those clients that do not yet have a volume for the given deployment name & role. func (ls *LocalStorage) filterAllowedNodes(clients map[string]provisioner.API, deploymentName, role string) ([]provisioner.API, error) { // Find all PVs for given deployment & role - list, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().List(context.Background(), metav1.ListOptions{ + list, err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().List(context.Background(), metav1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s,%s=%s", k8sutil.LabelKeyArangoDeployment, deploymentName, k8sutil.LabelKeyRole, role), }) if err != nil { @@ -294,7 +294,7 @@ func (ls *LocalStorage) filterAllowedNodes(clients map[string]provisioner.API, d // If the claim has been updated, the function retries several times. func (ls *LocalStorage) bindClaimToVolume(claim v1.PersistentVolumeClaim, volumeName string) error { log := ls.deps.Log.With().Str("pvc-name", claim.GetName()).Str("volume-name", volumeName).Logger() - pvcs := ls.deps.KubeCli.CoreV1().PersistentVolumeClaims(claim.GetNamespace()) + pvcs := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumeClaims(claim.GetNamespace()) for attempt := 0; attempt < 10; attempt++ { // Backoff if needed diff --git a/pkg/storage/pv_informer.go b/pkg/storage/pv_informer.go index f92775e88..df3ee6d78 100644 --- a/pkg/storage/pv_informer.go +++ b/pkg/storage/pv_informer.go @@ -44,7 +44,7 @@ func (ls *LocalStorage) listenForPvEvents() { rw := k8sutil.NewResourceWatcher( ls.deps.Log, - ls.deps.KubeCli.CoreV1().RESTClient(), + ls.deps.Client.Kubernetes().CoreV1().RESTClient(), "persistentvolumes", "", //ls.apiObject.GetNamespace(), &v1.PersistentVolume{}, diff --git a/pkg/storage/pv_inspector.go b/pkg/storage/pv_inspector.go index 019715d41..f4018d66b 100644 --- a/pkg/storage/pv_inspector.go +++ b/pkg/storage/pv_inspector.go @@ -35,7 +35,7 @@ import ( // Returns the number of available PV's. func (ls *LocalStorage) inspectPVs() (int, error) { log := ls.deps.Log - list, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().List(context.Background(), metav1.ListOptions{}) + list, err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().List(context.Background(), metav1.ListOptions{}) if err != nil { return 0, errors.WithStack(err) } diff --git a/pkg/storage/pvc_informer.go b/pkg/storage/pvc_informer.go index 19782f8be..1daedcabf 100644 --- a/pkg/storage/pvc_informer.go +++ b/pkg/storage/pvc_informer.go @@ -44,7 +44,7 @@ func (ls *LocalStorage) listenForPvcEvents() { rw := k8sutil.NewResourceWatcher( ls.deps.Log, - ls.deps.KubeCli.CoreV1().RESTClient(), + ls.deps.Client.Kubernetes().CoreV1().RESTClient(), "persistentvolumeclaims", "", //ls.apiObject.GetNamespace(), &v1.PersistentVolumeClaim{}, diff --git a/pkg/storage/pvc_inspector.go b/pkg/storage/pvc_inspector.go index 8f1782d1c..afab9c80b 100644 --- a/pkg/storage/pvc_inspector.go +++ b/pkg/storage/pvc_inspector.go @@ -33,7 +33,7 @@ import ( // Returns the PVC's that need a volume. func (ls *LocalStorage) inspectPVCs() ([]v1.PersistentVolumeClaim, error) { ns := ls.apiObject.GetNamespace() - list, err := ls.deps.KubeCli.CoreV1().PersistentVolumeClaims(ns).List(context.Background(), metav1.ListOptions{}) + list, err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumeClaims(ns).List(context.Background(), metav1.ListOptions{}) if err != nil { return nil, errors.WithStack(err) } diff --git a/pkg/storage/server_api.go b/pkg/storage/server_api.go index ad48b6b4a..6a7cb5396 100644 --- a/pkg/storage/server_api.go +++ b/pkg/storage/server_api.go @@ -63,7 +63,7 @@ func (ls *LocalStorage) StorageClassIsDefault() bool { // Volumes returns all volumes created by the local storage resource func (ls *LocalStorage) Volumes() []server.Volume { - list, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().List(context.Background(), metav1.ListOptions{}) + list, err := ls.deps.Client.Kubernetes().CoreV1().PersistentVolumes().List(context.Background(), metav1.ListOptions{}) if err != nil { ls.deps.Log.Error().Err(err).Msg("Failed to list persistent volumes") return nil diff --git a/pkg/storage/service.go b/pkg/storage/service.go index 4827c74a3..77994fb25 100644 --- a/pkg/storage/service.go +++ b/pkg/storage/service.go @@ -54,7 +54,7 @@ func (ls *LocalStorage) ensureProvisionerService(apiObject *api.ArangoLocalStora } svc.SetOwnerReferences(append(svc.GetOwnerReferences(), apiObject.AsOwner())) ns := ls.config.Namespace - if _, err := ls.deps.KubeCli.CoreV1().Services(ns).Create(context.Background(), svc, metav1.CreateOptions{}); err != nil && !k8sutil.IsAlreadyExists(err) { + if _, err := ls.deps.Client.Kubernetes().CoreV1().Services(ns).Create(context.Background(), svc, metav1.CreateOptions{}); err != nil && !k8sutil.IsAlreadyExists(err) { return errors.WithStack(err) } return nil diff --git a/pkg/storage/storage_class.go b/pkg/storage/storage_class.go index 381ffd371..4fe48b488 100644 --- a/pkg/storage/storage_class.go +++ b/pkg/storage/storage_class.go @@ -53,7 +53,7 @@ func (l *LocalStorage) ensureStorageClass(apiObject *api.ArangoLocalStorage) err } // Note: We do not attach the StorageClass to the apiObject (OwnerRef) because many // ArangoLocalStorage resource may use the same StorageClass. - cli := l.deps.KubeCli.StorageV1() + cli := l.deps.Client.Kubernetes().StorageV1() if _, err := cli.StorageClasses().Create(context.Background(), sc, metav1.CreateOptions{}); k8sutil.IsAlreadyExists(err) { log.Debug(). Str("storageclass", sc.GetName()). diff --git a/pkg/util/k8sutil/client.go b/pkg/util/k8sutil/client.go deleted file mode 100644 index 52ddcdca6..000000000 --- a/pkg/util/k8sutil/client.go +++ /dev/null @@ -1,109 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// - -package k8sutil - -import ( - "fmt" - "os" - - "github.com/arangodb/kube-arangodb/pkg/util/errors" - - monitoringClient "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned/typed/monitoring/v1" - - "github.com/arangodb/kube-arangodb/pkg/util" - "k8s.io/client-go/tools/clientcmd" - - apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" -) - -const Kubeconfig util.EnvironmentVariable = "KUBECONFIG" - -// NewKubeConfig loads config from KUBECONFIG or as incluster -func NewKubeConfig() (*rest.Config, error) { - // If KUBECONFIG is defined use this variable - if kubeconfig, ok := Kubeconfig.Lookup(); ok { - return clientcmd.BuildConfigFromFlags("", kubeconfig) - } - - // Try to load incluster config - if cfg, err := rest.InClusterConfig(); err == nil { - return cfg, nil - } else if err != rest.ErrNotInCluster { - return nil, err - } - - // At the end try to use default path - home, err := os.UserHomeDir() - if err != nil { - return nil, err - } - - return clientcmd.BuildConfigFromFlags("", fmt.Sprintf("%s/.kube/config", home)) -} - -// NewKubeClient creates a new k8s client -func NewKubeClient() (kubernetes.Interface, error) { - cfg, err := NewKubeConfig() - if err != nil { - return nil, errors.WithStack(err) - } - c, err := kubernetes.NewForConfig(cfg) - if err != nil { - return nil, errors.WithStack(err) - } - return c, nil -} - -// MustNewKubeClient calls NewKubeClient an panics if it fails -func MustNewKubeClient() kubernetes.Interface { - i, err := NewKubeClient() - if err != nil { - panic(err) - } - return i -} - -// NewKubeExtClient creates a new k8s api extensions client -func NewKubeExtClient() (apiextensionsclient.Interface, error) { - cfg, err := NewKubeConfig() - if err != nil { - return nil, errors.WithStack(err) - } - c, err := apiextensionsclient.NewForConfig(cfg) - if err != nil { - return nil, errors.WithStack(err) - } - return c, nil -} - -func NewKubeMonitoringV1Client() (monitoringClient.MonitoringV1Interface, error) { - cfg, err := NewKubeConfig() - if err != nil { - return nil, errors.WithStack(err) - } - c, err := monitoringClient.NewForConfig(cfg) - if err != nil { - return nil, errors.WithStack(err) - } - return c, nil -} diff --git a/pkg/util/k8sutil/mocks/core.go b/pkg/util/k8sutil/mocks/core.go deleted file mode 100644 index 24e8506b0..000000000 --- a/pkg/util/k8sutil/mocks/core.go +++ /dev/null @@ -1,113 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// - -package mocks - -import ( - "k8s.io/apimachinery/pkg/watch" - v1 "k8s.io/client-go/kubernetes/typed/core/v1" -) - -type coreV1 struct { - restClient - secrets map[string]v1.SecretInterface -} - -func NewCore() v1.CoreV1Interface { - return &coreV1{ - secrets: make(map[string]v1.SecretInterface), - } -} - -func nilOrWatch(x interface{}) watch.Interface { - if s, ok := x.(watch.Interface); ok { - return s - } - return nil -} - -func (c *coreV1) ComponentStatuses() v1.ComponentStatusInterface { - panic("not support") -} - -func (c *coreV1) ConfigMaps(namespace string) v1.ConfigMapInterface { - panic("not support") -} - -func (c *coreV1) Endpoints(namespace string) v1.EndpointsInterface { - panic("not support") -} - -func (c *coreV1) Events(namespace string) v1.EventInterface { - panic("not support") -} - -func (c *coreV1) LimitRanges(namespace string) v1.LimitRangeInterface { - panic("not support") -} - -func (c *coreV1) Namespaces() v1.NamespaceInterface { - panic("not support") -} - -func (c *coreV1) Nodes() v1.NodeInterface { - panic("not support") -} - -func (c *coreV1) PersistentVolumes() v1.PersistentVolumeInterface { - panic("not support") -} - -func (c *coreV1) PersistentVolumeClaims(namespace string) v1.PersistentVolumeClaimInterface { - panic("not support") -} - -func (c *coreV1) Pods(namespace string) v1.PodInterface { - panic("not support") -} - -func (c *coreV1) PodTemplates(namespace string) v1.PodTemplateInterface { - panic("not support") -} - -func (c *coreV1) ReplicationControllers(namespace string) v1.ReplicationControllerInterface { - panic("not support") -} - -func (c *coreV1) ResourceQuotas(namespace string) v1.ResourceQuotaInterface { - panic("not support") -} - -func (c *coreV1) Secrets(namespace string) v1.SecretInterface { - if x, found := c.secrets[namespace]; found { - return x - } - x := NewSecrets() - c.secrets[namespace] = x - return x -} - -func (c *coreV1) Services(namespace string) v1.ServiceInterface { - panic("not support") -} - -func (c *coreV1) ServiceAccounts(namespace string) v1.ServiceAccountInterface { - panic("not support") -} diff --git a/pkg/util/k8sutil/mocks/mock.go b/pkg/util/k8sutil/mocks/mock.go deleted file mode 100644 index efb317ff0..000000000 --- a/pkg/util/k8sutil/mocks/mock.go +++ /dev/null @@ -1,32 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// - -package mocks - -import "github.com/stretchr/testify/mock" - -type MockGetter interface { - AsMock() *mock.Mock -} - -// AsMock performs a typeconversion to *Mock. -func AsMock(obj interface{}) *mock.Mock { - return obj.(MockGetter).AsMock() -} diff --git a/pkg/util/k8sutil/mocks/secrets.go b/pkg/util/k8sutil/mocks/secrets.go deleted file mode 100644 index 4bb54a9b6..000000000 --- a/pkg/util/k8sutil/mocks/secrets.go +++ /dev/null @@ -1,104 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// - -package mocks - -import ( - "context" - - "github.com/stretchr/testify/mock" - v1 "k8s.io/api/core/v1" - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/watch" - corev1 "k8s.io/client-go/kubernetes/typed/core/v1" -) - -type SecretInterface interface { - corev1.SecretInterface - MockGetter -} - -type secrets struct { - mock.Mock -} - -func NewSecrets() SecretInterface { - return &secrets{} -} - -func nilOrSecret(x interface{}) *v1.Secret { - if s, ok := x.(*v1.Secret); ok { - return s - } - return nil -} - -func nilOrSecretList(x interface{}) *v1.SecretList { - if s, ok := x.(*v1.SecretList); ok { - return s - } - return nil -} - -func (s *secrets) AsMock() *mock.Mock { - return &s.Mock -} - -func (s *secrets) Create(_ context.Context, x *v1.Secret, _ meta_v1.CreateOptions) (*v1.Secret, error) { - args := s.Called(x) - return nilOrSecret(args.Get(0)), args.Error(1) -} - -func (s *secrets) Update(_ context.Context, x *v1.Secret, _ meta_v1.UpdateOptions) (*v1.Secret, error) { - args := s.Called(x) - return nilOrSecret(args.Get(0)), args.Error(1) -} - -func (s *secrets) Delete(_ context.Context, name string, options meta_v1.DeleteOptions) error { - args := s.Called(name, options) - return args.Error(0) -} - -func (s *secrets) DeleteCollection(_ context.Context, options meta_v1.DeleteOptions, listOptions meta_v1.ListOptions) error { - args := s.Called(options, listOptions) - return args.Error(0) -} - -func (s *secrets) Get(_ context.Context, name string, options meta_v1.GetOptions) (*v1.Secret, error) { - args := s.Called(name, options) - return nilOrSecret(args.Get(0)), args.Error(1) -} - -func (s *secrets) List(_ context.Context, opts meta_v1.ListOptions) (*v1.SecretList, error) { - args := s.Called(opts) - return nilOrSecretList(args.Get(0)), args.Error(1) -} - -func (s *secrets) Watch(_ context.Context, opts meta_v1.ListOptions) (watch.Interface, error) { - args := s.Called(opts) - return nilOrWatch(args.Get(0)), args.Error(1) -} - -func (s *secrets) Patch(_ context.Context, name string, pt types.PatchType, data []byte, - options meta_v1.PatchOptions, subresources ...string) (result *v1.Secret, err error) { - args := s.Called(name, pt, data, subresources) - return nilOrSecret(args.Get(0)), args.Error(1) -} diff --git a/pkg/util/kclient/client.go b/pkg/util/kclient/client.go new file mode 100644 index 000000000..b9ca1aebe --- /dev/null +++ b/pkg/util/kclient/client.go @@ -0,0 +1,56 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kclient + +import ( + "fmt" + "os" + + "github.com/arangodb/kube-arangodb/pkg/util" + "k8s.io/client-go/tools/clientcmd" + + "k8s.io/client-go/rest" +) + +const Kubeconfig util.EnvironmentVariable = "KUBECONFIG" + +// newKubeConfig loads config from KUBECONFIG or as incluster +func newKubeConfig() (*rest.Config, error) { + // If KUBECONFIG is defined use this variable + if kubeconfig, ok := Kubeconfig.Lookup(); ok { + return clientcmd.BuildConfigFromFlags("", kubeconfig) + } + + // Try to load incluster config + if cfg, err := rest.InClusterConfig(); err == nil { + return cfg, nil + } else if err != rest.ErrNotInCluster { + return nil, err + } + + // At the end try to use default path + home, err := os.UserHomeDir() + if err != nil { + return nil, err + } + + return clientcmd.BuildConfigFromFlags("", fmt.Sprintf("%s/.kube/config", home)) +} diff --git a/pkg/util/kclient/client_factory.go b/pkg/util/kclient/client_factory.go new file mode 100644 index 000000000..e3a810087 --- /dev/null +++ b/pkg/util/kclient/client_factory.go @@ -0,0 +1,229 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kclient + +import ( + "sync" + + "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" + "github.com/dchest/uniuri" + "github.com/pkg/errors" + monitoring "github.com/prometheus-operator/prometheus-operator/pkg/client/versioned" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +var ( + factories = map[string]*factory{} + factoriesLock sync.Mutex +) + +func init() { + f := GetDefaultFactory() + + f.SetKubeConfigGetter(NewStaticConfigGetter(newKubeConfig)) + + if err := f.Refresh(); err != nil { + println("Error while getting client: ", err.Error()) + } +} + +func GetDefaultFactory() Factory { + return GetFactory("") +} + +func GetFactory(name string) Factory { + factoriesLock.Lock() + defer factoriesLock.Unlock() + + if f, ok := factories[name]; ok { + return f + } + + factories[name] = &factory{ + name: name, + } + + return factories[name] +} + +type ConfigGetter func() (*rest.Config, string, error) + +func NewStaticConfigGetter(f func() (*rest.Config, error)) ConfigGetter { + u := uniuri.NewLen(32) + return func() (*rest.Config, string, error) { + if f == nil { + return nil, "", errors.Errorf("Provided generator is empty") + } + cfg, err := f() + return cfg, u, err + } +} + +type Factory interface { + SetKubeConfigGetter(getter ConfigGetter) + Refresh() error + SetClient(c Client) + + Client() (Client, bool) +} + +type factory struct { + lock sync.RWMutex + + name string + + getter ConfigGetter + + kubeConfigChecksum string + + client Client +} + +func (f *factory) Refresh() error { + return f.refresh() +} + +func (f *factory) SetClient(c Client) { + f.lock.Lock() + defer f.lock.Unlock() + + f.client = c +} + +func (f *factory) SetKubeConfigGetter(getter ConfigGetter) { + f.lock.Lock() + defer f.lock.Unlock() + + f.getter = getter + f.client = nil +} + +func (f *factory) refresh() error { + if f.getter == nil { + return errors.Errorf("Getter is nil") + } + + cfg, checksum, err := f.getter() + if err != nil { + return err + } + + f.lock.Lock() + defer f.lock.Unlock() + + if f.client != nil && checksum == f.kubeConfigChecksum { + return nil + } + + cfg.RateLimiter = GetRateLimiter(f.name) + + client, err := newClient(cfg) + if err != nil { + return err + } + + f.client = client + f.kubeConfigChecksum = checksum + + return nil +} + +func (f *factory) Client() (Client, bool) { + f.lock.RLock() + defer f.lock.RUnlock() + + if f.client == nil { + return nil, false + } + + return f.client, true +} + +type Client interface { + Kubernetes() kubernetes.Interface + KubernetesExtensions() apiextensionsclient.Interface + Arango() versioned.Interface + Monitoring() monitoring.Interface +} + +func NewStaticClient(kubernetes kubernetes.Interface, kubernetesExtensions apiextensionsclient.Interface, arango versioned.Interface, monitoring monitoring.Interface) Client { + return &client{ + kubernetes: kubernetes, + kubernetesExtensions: kubernetesExtensions, + arango: arango, + monitoring: monitoring, + } +} + +func newClient(cfg *rest.Config) (*client, error) { + var c client + + if q, err := kubernetes.NewForConfig(cfg); err != nil { + return nil, err + } else { + c.kubernetes = q + } + + if q, err := apiextensionsclient.NewForConfig(cfg); err != nil { + return nil, err + } else { + c.kubernetesExtensions = q + } + + if q, err := versioned.NewForConfig(cfg); err != nil { + return nil, err + } else { + c.arango = q + } + + if q, err := monitoring.NewForConfig(cfg); err != nil { + return nil, err + } else { + c.monitoring = q + } + + return &c, nil +} + +type client struct { + kubernetes kubernetes.Interface + kubernetesExtensions apiextensionsclient.Interface + arango versioned.Interface + monitoring monitoring.Interface +} + +func (c *client) Kubernetes() kubernetes.Interface { + return c.kubernetes +} + +func (c *client) KubernetesExtensions() apiextensionsclient.Interface { + return c.kubernetesExtensions +} + +func (c *client) Arango() versioned.Interface { + return c.arango +} + +func (c *client) Monitoring() monitoring.Interface { + return c.monitoring +} diff --git a/pkg/util/kclient/mod.go b/pkg/util/kclient/mod.go new file mode 100644 index 000000000..c353d0f17 --- /dev/null +++ b/pkg/util/kclient/mod.go @@ -0,0 +1,87 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kclient + +import ( + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/arangomember" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/persistentvolumeclaim" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/pod" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/poddisruptionbudget" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/secret" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/serviceaccount" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/servicemonitor" +) + +func NewModInterface(client Client, namespace string) ModInterface { + return modInterface{ + client: client, + namespace: namespace, + } +} + +type ModInterface interface { + Secrets() secret.ModInterface + Pods() pod.ModInterface + Services() service.ModInterface + ServiceAccounts() serviceaccount.ModInterface + PersistentVolumeClaims() persistentvolumeclaim.ModInterface + PodDisruptionBudgets() poddisruptionbudget.ModInterface + ServiceMonitors() servicemonitor.ModInterface + ArangoMembers() arangomember.ModInterface +} + +type modInterface struct { + client Client + namespace string +} + +func (m modInterface) PersistentVolumeClaims() persistentvolumeclaim.ModInterface { + return m.client.Kubernetes().CoreV1().PersistentVolumeClaims(m.namespace) +} + +func (m modInterface) PodDisruptionBudgets() poddisruptionbudget.ModInterface { + return m.client.Kubernetes().PolicyV1beta1().PodDisruptionBudgets(m.namespace) +} + +func (m modInterface) ServiceMonitors() servicemonitor.ModInterface { + return m.client.Monitoring().MonitoringV1().ServiceMonitors(m.namespace) +} + +func (m modInterface) ArangoMembers() arangomember.ModInterface { + return m.client.Arango().DatabaseV1().ArangoMembers(m.namespace) +} + +func (m modInterface) Services() service.ModInterface { + return m.client.Kubernetes().CoreV1().Services(m.namespace) +} + +func (m modInterface) ServiceAccounts() serviceaccount.ModInterface { + return m.client.Kubernetes().CoreV1().ServiceAccounts(m.namespace) +} + +func (m modInterface) Pods() pod.ModInterface { + return m.client.Kubernetes().CoreV1().Pods(m.namespace) +} + +func (m modInterface) Secrets() secret.ModInterface { + return m.client.Kubernetes().CoreV1().Secrets(m.namespace) +} diff --git a/pkg/util/kclient/ratelimiter.go b/pkg/util/kclient/ratelimiter.go new file mode 100644 index 000000000..16baeb99e --- /dev/null +++ b/pkg/util/kclient/ratelimiter.go @@ -0,0 +1,87 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kclient + +import ( + "sync" + + "golang.org/x/time/rate" + "k8s.io/client-go/rest" + "k8s.io/client-go/util/flowcontrol" +) + +const ( + DefaultQPS = rest.DefaultQPS * 3 + DefaultBurst = rest.DefaultBurst * 3 +) + +var ( + rateLimiters = map[string]*rateLimiter{} + rateLimitersLock sync.Mutex + + defaultQPS = DefaultQPS + defaultBurst = DefaultBurst +) + +func GetDefaultRateLimiter() flowcontrol.RateLimiter { + return GetRateLimiter("") +} + +func GetRateLimiter(name string) flowcontrol.RateLimiter { + rateLimitersLock.Lock() + defer rateLimitersLock.Unlock() + + if v, ok := rateLimiters[name]; ok { + return v + } + + l := &rateLimiter{ + limiter: rate.NewLimiter(rate.Limit(defaultQPS), defaultBurst), + clock: clock{}, + qps: defaultQPS, + } + + rateLimiters[name] = l + + return l +} + +func SetDefaultBurst(q int) { + rateLimitersLock.Lock() + defer rateLimitersLock.Unlock() + + defaultBurst = q + + for _, v := range rateLimiters { + v.setBurst(q) + } +} + +func SetDefaultQPS(q float32) { + rateLimitersLock.Lock() + defer rateLimitersLock.Unlock() + + defaultQPS = q + + for _, v := range rateLimiters { + v.setQPS(q) + } +} diff --git a/pkg/util/k8sutil/mocks/rest_client.go b/pkg/util/kclient/ratelimiter_clock.go similarity index 79% rename from pkg/util/k8sutil/mocks/rest_client.go rename to pkg/util/kclient/ratelimiter_clock.go index f1114cdf3..6f5834413 100644 --- a/pkg/util/k8sutil/mocks/rest_client.go +++ b/pkg/util/kclient/ratelimiter_clock.go @@ -18,13 +18,19 @@ // Copyright holder is ArangoDB GmbH, Cologne, Germany // -package mocks +package kclient -import "k8s.io/client-go/rest" +import ( + "time" +) -type restClient struct { +type clock struct { } -func (c *restClient) RESTClient() rest.Interface { - panic("not support") +func (c clock) Now() time.Time { + return time.Now() +} + +func (c clock) Sleep(duration time.Duration) { + time.Sleep(duration) } diff --git a/pkg/util/kclient/ratelimiter_impl.go b/pkg/util/kclient/ratelimiter_impl.go new file mode 100644 index 000000000..e5ae6dcbe --- /dev/null +++ b/pkg/util/kclient/ratelimiter_impl.go @@ -0,0 +1,86 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package kclient + +import ( + "context" + "sync" + + "golang.org/x/time/rate" + "k8s.io/client-go/util/flowcontrol" +) + +var _ flowcontrol.RateLimiter = &rateLimiter{} + +type rateLimiter struct { + lock sync.Mutex + + limiter *rate.Limiter + clock clock + qps float32 +} + +func (r *rateLimiter) setBurst(d int) { + r.lock.Lock() + defer r.lock.Unlock() + + r.limiter.SetBurst(d) +} + +func (r *rateLimiter) setQPS(d float32) { + r.lock.Lock() + defer r.lock.Unlock() + + r.qps = d + r.limiter.SetLimit(rate.Limit(d)) +} + +func (r *rateLimiter) Accept() { + r.lock.Lock() + defer r.lock.Unlock() + + now := r.clock.Now() + r.clock.Sleep(r.limiter.ReserveN(now, 1).DelayFrom(now)) +} + +func (r *rateLimiter) Stop() { +} + +func (r *rateLimiter) QPS() float32 { + r.lock.Lock() + defer r.lock.Unlock() + + return r.qps +} + +func (r *rateLimiter) Wait(ctx context.Context) error { + r.lock.Lock() + defer r.lock.Unlock() + + return r.limiter.Wait(ctx) +} + +func (r *rateLimiter) TryAccept() bool { + r.lock.Lock() + defer r.lock.Unlock() + + return r.limiter.AllowN(r.clock.Now(), 1) +}