1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] [ML] Deployment Handler (#1500)

This commit is contained in:
Adam Janikowski 2023-11-23 10:58:34 +01:00 committed by GitHub
parent 5a7d305fca
commit 8cdc6b94e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 804 additions and 48 deletions

View file

@ -26,5 +26,13 @@ rules:
- "arangomlstorages/status"
verbs:
- "*"
- apiGroups:
- "database.arangodb.com"
resources:
- "arangodeployments"
verbs:
- "get"
- "list"
- "watch"
{{- end }}
{{- end }}

View file

@ -160,6 +160,10 @@ var (
backupArangoD time.Duration
backupUploadArangoD time.Duration
}
operatorReconciliationRetry struct {
delay time.Duration
count int
}
chaosOptions struct {
allowed bool
}
@ -222,6 +226,8 @@ func init() {
f.DurationVar(&operatorTimeouts.backupUploadArangoD, "timeout.backup-upload", globals.BackupUploadArangoClientTimeout, "The request timeout to the ArangoDB during uploading files")
f.DurationVar(&shutdownOptions.delay, "shutdown.delay", defaultShutdownDelay, "The delay before running shutdown handlers")
f.DurationVar(&shutdownOptions.timeout, "shutdown.timeout", defaultShutdownTimeout, "Timeout for shutdown handlers")
f.DurationVar(&operatorReconciliationRetry.delay, "operator.reconciliation.retry.delay", globals.DefaultOperatorUpdateRetryDelay, "Delay between Object Update operations in the Reconciliation loop")
f.IntVar(&operatorReconciliationRetry.count, "operator.reconciliation.retry.count", globals.DefaultOperatorUpdateRetryCount, "Count of retries during Object Update operations in the Reconciliation loop")
f.BoolVar(&operatorOptions.scalingIntegrationEnabled, "internal.scaling-integration", false, "Enable Scaling Integration")
f.DurationVar(&operatorOptions.reconciliationDelay, "reconciliation.delay", 0, "Delay between reconciliation loops (<= 0 -> Disabled)")
f.Int64Var(&operatorKubernetesOptions.maxBatchSize, "kubernetes.max-batch-size", globals.DefaultKubernetesRequestBatchSize, "Size of batch during objects read")
@ -281,6 +287,9 @@ func executeMain(cmd *cobra.Command, args []string) {
globals.GetGlobalTimeouts().BackupArangoClientTimeout().Set(operatorTimeouts.backupArangoD)
globals.GetGlobalTimeouts().BackupArangoClientUploadTimeout().Set(operatorTimeouts.backupUploadArangoD)
globals.GetGlobals().Retry().OperatorUpdateRetryDelay().Set(operatorReconciliationRetry.delay)
globals.GetGlobals().Retry().OperatorUpdateRetryCount().Set(operatorReconciliationRetry.count)
globals.GetGlobals().Kubernetes().RequestBatchSize().Set(operatorKubernetesOptions.maxBatchSize)
globals.GetGlobals().Backup().ConcurrentUploads().Set(operatorBackup.concurrentUploads)

View file

@ -4,3 +4,9 @@
## Status
### .status.conditions
Type: `api.Conditions` <sup>[\[ref\]](https://github.com/arangodb/kube-arangodb/blob/1.2.35/pkg/apis/ml/v1alpha1/extension_status.go#L28)</sup>
Conditions specific to the entire extension

View file

@ -61,3 +61,11 @@ type ArangoBackup struct {
Spec ArangoBackupSpec `json:"spec"`
Status ArangoBackupStatus `json:"status"`
}
func (a *ArangoBackup) GetStatus() ArangoBackupStatus {
return a.Status
}
func (a *ArangoBackup) SetStatus(status ArangoBackupStatus) {
a.Status = status
}

View file

@ -45,3 +45,11 @@ type ArangoMLExtension struct {
Spec ArangoMLExtensionSpec `json:"spec"`
Status ArangoMLExtensionStatus `json:"status"`
}
func (a *ArangoMLExtension) GetStatus() ArangoMLExtensionStatus {
return a.Status
}
func (a *ArangoMLExtension) SetStatus(status ArangoMLExtensionStatus) {
a.Status = status
}

View file

@ -0,0 +1,27 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1alpha1
import api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
const (
ExtensionDeploymentFoundCondition api.ConditionType = "DeploymentFound"
)

View file

@ -20,5 +20,10 @@
package v1alpha1
import api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
type ArangoMLExtensionStatus struct {
// Conditions specific to the entire extension
// +doc/type: api.Conditions
Conditions api.ConditionList `json:"conditions,omitempty"`
}

View file

@ -26,6 +26,7 @@
package v1alpha1
import (
v1 "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
)
@ -221,7 +222,7 @@ func (in *ArangoMLExtension) DeepCopyInto(out *ArangoMLExtension) {
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec
out.Status = in.Status
in.Status.DeepCopyInto(&out.Status)
return
}
@ -295,6 +296,13 @@ func (in *ArangoMLExtensionSpec) DeepCopy() *ArangoMLExtensionSpec {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ArangoMLExtensionStatus) DeepCopyInto(out *ArangoMLExtensionStatus) {
*out = *in
if in.Conditions != nil {
in, out := &in.Conditions, &out.Conditions
*out = make(v1.ConditionList, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}

View file

@ -42,6 +42,7 @@ var rootFactories = []shared.Factory{
kubernetes.Services(),
kubernetes.Deployments(),
kubernetes.AgencyDump(),
kubernetes.ML(),
}
func InitCommand(cmd *cobra.Command) {

View file

@ -64,13 +64,7 @@ func deployments(logger zerolog.Logger, files chan<- shared.File) error {
return err
}
errDeployments := make([]error, len(deploymentList))
for id := range deploymentList {
errDeployments[id] = deployment(k, deploymentList[id], files)
}
if err := errors.Errors(errDeployments...); err != nil {
if err := errors.ExecuteWithErrorArrayP2(deployment, k, files, deploymentList...); err != nil {
logger.Err(err).Msgf("Error while collecting arango deployments")
return err
}
@ -78,7 +72,7 @@ func deployments(logger zerolog.Logger, files chan<- shared.File) error {
return nil
}
func deployment(client kclient.Client, depl *api.ArangoDeployment, files chan<- shared.File) error {
func deployment(client kclient.Client, files chan<- shared.File, depl *api.ArangoDeployment) error {
files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/deployments/%s.yaml", depl.GetName()), func() ([]interface{}, error) {
return []interface{}{depl}, nil
})

View file

@ -0,0 +1,62 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package kubernetes
import (
"github.com/rs/zerolog"
"github.com/arangodb/kube-arangodb/pkg/debug_package/shared"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
)
func ML() shared.Factory {
return shared.NewFactory("ml", true, ml)
}
func ml(logger zerolog.Logger, files chan<- shared.File) error {
k, ok := kclient.GetDefaultFactory().Client()
if !ok {
return errors.Newf("Client is not initialised")
}
if err := mlExtensions(logger, files, k); err != nil {
logger.Err(err).Msgf("Error while collecting arango ml extension")
return err
}
if err := mlStorages(logger, files, k); err != nil {
logger.Err(err).Msgf("Error while collecting arango ml storage")
return err
}
if err := mlBatchJobs(logger, files, k); err != nil {
logger.Err(err).Msgf("Error while collecting arango ml batch jobs")
return err
}
if err := mlCronJobs(logger, files, k); err != nil {
logger.Err(err).Msgf("Error while collecting arango ml cron jobs")
return err
}
return nil
}

View file

@ -0,0 +1,73 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package kubernetes
import (
"context"
"fmt"
"github.com/rs/zerolog"
mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1"
"github.com/arangodb/kube-arangodb/pkg/debug_package/cli"
"github.com/arangodb/kube-arangodb/pkg/debug_package/shared"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
)
func mlBatchJobs(logger zerolog.Logger, files chan<- shared.File, client kclient.Client) error {
batchjobs, err := listMLBatchJobs(client)
if err != nil {
if kerrors.IsForbiddenOrNotFound(err) {
return nil
}
return err
}
if err := errors.ExecuteWithErrorArrayP2(mlBatchJob, client, files, batchjobs...); err != nil {
logger.Err(err).Msgf("Error while collecting arango ml batchjobs")
return err
}
return nil
}
func mlBatchJob(client kclient.Client, files chan<- shared.File, ext *mlApi.ArangoMLBatchJob) error {
files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/ml/batchjobs/%s.yaml", ext.GetName()), func() ([]interface{}, error) {
return []interface{}{ext}, nil
})
return nil
}
func listMLBatchJobs(client kclient.Client) ([]*mlApi.ArangoMLBatchJob, error) {
return ListObjects[*mlApi.ArangoMLBatchJobList, *mlApi.ArangoMLBatchJob](context.Background(), client.Arango().MlV1alpha1().ArangoMLBatchJobs(cli.GetInput().Namespace), func(result *mlApi.ArangoMLBatchJobList) []*mlApi.ArangoMLBatchJob {
q := make([]*mlApi.ArangoMLBatchJob, len(result.Items))
for id, e := range result.Items {
q[id] = e.DeepCopy()
}
return q
})
}

View file

@ -0,0 +1,73 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package kubernetes
import (
"context"
"fmt"
"github.com/rs/zerolog"
mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1"
"github.com/arangodb/kube-arangodb/pkg/debug_package/cli"
"github.com/arangodb/kube-arangodb/pkg/debug_package/shared"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
)
func mlCronJobs(logger zerolog.Logger, files chan<- shared.File, client kclient.Client) error {
cronjobs, err := listMLCronJobs(client)
if err != nil {
if kerrors.IsForbiddenOrNotFound(err) {
return nil
}
return err
}
if err := errors.ExecuteWithErrorArrayP2(mlCronJob, client, files, cronjobs...); err != nil {
logger.Err(err).Msgf("Error while collecting arango ml cronjobs")
return err
}
return nil
}
func mlCronJob(client kclient.Client, files chan<- shared.File, ext *mlApi.ArangoMLCronJob) error {
files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/ml/cronjobs/%s.yaml", ext.GetName()), func() ([]interface{}, error) {
return []interface{}{ext}, nil
})
return nil
}
func listMLCronJobs(client kclient.Client) ([]*mlApi.ArangoMLCronJob, error) {
return ListObjects[*mlApi.ArangoMLCronJobList, *mlApi.ArangoMLCronJob](context.Background(), client.Arango().MlV1alpha1().ArangoMLCronJobs(cli.GetInput().Namespace), func(result *mlApi.ArangoMLCronJobList) []*mlApi.ArangoMLCronJob {
q := make([]*mlApi.ArangoMLCronJob, len(result.Items))
for id, e := range result.Items {
q[id] = e.DeepCopy()
}
return q
})
}

View file

@ -0,0 +1,73 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package kubernetes
import (
"context"
"fmt"
"github.com/rs/zerolog"
mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1"
"github.com/arangodb/kube-arangodb/pkg/debug_package/cli"
"github.com/arangodb/kube-arangodb/pkg/debug_package/shared"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
)
func mlExtensions(logger zerolog.Logger, files chan<- shared.File, client kclient.Client) error {
extensions, err := listMLExtensions(client)
if err != nil {
if kerrors.IsForbiddenOrNotFound(err) {
return nil
}
return err
}
if err := errors.ExecuteWithErrorArrayP2(mlExtension, client, files, extensions...); err != nil {
logger.Err(err).Msgf("Error while collecting arango ml extensions")
return err
}
return nil
}
func mlExtension(client kclient.Client, files chan<- shared.File, ext *mlApi.ArangoMLExtension) error {
files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/ml/extensions/%s.yaml", ext.GetName()), func() ([]interface{}, error) {
return []interface{}{ext}, nil
})
return nil
}
func listMLExtensions(client kclient.Client) ([]*mlApi.ArangoMLExtension, error) {
return ListObjects[*mlApi.ArangoMLExtensionList, *mlApi.ArangoMLExtension](context.Background(), client.Arango().MlV1alpha1().ArangoMLExtensions(cli.GetInput().Namespace), func(result *mlApi.ArangoMLExtensionList) []*mlApi.ArangoMLExtension {
q := make([]*mlApi.ArangoMLExtension, len(result.Items))
for id, e := range result.Items {
q[id] = e.DeepCopy()
}
return q
})
}

View file

@ -0,0 +1,73 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package kubernetes
import (
"context"
"fmt"
"github.com/rs/zerolog"
mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1"
"github.com/arangodb/kube-arangodb/pkg/debug_package/cli"
"github.com/arangodb/kube-arangodb/pkg/debug_package/shared"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
)
func mlStorages(logger zerolog.Logger, files chan<- shared.File, client kclient.Client) error {
storages, err := listMLStorages(client)
if err != nil {
if kerrors.IsForbiddenOrNotFound(err) {
return nil
}
return err
}
if err := errors.ExecuteWithErrorArrayP2(mlStorage, client, files, storages...); err != nil {
logger.Err(err).Msgf("Error while collecting arango ml storages")
return err
}
return nil
}
func mlStorage(client kclient.Client, files chan<- shared.File, ext *mlApi.ArangoMLStorage) error {
files <- shared.NewYAMLFile(fmt.Sprintf("kubernetes/arango/ml/storages/%s.yaml", ext.GetName()), func() ([]interface{}, error) {
return []interface{}{ext}, nil
})
return nil
}
func listMLStorages(client kclient.Client) ([]*mlApi.ArangoMLStorage, error) {
return ListObjects[*mlApi.ArangoMLStorageList, *mlApi.ArangoMLStorage](context.Background(), client.Arango().MlV1alpha1().ArangoMLStorages(cli.GetInput().Namespace), func(result *mlApi.ArangoMLStorageList) []*mlApi.ArangoMLStorage {
q := make([]*mlApi.ArangoMLStorage, len(result.Items))
for id, e := range result.Items {
q[id] = e.DeepCopy()
}
return q
})
}

View file

@ -39,7 +39,6 @@ import (
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
database "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
arangoClientSet "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
"github.com/arangodb/kube-arangodb/pkg/handlers/utils"
"github.com/arangodb/kube-arangodb/pkg/logging"
operator "github.com/arangodb/kube-arangodb/pkg/operatorV2"
"github.com/arangodb/kube-arangodb/pkg/operatorV2/event"
@ -51,9 +50,6 @@ import (
var logger = logging.Global().RegisterAndGetLogger("backup-operator", logging.Info)
const (
retryCount = 25
retryDelay = time.Second
// StateChange name of the event send when state changed
StateChange = "StateChange"
@ -181,9 +177,7 @@ func (h *handler) refreshDeploymentBackup(deployment *database.ArangoDeployment,
updateStatusBackup(backupMeta),
updateStatusBackupImported(util.NewType[bool](true)))
backup.Status = *status
err = h.updateBackupStatus(backup)
_, err = operator.WithArangoBackupUpdateStatusInterfaceRetry(context.Background(), h.client.BackupV1().ArangoBackups(backup.GetNamespace()), backup, *status, meta.UpdateOptions{})
if err != nil {
return err
}
@ -195,20 +189,6 @@ func (h *handler) Name() string {
return backup.ArangoBackupResourceKind
}
func (h *handler) updateBackupStatus(b *backupApi.ArangoBackup) error {
return utils.Retry(retryCount, retryDelay, func() error {
backup, err := h.client.BackupV1().ArangoBackups(b.Namespace).Get(context.Background(), b.Name, meta.GetOptions{})
if err != nil {
return err
}
backup.Status = b.Status
_, err = h.client.BackupV1().ArangoBackups(b.Namespace).UpdateStatus(context.Background(), backup, meta.UpdateOptions{})
return err
})
}
func (h *handler) getDeploymentMutex(namespace, deployment string) *sync.Mutex {
h.lock.Lock()
defer h.lock.Unlock()
@ -346,15 +326,13 @@ func (h *handler) Handle(_ context.Context, item operation.Item) error {
}
}
b.Status = *status
logger.Debug("Updating %s %s/%s",
item.Kind,
item.Namespace,
item.Name)
// Update status on object
if err := h.updateBackupStatus(b); err != nil {
if _, err := operator.WithArangoBackupUpdateStatusInterfaceRetry(context.Background(), h.client.BackupV1().ArangoBackups(b.GetNamespace()), b, *status, meta.UpdateOptions{}); err != nil {
return err
}

View file

@ -0,0 +1,49 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package operator
import "fmt"
func Stop(msg string, args ...interface{}) error {
return stop{
message: fmt.Sprintf(msg, args...),
}
}
type stop struct {
message string
}
func (r stop) Error() string {
return r.message
}
func IsStop(err error) bool {
if err == nil {
return false
}
if _, ok := err.(stop); ok {
return true
}
return false
}

View file

@ -22,38 +22,110 @@ package operator
import "context"
type HandleP0Func func(tx context.Context) error
type HandleP0Func func(ctx context.Context) (bool, error)
type HandleP1Func[P1 interface{}] func(tx context.Context, p1 P1) error
type HandleP1Func[P1 interface{}] func(ctx context.Context, p1 P1) (bool, error)
type HandleP2Func[P1, P2 interface{}] func(tx context.Context, p1 P1, p2 P2) error
type HandleP2Func[P1, P2 interface{}] func(ctx context.Context, p1 P1, p2 P2) (bool, error)
func HandleP0(ctx context.Context, handler ...HandleP0Func) error {
type HandleP3Func[P1, P2, P3 interface{}] func(ctx context.Context, p1 P1, p2 P2, p3 P3) (bool, error)
type HandleP4Func[P1, P2, P3, P4 interface{}] func(ctx context.Context, p1 P1, p2 P2, p3 P3, p4 P4) (bool, error)
type HandleP9Func[P1, P2, P3, P4, P5, P6, P7, P8, P9 interface{}] func(ctx context.Context, p1 P1, p2 P2, p3 P3, p4 P4, p5 P5, p6 P6, p7 P7, p8 P8, p9 P9) (bool, error)
func HandleP0(ctx context.Context, handler ...HandleP0Func) (bool, error) {
isChanged := false
for _, h := range handler {
if err := h(ctx); err != nil {
return err
changed, err := h(ctx)
if changed {
isChanged = true
}
if err != nil {
return isChanged, err
}
}
return nil
return isChanged, nil
}
func HandleP1[P1 interface{}](ctx context.Context, p1 P1, handler ...HandleP1Func[P1]) error {
func HandleP1[P1 interface{}](ctx context.Context, p1 P1, handler ...HandleP1Func[P1]) (bool, error) {
isChanged := false
for _, h := range handler {
if err := h(ctx, p1); err != nil {
return err
changed, err := h(ctx, p1)
if changed {
isChanged = true
}
if err != nil {
return isChanged, err
}
}
return nil
return isChanged, nil
}
func HandleP2[P1, P2 interface{}](ctx context.Context, p1 P1, p2 P2, handler ...HandleP2Func[P1, P2]) error {
func HandleP2[P1, P2 interface{}](ctx context.Context, p1 P1, p2 P2, handler ...HandleP2Func[P1, P2]) (bool, error) {
isChanged := false
for _, h := range handler {
if err := h(ctx, p1, p2); err != nil {
return err
changed, err := h(ctx, p1, p2)
if changed {
isChanged = true
}
if err != nil {
return isChanged, err
}
}
return nil
return isChanged, nil
}
func HandleP3[P1, P2, P3 interface{}](ctx context.Context, p1 P1, p2 P2, p3 P3, handler ...HandleP3Func[P1, P2, P3]) (bool, error) {
isChanged := false
for _, h := range handler {
changed, err := h(ctx, p1, p2, p3)
if changed {
isChanged = true
}
if err != nil {
return isChanged, err
}
}
return isChanged, nil
}
func HandleP4[P1, P2, P3, P4 interface{}](ctx context.Context, p1 P1, p2 P2, p3 P3, p4 P4, handler ...HandleP4Func[P1, P2, P3, P4]) (bool, error) {
isChanged := false
for _, h := range handler {
changed, err := h(ctx, p1, p2, p3, p4)
if changed {
isChanged = true
}
if err != nil {
return isChanged, err
}
}
return isChanged, nil
}
func HandleP9[P1, P2, P3, P4, P5, P6, P7, P8, P9 interface{}](ctx context.Context, p1 P1, p2 P2, p3 P3, p4 P4, p5 P5, p6 P6, p7 P7, p8 P8, p9 P9, handler ...HandleP9Func[P1, P2, P3, P4, P5, P6, P7, P8, P9]) (bool, error) {
isChanged := false
for _, h := range handler {
changed, err := h(ctx, p1, p2, p3, p4, p5, p6, p7, p8, p9)
if changed {
isChanged = true
}
if err != nil {
return isChanged, err
}
}
return isChanged, nil
}

86
pkg/operatorV2/update.go Normal file
View file

@ -0,0 +1,86 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package operator
import (
"context"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/timer"
)
type Object[T interface{}] interface {
meta.Object
GetStatus() T
SetStatus(T)
}
type GetInterface[S interface{}, T Object[S]] interface {
Get(ctx context.Context, name string, options meta.GetOptions) (T, error)
}
type UpdateStatusInterfaceClient[S interface{}, T Object[S]] func(namespace string) UpdateStatusInterface[S, T]
type UpdateStatusInterface[S interface{}, T Object[S]] interface {
GetInterface[S, T]
UpdateStatus(ctx context.Context, in T, options meta.UpdateOptions) (T, error)
}
func WithUpdateStatusInterfaceRetry[S interface{}, T Object[S]](ctx context.Context, client UpdateStatusInterface[S, T], obj T, status S, opts meta.UpdateOptions) (T, error) {
for id := 0; id < globals.GetGlobals().Retry().OperatorUpdateRetryCount().Get(); id++ {
// Let's try to make a call
if nObj, err := WithUpdateStatusInterface(ctx, client, obj, status, opts); err == nil {
return nObj, nil
}
select {
case <-timer.After(globals.GetGlobals().Retry().OperatorUpdateRetryDelay().Get()):
continue
case <-ctx.Done():
return util.Default[T](), context.DeadlineExceeded
}
}
return util.Default[T](), errors.Newf("Unable to save Object %s/%s, retries exceeded", obj.GetNamespace(), obj.GetName())
}
func WithUpdateStatusInterface[S interface{}, T Object[S]](ctx context.Context, client UpdateStatusInterface[S, T], obj T, status S, opts meta.UpdateOptions) (T, error) {
cCtx, c := globals.GetGlobals().Timeouts().Kubernetes().WithTimeout(ctx)
defer c()
currentObj, err := client.Get(cCtx, obj.GetName(), meta.GetOptions{})
if err != nil {
return util.Default[T](), err
}
currentObj.SetStatus(status)
nCtx, c := globals.GetGlobals().Timeouts().Kubernetes().WithTimeout(ctx)
defer c()
return client.UpdateStatus(nCtx, currentObj, opts)
}

View file

@ -0,0 +1,38 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package operator
import (
"context"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
backupApi "github.com/arangodb/kube-arangodb/pkg/apis/backup/v1"
mlApi "github.com/arangodb/kube-arangodb/pkg/apis/ml/v1alpha1"
)
func WithArangoBackupUpdateStatusInterfaceRetry(ctx context.Context, client UpdateStatusInterface[backupApi.ArangoBackupStatus, *backupApi.ArangoBackup], obj *backupApi.ArangoBackup, status backupApi.ArangoBackupStatus, opts meta.UpdateOptions) (*backupApi.ArangoBackup, error) {
return WithUpdateStatusInterfaceRetry[backupApi.ArangoBackupStatus, *backupApi.ArangoBackup](ctx, client, obj, status, opts)
}
func WithArangoExtensionUpdateStatusInterfaceRetry(ctx context.Context, client UpdateStatusInterface[mlApi.ArangoMLExtensionStatus, *mlApi.ArangoMLExtension], obj *mlApi.ArangoMLExtension, status mlApi.ArangoMLExtensionStatus, opts meta.UpdateOptions) (*mlApi.ArangoMLExtension, error) {
return WithUpdateStatusInterfaceRetry[mlApi.ArangoMLExtensionStatus, *mlApi.ArangoMLExtension](ctx, client, obj, status, opts)
}

39
pkg/util/context.go Normal file
View file

@ -0,0 +1,39 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package util
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
)
func WithKubernetesContextTimeoutP2A2[P1, P2, A1, A2 interface{}](ctx context.Context, f func(context.Context, A1, A2) (P1, P2), a1 A1, a2 A2) (P1, P2) {
return WithContextTimeoutP2A2(ctx, globals.GetGlobals().Timeouts().Kubernetes().Get(), f, a1, a2)
}
func WithContextTimeoutP2A2[P1, P2, A1, A2 interface{}](ctx context.Context, timeout time.Duration, f func(context.Context, A1, A2) (P1, P2), a1 A1, a2 A2) (P1, P2) {
nCtx, c := context.WithTimeout(ctx, timeout)
defer c()
return f(nCtx, a1, a2)
}

View file

@ -0,0 +1,33 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package errors
type WithErrorArrayP2[IN, P1, P2 any] func(p1 P1, p2 P2, in IN) error
func ExecuteWithErrorArrayP2[IN, P1, P2 any](caller WithErrorArrayP2[IN, P1, P2], p1 P1, p2 P2, elements ...IN) error {
errors := make([]error, len(elements))
for id := range elements {
errors[id] = caller(p1, p2, elements[id])
}
return Errors(errors...)
}

View file

@ -41,6 +41,11 @@ const (
DefaultKubernetesRequestBatchSize = 256
DefaultBackupConcurrentUploads = 4
// Retry
DefaultOperatorUpdateRetryCount = 25
DefaultOperatorUpdateRetryDelay = time.Second
)
var globalObj = &globals{
@ -61,6 +66,10 @@ var globalObj = &globals{
backup: &globalBackup{
concurrentUploads: NewInt(DefaultBackupConcurrentUploads),
},
retry: &globalRetry{
operatorUpdateRetryCount: NewInt(DefaultOperatorUpdateRetryCount),
operatorUpdateRetryDelay: NewTimeout(DefaultOperatorUpdateRetryDelay),
},
}
func GetGlobals() Globals {
@ -75,12 +84,18 @@ type Globals interface {
Timeouts() GlobalTimeouts
Kubernetes() GlobalKubernetes
Backup() GlobalBackup
Retry() GlobalRetry
}
type globals struct {
timeouts *globalTimeouts
kubernetes *globalKubernetes
backup *globalBackup
retry *globalRetry
}
func (g globals) Retry() GlobalRetry {
return g.retry
}
func (g globals) Backup() GlobalBackup {
@ -174,3 +189,21 @@ func (g *globalTimeouts) BackupArangoClientTimeout() Timeout {
func (g *globalTimeouts) BackupArangoClientUploadTimeout() Timeout {
return g.backupArangoClientUploadTimeout
}
type GlobalRetry interface {
OperatorUpdateRetryCount() Int
OperatorUpdateRetryDelay() Timeout
}
type globalRetry struct {
operatorUpdateRetryCount Int
operatorUpdateRetryDelay Timeout
}
func (g *globalRetry) OperatorUpdateRetryCount() Int {
return g.operatorUpdateRetryCount
}
func (g *globalRetry) OperatorUpdateRetryDelay() Timeout {
return g.operatorUpdateRetryDelay
}