diff --git a/CHANGELOG.md b/CHANGELOG.md index 8f4bf5c78..443d9db2b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - (Feature) Parametrize ForceDelete timeout - (Feature) Scheduler BatchJob Integration Definition - (Feature) Scheduler CronJob Integration Definition +- (Feature) Scheduler BatchJob Integration Service ## [1.2.39](https://github.com/arangodb/kube-arangodb/tree/1.2.39) (2024-03-11) - (Feature) Extract Scheduler API diff --git a/integrations/scheduler/v1/batch_job_test.go b/integrations/scheduler/v1/batch_job_test.go new file mode 100644 index 000000000..1ae84af99 --- /dev/null +++ b/integrations/scheduler/v1/batch_job_test.go @@ -0,0 +1,204 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + batch "k8s.io/api/batch/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + + pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition" + schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1" + "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/arangodb/kube-arangodb/pkg/util/tests" +) + +func Test_BatchJob(t *testing.T) { + ctx, c := context.WithCancel(context.Background()) + defer c() + + client := kclient.NewFakeClientBuilder().Add( + tests.NewMetaObject(t, tests.FakeNamespace, "test", func(t *testing.T, obj *schedulerApi.ArangoProfile) { + obj.Spec = schedulerApi.ProfileSpec{} + }), + tests.NewMetaObject(t, tests.FakeNamespace, "test-select-all", func(t *testing.T, obj *schedulerApi.ArangoProfile) { + obj.Spec = schedulerApi.ProfileSpec{ + Selectors: &schedulerApi.ProfileSelectors{ + Label: &meta.LabelSelector{}, + }, + Template: &schedulerApi.ProfileTemplate{}, + } + }), + tests.NewMetaObject(t, tests.FakeNamespace, "test-select-specific", func(t *testing.T, obj *schedulerApi.ArangoProfile) { + obj.Spec = schedulerApi.ProfileSpec{ + Selectors: &schedulerApi.ProfileSelectors{ + Label: &meta.LabelSelector{ + MatchLabels: map[string]string{ + "A": "B", + }, + }, + }, + Template: &schedulerApi.ProfileTemplate{}, + } + }), + ).Client() + + scheduler := Client(t, ctx, client, func(c Configuration) Configuration { + c.Namespace = tests.FakeNamespace + c.VerifyAccess = false + return c + }) + + t.Run("Ensure job does not exist - get", func(t *testing.T) { + resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.False(t, resp.GetExists()) + }) + + t.Run("Ensure job does not exist - list", func(t *testing.T) { + resp, err := scheduler.ListBatchJob(context.Background(), &pbSchedulerV1.ListBatchJobRequest{}) + require.NoError(t, err) + + require.Len(t, resp.GetBatchJobs(), 0) + }) + + t.Run("Schedule Job", func(t *testing.T) { + resp, err := scheduler.CreateBatchJob(context.Background(), &pbSchedulerV1.CreateBatchJobRequest{ + Spec: &pbSchedulerV1.Spec{ + Metadata: &pbSchedulerV1.Metadata{ + Name: "test", + }, + Job: &pbSchedulerV1.JobBase{ + Labels: nil, + Profiles: []string{ + "test", + }, + }, + Containers: map[string]*pbSchedulerV1.ContainerBase{ + "example": { + Image: util.NewType("ubuntu:20.04"), + Args: []string{ + "/bin/bash", + "-c", + "true", + }, + }, + }, + }, + BatchJob: &pbSchedulerV1.BatchJobSpec{ + Completions: util.NewType[int32](1), + }, + }) + require.NoError(t, err) + + require.EqualValues(t, "test", resp.GetName()) + require.Len(t, resp.Profiles, 2) + require.Contains(t, resp.Profiles, "test") + require.Contains(t, resp.Profiles, "test-select-all") + require.NotContains(t, resp.Profiles, "test-select-specific") + }) + + t.Run("Ensure job exist - get", func(t *testing.T) { + resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.True(t, resp.GetExists()) + }) + + t.Run("Ensure job exist - list", func(t *testing.T) { + resp, err := scheduler.ListBatchJob(context.Background(), &pbSchedulerV1.ListBatchJobRequest{}) + require.NoError(t, err) + + require.Len(t, resp.GetBatchJobs(), 1) + require.Contains(t, resp.GetBatchJobs(), "test") + }) + + t.Run("Ensure job details - pre", func(t *testing.T) { + resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.True(t, resp.GetExists()) + require.EqualValues(t, 0, resp.GetBatchJob().GetStatus().GetSucceeded()) + }) + + t.Run("Ensure job details - update", func(t *testing.T) { + job := tests.NewMetaObject[*batch.Job](t, tests.FakeNamespace, "test") + + tests.RefreshObjectsC(t, client, &job) + + job.Status.Succeeded = 1 + + tests.UpdateObjectsC(t, client, &job) + }) + + t.Run("Ensure job details - post", func(t *testing.T) { + resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.True(t, resp.GetExists()) + require.EqualValues(t, 1, resp.GetBatchJob().GetStatus().GetSucceeded()) + }) + + t.Run("Delete Job", func(t *testing.T) { + resp, err := scheduler.DeleteBatchJob(context.Background(), &pbSchedulerV1.DeleteBatchJobRequest{ + Name: "test", + }) + require.NoError(t, err) + require.True(t, resp.GetExists()) + }) + + t.Run("Re-Delete Job", func(t *testing.T) { + resp, err := scheduler.DeleteBatchJob(context.Background(), &pbSchedulerV1.DeleteBatchJobRequest{ + Name: "test", + }) + require.NoError(t, err) + require.False(t, resp.GetExists()) + }) + + t.Run("Ensure job does not exist after deletion - get", func(t *testing.T) { + resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.False(t, resp.GetExists()) + }) + + t.Run("Ensure job does not exist after deletion - list", func(t *testing.T) { + resp, err := scheduler.ListBatchJob(context.Background(), &pbSchedulerV1.ListBatchJobRequest{}) + require.NoError(t, err) + + require.Len(t, resp.GetBatchJobs(), 0) + }) +} diff --git a/cmd/scheduler.go b/integrations/scheduler/v1/configuration.go similarity index 66% rename from cmd/scheduler.go rename to integrations/scheduler/v1/configuration.go index 32fd48cc8..a9112d41c 100644 --- a/cmd/scheduler.go +++ b/integrations/scheduler/v1/configuration.go @@ -18,22 +18,29 @@ // Copyright holder is ArangoDB GmbH, Cologne, Germany // -package cmd +package v1 -import ( - "github.com/spf13/cobra" +type Mod func(c Configuration) Configuration - "github.com/arangodb/kube-arangodb/pkg/scheduler" -) - -func init() { - cmd := &cobra.Command{ - Use: "scheduler", +func NewConfiguration() Configuration { + return Configuration{ + Namespace: "default", + VerifyAccess: true, } - - if err := scheduler.InitCommand(cmd); err != nil { - panic(err.Error()) - } - - cmdMain.AddCommand(cmd) +} + +type Configuration struct { + Namespace string + + VerifyAccess bool +} + +func (c Configuration) With(mods ...Mod) Configuration { + n := c + + for _, mod := range mods { + n = mod(n) + } + + return n } diff --git a/integrations/scheduler/v1/cron_job_test.go b/integrations/scheduler/v1/cron_job_test.go new file mode 100644 index 000000000..acbb2565c --- /dev/null +++ b/integrations/scheduler/v1/cron_job_test.go @@ -0,0 +1,250 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + batch "k8s.io/api/batch/v1" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + + pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition" + schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1" + "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/arangodb/kube-arangodb/pkg/util/tests" +) + +func Test_CronJob(t *testing.T) { + ctx, c := context.WithCancel(context.Background()) + defer c() + + client := kclient.NewFakeClientBuilder().Add( + tests.NewMetaObject(t, tests.FakeNamespace, "test", func(t *testing.T, obj *schedulerApi.ArangoProfile) { + obj.Spec = schedulerApi.ProfileSpec{} + }), + tests.NewMetaObject(t, tests.FakeNamespace, "test-select-all", func(t *testing.T, obj *schedulerApi.ArangoProfile) { + obj.Spec = schedulerApi.ProfileSpec{ + Selectors: &schedulerApi.ProfileSelectors{ + Label: &meta.LabelSelector{}, + }, + Template: &schedulerApi.ProfileTemplate{}, + } + }), + tests.NewMetaObject(t, tests.FakeNamespace, "test-select-specific", func(t *testing.T, obj *schedulerApi.ArangoProfile) { + obj.Spec = schedulerApi.ProfileSpec{ + Selectors: &schedulerApi.ProfileSelectors{ + Label: &meta.LabelSelector{ + MatchLabels: map[string]string{ + "A": "B", + }, + }, + }, + Template: &schedulerApi.ProfileTemplate{}, + } + }), + ).Client() + + scheduler := Client(t, ctx, client, func(c Configuration) Configuration { + c.Namespace = tests.FakeNamespace + c.VerifyAccess = false + return c + }) + + t.Run("Ensure job does not exist - get", func(t *testing.T) { + resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.False(t, resp.GetExists()) + }) + + t.Run("Ensure job does not exist - list", func(t *testing.T) { + resp, err := scheduler.ListCronJob(context.Background(), &pbSchedulerV1.ListCronJobRequest{}) + require.NoError(t, err) + + require.Len(t, resp.GetCronJobs(), 0) + }) + + t.Run("Schedule Job", func(t *testing.T) { + resp, err := scheduler.CreateCronJob(context.Background(), &pbSchedulerV1.CreateCronJobRequest{ + Spec: &pbSchedulerV1.Spec{ + Metadata: &pbSchedulerV1.Metadata{ + Name: "test", + }, + Job: &pbSchedulerV1.JobBase{ + Labels: nil, + Profiles: []string{ + "test", + }, + }, + Containers: map[string]*pbSchedulerV1.ContainerBase{ + "example": { + Image: util.NewType("ubuntu:20.04"), + Args: []string{ + "/bin/bash", + "-c", + "true", + }, + }, + }, + }, + CronJob: &pbSchedulerV1.CronJobSpec{ + Schedule: "* * * * *", + Job: &pbSchedulerV1.BatchJobSpec{ + Parallelism: util.NewType[int32](1), + Completions: util.NewType[int32](1), + BackoffLimit: util.NewType[int32](1), + }, + }, + }) + require.NoError(t, err) + + require.EqualValues(t, "test", resp.GetName()) + require.Len(t, resp.Profiles, 2) + require.Contains(t, resp.Profiles, "test") + require.Contains(t, resp.Profiles, "test-select-all") + require.NotContains(t, resp.Profiles, "test-select-specific") + }) + + t.Run("Ensure job exist - get", func(t *testing.T) { + resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.True(t, resp.GetExists()) + }) + + t.Run("Ensure job exist - list", func(t *testing.T) { + resp, err := scheduler.ListCronJob(context.Background(), &pbSchedulerV1.ListCronJobRequest{}) + require.NoError(t, err) + + require.Len(t, resp.GetCronJobs(), 1) + require.Contains(t, resp.GetCronJobs(), "test") + }) + + t.Run("Ensure job details - pre", func(t *testing.T) { + resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.True(t, resp.GetExists()) + require.Len(t, resp.GetBatchJobs(), 0) + }) + + t.Run("Ensure job details - update", func(t *testing.T) { + job := tests.NewMetaObject[*batch.CronJob](t, tests.FakeNamespace, "test") + + tests.RefreshObjectsC(t, client, &job) + + job.Status.Active = []core.ObjectReference{ + { + Name: "test-job", + }, + } + + tests.UpdateObjectsC(t, client, &job) + }) + + t.Run("Ensure job details - post", func(t *testing.T) { + resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.True(t, resp.GetExists()) + require.Len(t, resp.GetBatchJobs(), 1) + }) + + t.Run("Update Job - Pre", func(t *testing.T) { + resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{ + Name: "test", + }) + require.NoError(t, err) + require.True(t, resp.GetExists()) + require.EqualValues(t, "* * * * *", resp.GetCronJob().GetSpec().GetSchedule()) + require.EqualValues(t, 1, resp.GetCronJob().GetSpec().GetJob().GetCompletions()) + }) + + t.Run("Update Job", func(t *testing.T) { + resp, err := scheduler.UpdateCronJob(context.Background(), &pbSchedulerV1.UpdateCronJobRequest{ + Name: "test", + Spec: &pbSchedulerV1.CronJobSpec{ + Schedule: "1 * * * *", + Job: &pbSchedulerV1.BatchJobSpec{ + Completions: util.NewType[int32](5), + }, + }, + }) + require.NoError(t, err) + require.True(t, resp.GetExists()) + require.EqualValues(t, "1 * * * *", resp.GetCronJob().GetSpec().GetSchedule()) + require.EqualValues(t, 5, resp.GetCronJob().GetSpec().GetJob().GetCompletions()) + }) + + t.Run("Update Job - Post", func(t *testing.T) { + resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{ + Name: "test", + }) + require.NoError(t, err) + require.True(t, resp.GetExists()) + require.EqualValues(t, "1 * * * *", resp.GetCronJob().GetSpec().GetSchedule()) + require.EqualValues(t, 5, resp.GetCronJob().GetSpec().GetJob().GetCompletions()) + }) + + t.Run("Delete Job", func(t *testing.T) { + resp, err := scheduler.DeleteCronJob(context.Background(), &pbSchedulerV1.DeleteCronJobRequest{ + Name: "test", + }) + require.NoError(t, err) + require.True(t, resp.GetExists()) + }) + + t.Run("Re-Delete Job", func(t *testing.T) { + resp, err := scheduler.DeleteCronJob(context.Background(), &pbSchedulerV1.DeleteCronJobRequest{ + Name: "test", + }) + require.NoError(t, err) + require.False(t, resp.GetExists()) + }) + + t.Run("Ensure job does not exist after deletion - get", func(t *testing.T) { + resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{ + Name: "test", + }) + require.NoError(t, err) + + require.False(t, resp.GetExists()) + }) + + t.Run("Ensure job does not exist after deletion - list", func(t *testing.T) { + resp, err := scheduler.ListCronJob(context.Background(), &pbSchedulerV1.ListCronJobRequest{}) + require.NoError(t, err) + + require.Len(t, resp.GetCronJobs(), 0) + }) +} diff --git a/integrations/scheduler/v1/definition/cronjob.pb.go b/integrations/scheduler/v1/definition/cronjob.pb.go index a7ecf2280..6b32da840 100644 --- a/integrations/scheduler/v1/definition/cronjob.pb.go +++ b/integrations/scheduler/v1/definition/cronjob.pb.go @@ -46,10 +46,8 @@ type CronJob struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - // Keeps BatchJob settings - Job *BatchJobSpec `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"` // Keeps the CronJob Settings - Spec *CronJobSpec `protobuf:"bytes,2,opt,name=spec,proto3" json:"spec,omitempty"` + Spec *CronJobSpec `protobuf:"bytes,1,opt,name=spec,proto3" json:"spec,omitempty"` } func (x *CronJob) Reset() { @@ -84,13 +82,6 @@ func (*CronJob) Descriptor() ([]byte, []int) { return file_integrations_scheduler_v1_definition_cronjob_proto_rawDescGZIP(), []int{0} } -func (x *CronJob) GetJob() *BatchJobSpec { - if x != nil { - return x.Job - } - return nil -} - func (x *CronJob) GetSpec() *CronJobSpec { if x != nil { return x.Spec @@ -106,6 +97,8 @@ type CronJobSpec struct { // Schedule definition Schedule string `protobuf:"bytes,1,opt,name=schedule,proto3" json:"schedule,omitempty"` + // Keeps BatchJob settings + Job *BatchJobSpec `protobuf:"bytes,2,opt,name=job,proto3" json:"job,omitempty"` } func (x *CronJobSpec) Reset() { @@ -147,6 +140,13 @@ func (x *CronJobSpec) GetSchedule() string { return "" } +func (x *CronJobSpec) GetJob() *BatchJobSpec { + if x != nil { + return x.Job + } + return nil +} + var File_integrations_scheduler_v1_definition_cronjob_proto protoreflect.FileDescriptor var file_integrations_scheduler_v1_definition_cronjob_proto_rawDesc = []byte{ @@ -157,16 +157,16 @@ var file_integrations_scheduler_v1_definition_cronjob_proto_rawDesc = []byte{ 0x33, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6a, 0x6f, 0x62, 0x2e, 0x70, - 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x60, 0x0a, 0x07, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, - 0x29, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x73, - 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4a, 0x6f, - 0x62, 0x53, 0x70, 0x65, 0x63, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x2a, 0x0a, 0x04, 0x73, 0x70, - 0x65, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x64, - 0x75, 0x6c, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x70, 0x65, 0x63, - 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x22, 0x29, 0x0a, 0x0b, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f, - 0x62, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, - 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, - 0x65, 0x42, 0x48, 0x5a, 0x46, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x35, 0x0a, 0x07, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12, + 0x2a, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, + 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f, + 0x62, 0x53, 0x70, 0x65, 0x63, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x22, 0x54, 0x0a, 0x0b, 0x43, + 0x72, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x63, + 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x63, + 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x29, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2e, + 0x42, 0x61, 0x74, 0x63, 0x68, 0x4a, 0x6f, 0x62, 0x53, 0x70, 0x65, 0x63, 0x52, 0x03, 0x6a, 0x6f, + 0x62, 0x42, 0x48, 0x5a, 0x46, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x2d, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2f, 0x76, 0x31, @@ -193,8 +193,8 @@ var file_integrations_scheduler_v1_definition_cronjob_proto_goTypes = []interfac (*BatchJobSpec)(nil), // 2: scheduler.BatchJobSpec } var file_integrations_scheduler_v1_definition_cronjob_proto_depIdxs = []int32{ - 2, // 0: scheduler.CronJob.job:type_name -> scheduler.BatchJobSpec - 1, // 1: scheduler.CronJob.spec:type_name -> scheduler.CronJobSpec + 1, // 0: scheduler.CronJob.spec:type_name -> scheduler.CronJobSpec + 2, // 1: scheduler.CronJobSpec.job:type_name -> scheduler.BatchJobSpec 2, // [2:2] is the sub-list for method output_type 2, // [2:2] is the sub-list for method input_type 2, // [2:2] is the sub-list for extension type_name diff --git a/integrations/scheduler/v1/definition/cronjob.proto b/integrations/scheduler/v1/definition/cronjob.proto index 93e014423..b0f37eadc 100644 --- a/integrations/scheduler/v1/definition/cronjob.proto +++ b/integrations/scheduler/v1/definition/cronjob.proto @@ -28,15 +28,15 @@ option go_package = "github.com/arangodb/kube-arangodb/integrations/scheduler/v1 // Keeps information about Kubernetes Batch/V1 CronJob message CronJob { - // Keeps BatchJob settings - BatchJobSpec job = 1; - // Keeps the CronJob Settings - CronJobSpec spec = 2; + CronJobSpec spec = 1; } // Information about CronJob run settings message CronJobSpec { // Schedule definition string schedule = 1; + + // Keeps BatchJob settings + BatchJobSpec job = 2; } diff --git a/integrations/scheduler/v1/implementation.go b/integrations/scheduler/v1/implementation.go new file mode 100644 index 000000000..0eec1473e --- /dev/null +++ b/integrations/scheduler/v1/implementation.go @@ -0,0 +1,504 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + + "google.golang.org/grpc" + batch "k8s.io/api/batch/v1" + core "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + + pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition" + "github.com/arangodb/kube-arangodb/pkg/debug_package/generators/kubernetes" + "github.com/arangodb/kube-arangodb/pkg/scheduler" + "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors" + kresources "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/resources" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/arangodb/kube-arangodb/pkg/util/svc" +) + +var _ pbSchedulerV1.SchedulerV1Server = &implementation{} +var _ svc.Handler = &implementation{} + +func New(ctx context.Context, client kclient.Client, cfg Configuration) (svc.Handler, error) { + return newInternal(ctx, client, cfg) +} + +func newInternal(ctx context.Context, client kclient.Client, cfg Configuration) (*implementation, error) { + if cfg.VerifyAccess { + // Lets Verify Access + if err := kresources.VerifyAll(ctx, client.Kubernetes(), + kresources.AccessRequest{ + Verb: "create", + Group: "batch", + Version: "v1", + Resource: "jobs", + Namespace: cfg.Namespace, + }, + kresources.AccessRequest{ + Verb: "list", + Group: "batch", + Version: "v1", + Resource: "jobs", + Namespace: cfg.Namespace, + }, + kresources.AccessRequest{ + Verb: "delete", + Group: "batch", + Version: "v1", + Resource: "jobs", + Namespace: cfg.Namespace, + }, + kresources.AccessRequest{ + Verb: "get", + Group: "batch", + Version: "v1", + Resource: "jobs", + Namespace: cfg.Namespace, + }, + kresources.AccessRequest{ + Verb: "create", + Group: "batch", + Version: "v1", + Resource: "cronjobs", + Namespace: cfg.Namespace, + }, + kresources.AccessRequest{ + Verb: "list", + Group: "batch", + Version: "v1", + Resource: "cronjobs", + Namespace: cfg.Namespace, + }, + kresources.AccessRequest{ + Verb: "delete", + Group: "batch", + Version: "v1", + Resource: "cronjobs", + Namespace: cfg.Namespace, + }, + kresources.AccessRequest{ + Verb: "get", + Group: "batch", + Version: "v1", + Resource: "cronjobs", + Namespace: cfg.Namespace, + }, + ); err != nil { + return nil, errors.WithMessagef(err, "Unable to access API") + } + } + + return &implementation{ + cfg: cfg, + client: client, + scheduler: scheduler.NewScheduler(client, cfg.Namespace), + }, nil +} + +type implementation struct { + cfg Configuration + + client kclient.Client + scheduler scheduler.Scheduler + + pbSchedulerV1.UnimplementedSchedulerV1Server +} + +func (i *implementation) Name() string { + return pbSchedulerV1.Name +} + +func (i *implementation) Register(registrar *grpc.Server) { + pbSchedulerV1.RegisterSchedulerV1Server(registrar, i) +} + +func (i *implementation) Health() svc.HealthState { + return svc.Healthy +} + +func (i *implementation) CreateBatchJob(ctx context.Context, request *pbSchedulerV1.CreateBatchJobRequest) (*pbSchedulerV1.CreateBatchJobResponse, error) { + if request == nil { + return nil, errors.Errorf("Request is nil") + } + + rendered, profiles, err := i.scheduler.Render(ctx, request.GetSpec()) + if err != nil { + return nil, err + } + + rendered.Spec.RestartPolicy = core.RestartPolicyNever + + var spec batch.Job + + spec.Namespace = i.cfg.Namespace + + if meta := request.GetSpec().GetMetadata(); meta != nil { + if util.TypeOrDefault(meta.GenerateName, false) { + spec.GenerateName = meta.Name + } else { + spec.Name = meta.Name + } + } + + spec.Spec.Template = *rendered + + if batchJob := request.GetBatchJob(); batchJob != nil { + if v := batchJob.Completions; v != nil { + spec.Spec.Completions = v + } + + if v := batchJob.Parallelism; v != nil { + spec.Spec.Parallelism = v + } + + if v := batchJob.BackoffLimit; v != nil { + spec.Spec.BackoffLimit = v + } + } + + if batchJobSpec := request.GetSpec(); batchJobSpec != nil { + if job := batchJobSpec.Job; job != nil { + spec.Labels = job.Labels + } + } + + job, err := i.client.Kubernetes().BatchV1().Jobs(i.cfg.Namespace).Create(ctx, &spec, meta.CreateOptions{}) + + if err != nil { + return nil, err + } + + return &pbSchedulerV1.CreateBatchJobResponse{ + Name: job.Name, + Profiles: profiles, + }, nil +} + +func (i *implementation) GetBatchJob(ctx context.Context, request *pbSchedulerV1.GetBatchJobRequest) (*pbSchedulerV1.GetBatchJobResponse, error) { + if request == nil { + return nil, errors.Errorf("Request is nil") + } + + job, err := i.client.Kubernetes().BatchV1().Jobs(i.cfg.Namespace).Get(ctx, request.GetName(), meta.GetOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + return &pbSchedulerV1.GetBatchJobResponse{ + Exists: false, + }, nil + } + + return nil, err + } + + return &pbSchedulerV1.GetBatchJobResponse{ + Exists: true, + + BatchJob: &pbSchedulerV1.BatchJob{ + Spec: &pbSchedulerV1.BatchJobSpec{ + Parallelism: job.Spec.Parallelism, + Completions: job.Spec.Completions, + BackoffLimit: job.Spec.BackoffLimit, + }, + Status: &pbSchedulerV1.BatchJobStatus{ + Active: job.Status.Active, + Succeeded: job.Status.Succeeded, + Failed: job.Status.Failed, + }, + }, + }, nil +} + +func (i *implementation) DeleteBatchJob(ctx context.Context, request *pbSchedulerV1.DeleteBatchJobRequest) (*pbSchedulerV1.DeleteBatchJobResponse, error) { + if request == nil { + return nil, errors.Errorf("Request is nil") + } + + var d meta.DeleteOptions + + if v := request.DeleteChildPods; v != nil { + if *v { + d.PropagationPolicy = util.NewType(meta.DeletePropagationBackground) + } else { + d.PropagationPolicy = util.NewType(meta.DeletePropagationOrphan) + } + } + + err := i.client.Kubernetes().BatchV1().Jobs(i.cfg.Namespace).Delete(ctx, request.GetName(), d) + if err != nil { + if kerrors.IsNotFound(err) { + return &pbSchedulerV1.DeleteBatchJobResponse{ + Exists: false, + }, nil + } + + return nil, err + } + + return &pbSchedulerV1.DeleteBatchJobResponse{Exists: true}, nil +} + +func (i *implementation) ListBatchJob(ctx context.Context, request *pbSchedulerV1.ListBatchJobRequest) (*pbSchedulerV1.ListBatchJobResponse, error) { + if request == nil { + return nil, errors.Errorf("Request is nil") + } + + objects, err := kubernetes.ListObjects[*batch.JobList, *batch.Job](ctx, i.client.Kubernetes().BatchV1().Jobs(i.cfg.Namespace), func(result *batch.JobList) []*batch.Job { + r := make([]*batch.Job, len(result.Items)) + + for id := range result.Items { + r[id] = result.Items[id].DeepCopy() + } + + return r + }) + + if err != nil { + return nil, err + } + + return &pbSchedulerV1.ListBatchJobResponse{ + BatchJobs: kubernetes.Extract(objects, func(in *batch.Job) string { + return in.GetName() + }), + }, nil +} + +func (i *implementation) CreateCronJob(ctx context.Context, request *pbSchedulerV1.CreateCronJobRequest) (*pbSchedulerV1.CreateCronJobResponse, error) { + if request == nil { + return nil, errors.Errorf("Request is nil") + } + + rendered, profiles, err := i.scheduler.Render(ctx, request.GetSpec()) + if err != nil { + return nil, err + } + + rendered.Spec.RestartPolicy = core.RestartPolicyNever + + var spec batch.CronJob + + spec.Namespace = i.cfg.Namespace + + if meta := request.GetSpec().GetMetadata(); meta != nil { + if util.TypeOrDefault(meta.GenerateName, false) { + spec.GenerateName = meta.Name + } else { + spec.Name = meta.Name + } + } + + spec.Spec.JobTemplate.Spec.Template = *rendered + + if cronJob := request.GetCronJob(); cronJob != nil { + spec.Spec.Schedule = cronJob.Schedule + + if batchJob := cronJob.GetJob(); batchJob != nil { + if v := batchJob.Completions; v != nil { + spec.Spec.JobTemplate.Spec.Completions = v + } + + if v := batchJob.Parallelism; v != nil { + spec.Spec.JobTemplate.Spec.Parallelism = v + } + + if v := batchJob.BackoffLimit; v != nil { + spec.Spec.JobTemplate.Spec.BackoffLimit = v + } + } + } + + if batchJobSpec := request.GetSpec(); batchJobSpec != nil { + if job := batchJobSpec.Job; job != nil { + spec.Labels = job.Labels + spec.Spec.JobTemplate.Labels = job.Labels + } + } + + job, err := i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Create(ctx, &spec, meta.CreateOptions{}) + + if err != nil { + return nil, err + } + + return &pbSchedulerV1.CreateCronJobResponse{ + Name: job.Name, + Profiles: profiles, + }, nil +} + +func (i *implementation) GetCronJob(ctx context.Context, request *pbSchedulerV1.GetCronJobRequest) (*pbSchedulerV1.GetCronJobResponse, error) { + if request == nil { + return nil, errors.Errorf("Request is nil") + } + + job, err := i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Get(ctx, request.GetName(), meta.GetOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + return &pbSchedulerV1.GetCronJobResponse{ + Exists: false, + }, nil + } + + return nil, err + } + + return &pbSchedulerV1.GetCronJobResponse{ + Exists: true, + + CronJob: &pbSchedulerV1.CronJob{ + Spec: &pbSchedulerV1.CronJobSpec{ + Schedule: job.Spec.Schedule, + + Job: &pbSchedulerV1.BatchJobSpec{ + Parallelism: job.Spec.JobTemplate.Spec.Parallelism, + Completions: job.Spec.JobTemplate.Spec.Completions, + BackoffLimit: job.Spec.JobTemplate.Spec.BackoffLimit, + }, + }, + }, + + BatchJobs: kubernetes.Extract(job.Status.Active, func(in core.ObjectReference) string { + return in.Name + }), + }, nil +} + +func (i *implementation) UpdateCronJob(ctx context.Context, request *pbSchedulerV1.UpdateCronJobRequest) (*pbSchedulerV1.UpdateCronJobResponse, error) { + if request == nil { + return nil, errors.Errorf("Request is nil") + } + + job, err := i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Get(ctx, request.GetName(), meta.GetOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + return &pbSchedulerV1.UpdateCronJobResponse{ + Exists: false, + }, nil + } + + return nil, err + } + + if cronJob := request.GetSpec(); cronJob != nil { + job.Spec.Schedule = cronJob.Schedule + + if batchJob := cronJob.GetJob(); batchJob != nil { + if v := batchJob.Completions; v != nil { + job.Spec.JobTemplate.Spec.Completions = v + } + + if v := batchJob.Parallelism; v != nil { + job.Spec.JobTemplate.Spec.Parallelism = v + } + + if v := batchJob.BackoffLimit; v != nil { + job.Spec.JobTemplate.Spec.BackoffLimit = v + } + } + } + + job, err = i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Update(ctx, job, meta.UpdateOptions{}) + if err != nil { + if kerrors.IsNotFound(err) { + return &pbSchedulerV1.UpdateCronJobResponse{ + Exists: false, + }, nil + } + + return nil, err + } + + return &pbSchedulerV1.UpdateCronJobResponse{ + Exists: true, + + CronJob: &pbSchedulerV1.CronJob{ + Spec: &pbSchedulerV1.CronJobSpec{ + Schedule: job.Spec.Schedule, + + Job: &pbSchedulerV1.BatchJobSpec{ + Parallelism: job.Spec.JobTemplate.Spec.Parallelism, + Completions: job.Spec.JobTemplate.Spec.Completions, + BackoffLimit: job.Spec.JobTemplate.Spec.BackoffLimit, + }, + }, + }, + }, nil +} + +func (i *implementation) ListCronJob(ctx context.Context, request *pbSchedulerV1.ListCronJobRequest) (*pbSchedulerV1.ListCronJobResponse, error) { + if request == nil { + return nil, errors.Errorf("Request is nil") + } + + objects, err := kubernetes.ListObjects[*batch.CronJobList, *batch.CronJob](ctx, i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace), func(result *batch.CronJobList) []*batch.CronJob { + r := make([]*batch.CronJob, len(result.Items)) + + for id := range result.Items { + r[id] = result.Items[id].DeepCopy() + } + + return r + }) + + if err != nil { + return nil, err + } + + return &pbSchedulerV1.ListCronJobResponse{ + CronJobs: kubernetes.Extract(objects, func(in *batch.CronJob) string { + return in.GetName() + }), + }, nil +} + +func (i *implementation) DeleteCronJob(ctx context.Context, request *pbSchedulerV1.DeleteCronJobRequest) (*pbSchedulerV1.DeleteCronJobResponse, error) { + if request == nil { + return nil, errors.Errorf("Request is nil") + } + + var d meta.DeleteOptions + + if v := request.DeleteChildPods; v != nil { + if *v { + d.PropagationPolicy = util.NewType(meta.DeletePropagationBackground) + } else { + d.PropagationPolicy = util.NewType(meta.DeletePropagationOrphan) + } + } + + err := i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Delete(ctx, request.GetName(), d) + if err != nil { + if kerrors.IsNotFound(err) { + return &pbSchedulerV1.DeleteCronJobResponse{ + Exists: false, + }, nil + } + + return nil, err + } + + return &pbSchedulerV1.DeleteCronJobResponse{Exists: true}, nil +} diff --git a/integrations/scheduler/v1/suite_test.go b/integrations/scheduler/v1/suite_test.go new file mode 100644 index 000000000..b4dd70ac4 --- /dev/null +++ b/integrations/scheduler/v1/suite_test.go @@ -0,0 +1,50 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package v1 + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + + pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/arangodb/kube-arangodb/pkg/util/svc" + "github.com/arangodb/kube-arangodb/pkg/util/tests/tgrpc" +) + +func Handler(t *testing.T, ctx context.Context, client kclient.Client, mods ...Mod) svc.Handler { + handler, err := New(ctx, client, NewConfiguration().With(mods...)) + require.NoError(t, err) + + return handler +} + +func Client(t *testing.T, ctx context.Context, client kclient.Client, mods ...Mod) pbSchedulerV1.SchedulerV1Client { + local := svc.NewService(svc.Configuration{ + Address: "127.0.0.1:0", + }, Handler(t, ctx, client, mods...)) + + start := local.Start(ctx) + + return tgrpc.NewGRPCClient(t, ctx, pbSchedulerV1.NewSchedulerV1Client, start.Address()) +} diff --git a/pkg/crd/apply.go b/pkg/crd/apply.go index 4d37192af..73a625307 100644 --- a/pkg/crd/apply.go +++ b/pkg/crd/apply.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ package crd import ( "context" - "fmt" authorization "k8s.io/api/authorization/v1" apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" @@ -33,6 +32,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/crd/crds" "github.com/arangodb/kube-arangodb/pkg/logging" + kresources "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/resources" "github.com/arangodb/kube-arangodb/pkg/util/kclient" ) @@ -150,31 +150,7 @@ func verifyCRDAccess(ctx context.Context, client kclient.Client, crd string, ver return *c } - r, err := verifyCRDAccessRequest(ctx, client, crd, verb) - if err != nil { - return authorization.SubjectAccessReviewStatus{ - Allowed: false, - Reason: fmt.Sprintf("Unable to check access: %s", err.Error()), - } - } - - return r.Status + return kresources.VerifyAccessRequestStatus(ctx, client.Kubernetes(), verb, "apiextensions.k8s.io", "v1", "customresourcedefinitions", "", crd, "") } var verifyCRDAccessForTests *authorization.SubjectAccessReviewStatus - -func verifyCRDAccessRequest(ctx context.Context, client kclient.Client, crd string, verb string) (*authorization.SelfSubjectAccessReview, error) { - review := authorization.SelfSubjectAccessReview{ - Spec: authorization.SelfSubjectAccessReviewSpec{ - ResourceAttributes: &authorization.ResourceAttributes{ - Verb: verb, - Group: "apiextensions.k8s.io", - Version: "v1", - Resource: "customresourcedefinitions", - Name: crd, - }, - }, - } - - return client.Kubernetes().AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, &review, meta.CreateOptions{}) -} diff --git a/pkg/integrations/scheduler_v1.go b/pkg/integrations/scheduler_v1.go new file mode 100644 index 000000000..34cb3c565 --- /dev/null +++ b/pkg/integrations/scheduler_v1.go @@ -0,0 +1,69 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package integrations + +import ( + "context" + + "github.com/spf13/cobra" + + pbImplSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1" + "github.com/arangodb/kube-arangodb/pkg/util/constants" + "github.com/arangodb/kube-arangodb/pkg/util/errors" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" + "github.com/arangodb/kube-arangodb/pkg/util/svc" +) + +func init() { + register(func() Integration { + return &schedulerV1{} + }) +} + +type schedulerV1 struct { + Configuration pbImplSchedulerV1.Configuration +} + +func (b *schedulerV1) Name() string { + return "scheduler.v1" +} + +func (b *schedulerV1) Description() string { + return "SchedulerV1 Integration" +} + +func (b *schedulerV1) Register(cmd *cobra.Command, arg ArgGen) error { + f := cmd.Flags() + + f.StringVar(&b.Configuration.Namespace, arg("namespace"), constants.NamespaceWithDefault("default"), "Kubernetes Namespace") + f.BoolVar(&b.Configuration.VerifyAccess, arg("verify-access"), true, "Verify the CRD Access") + + return nil +} + +func (b *schedulerV1) Handler(ctx context.Context) (svc.Handler, error) { + client, ok := kclient.GetDefaultFactory().Client() + if !ok { + return nil, errors.Errorf("Unable to create Kubernetes Client") + } + + return pbImplSchedulerV1.New(ctx, client, b.Configuration) +} diff --git a/pkg/scheduler/cli.go b/pkg/scheduler/cli.go deleted file mode 100644 index 3a9c31a2f..000000000 --- a/pkg/scheduler/cli.go +++ /dev/null @@ -1,150 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2024 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// - -package scheduler - -import ( - "context" - "os" - "strings" - - "github.com/spf13/cobra" - "sigs.k8s.io/yaml" - - "github.com/arangodb/kube-arangodb/pkg/logging" - "github.com/arangodb/kube-arangodb/pkg/util" - "github.com/arangodb/kube-arangodb/pkg/util/constants" - "github.com/arangodb/kube-arangodb/pkg/util/errors" - "github.com/arangodb/kube-arangodb/pkg/util/kclient" -) - -func InitCommand(cmd *cobra.Command) error { - var c cli - return c.register(cmd) -} - -type cli struct { - Namespace string - - Labels []string - Envs []string - - Profiles []string - - Container string - - Image string -} - -func (c *cli) asRequest(args ...string) (Request, error) { - var r = Request{ - Labels: map[string]string{}, - Envs: map[string]string{}, - } - - for _, l := range c.Labels { - p := strings.SplitN(l, "=", 2) - if len(p) == 1 { - r.Labels[p[0]] = "" - logger.Debug("Label Discovered: %s", p[0]) - } else { - r.Labels[p[0]] = p[1] - logger.Debug("Label Discovered: %s=%s", p[0], p[1]) - } - } - - for _, l := range c.Envs { - p := strings.SplitN(l, "=", 2) - if len(p) == 1 { - return r, errors.Errorf("Missing value for env: %s", p[0]) - } else { - r.Envs[p[0]] = p[1] - logger.Debug("Env Discovered: %s=%s", p[0], p[1]) - } - } - - if len(c.Profiles) > 0 { - r.Profiles = c.Profiles - logger.Debug("Enabling profiles: %s", strings.Join(c.Profiles, ", ")) - } - - r.Container = util.NewType(c.Container) - if c.Image != "" { - r.Image = util.NewType(c.Image) - } - - r.Args = args - - return r, nil -} - -func (c *cli) register(cmd *cobra.Command) error { - if err := logging.Init(cmd); err != nil { - return err - } - - cmd.RunE = c.run - - f := cmd.PersistentFlags() - - f.StringVarP(&c.Namespace, "namespace", "n", constants.NamespaceWithDefault("default"), "Kubernetes namespace") - f.StringSliceVarP(&c.Labels, "label", "l", nil, "Scheduler Render Labels in format =") - f.StringSliceVarP(&c.Envs, "env", "e", nil, "Scheduler Render Envs in format =") - f.StringSliceVarP(&c.Profiles, "profile", "p", nil, "Scheduler Render Profiles") - f.StringVar(&c.Container, "container", DefaultContainerName, "Container Name") - f.StringVar(&c.Image, "image", "", "Image") - - return nil -} - -func (c *cli) run(cmd *cobra.Command, args []string) error { - if err := logging.Enable(); err != nil { - return err - } - - r, err := c.asRequest() - if err != nil { - return err - } - - k, ok := kclient.GetDefaultFactory().Client() - if !ok { - return errors.Errorf("Unable to create Kubernetes Client") - } - - s := NewScheduler(k, c.Namespace) - - rendered, profiles, err := s.Render(context.Background(), r) - if err != nil { - return err - } - logger.Debug("Enabled profiles: %s", strings.Join(profiles, ", ")) - - data, err := yaml.Marshal(rendered) - if err != nil { - return err - } - - if _, err := util.WriteAll(os.Stdout, data); err != nil { - return err - } - - return nil -} diff --git a/pkg/scheduler/input.go b/pkg/scheduler/input.go index 03f3c885c..6a31349a2 100644 --- a/pkg/scheduler/input.go +++ b/pkg/scheduler/input.go @@ -25,6 +25,7 @@ import ( core "k8s.io/api/core/v1" + pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition" schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1" schedulerContainerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1/container" schedulerContainerResourcesApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1/container/resources" @@ -33,59 +34,55 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util" ) -const DefaultContainerName = "job" +func baseAsTemplate(in *pbSchedulerV1.Spec) *schedulerApi.ProfileTemplate { + containers := schedulerContainerApi.Containers{} -type Request struct { - Labels map[string]string - - Profiles []string - - Envs map[string]string - - Container *string - - Image *string - - Args []string -} - -func (r Request) AsTemplate() *schedulerApi.ProfileTemplate { - var container schedulerContainerApi.Container - - if len(r.Envs) > 0 { - container.Environments = &schedulerContainerResourcesApi.Environments{} - - for k, v := range r.Envs { - container.Environments.Env = append(container.Environments.Env, core.EnvVar{ - Name: k, - Value: v, - }) + for n, c := range in.Containers { + if c == nil { + continue } + + var container schedulerContainerApi.Container + + if image := c.Image; image != nil { + container.Image = &schedulerContainerResourcesApi.Image{ + Image: c.Image, + } + } + + if len(c.Args) > 0 { + container.Core = &schedulerContainerResourcesApi.Core{ + Args: c.Args, + } + } + + if len(c.EnvironmentVariables) > 0 { + container.Environments = &schedulerContainerResourcesApi.Environments{} + + for k, v := range c.EnvironmentVariables { + container.Env = append(container.Env, core.EnvVar{ + Name: k, + Value: v, + }) + } + } + + containers[n] = container } - if len(r.Args) > 0 { - container.Core = &schedulerContainerResourcesApi.Core{ - Args: r.Args, - } - } - - if r.Image != nil { - container.Image = &schedulerContainerResourcesApi.Image{ - Image: util.NewType(util.TypeOrDefault(r.Image)), - } - } - - return &schedulerApi.ProfileTemplate{ + var t = schedulerApi.ProfileTemplate{ Priority: util.NewType(math.MaxInt), - Pod: &schedulerPodApi.Pod{ - Metadata: &schedulerPodResourcesApi.Metadata{ - Labels: util.MergeMaps(true, r.Labels), - }, - }, + Pod: &schedulerPodApi.Pod{}, Container: &schedulerApi.ProfileContainerTemplate{ - Containers: map[string]schedulerContainerApi.Container{ - util.TypeOrDefault(r.Container, DefaultContainerName): container, - }, + Containers: containers, }, } + + if job := in.Job; job != nil { + t.Pod.Metadata = &schedulerPodResourcesApi.Metadata{ + Labels: job.Labels, + } + } + + return &t } diff --git a/pkg/scheduler/logger.go b/pkg/scheduler/logger.go deleted file mode 100644 index b470e9a2a..000000000 --- a/pkg/scheduler/logger.go +++ /dev/null @@ -1,25 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2024 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// - -package scheduler - -import "github.com/arangodb/kube-arangodb/pkg/logging" - -var logger = logging.Global().RegisterAndGetLogger("scheduler", logging.Info) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 2f53804b8..0affbb801 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -25,6 +25,7 @@ import ( core "k8s.io/api/core/v1" + pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition" schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1" "github.com/arangodb/kube-arangodb/pkg/debug_package/generators/kubernetes" "github.com/arangodb/kube-arangodb/pkg/util/errors" @@ -39,7 +40,7 @@ func NewScheduler(client kclient.Client, namespace string) Scheduler { } type Scheduler interface { - Render(ctx context.Context, in Request, templates ...*schedulerApi.ProfileTemplate) (*core.PodTemplateSpec, []string, error) + Render(ctx context.Context, in *pbSchedulerV1.Spec, templates ...*schedulerApi.ProfileTemplate) (*core.PodTemplateSpec, []string, error) } type scheduler struct { @@ -47,7 +48,11 @@ type scheduler struct { namespace string } -func (s scheduler) Render(ctx context.Context, in Request, templates ...*schedulerApi.ProfileTemplate) (*core.PodTemplateSpec, []string, error) { +func (s scheduler) Render(ctx context.Context, in *pbSchedulerV1.Spec, templates ...*schedulerApi.ProfileTemplate) (*core.PodTemplateSpec, []string, error) { + if in == nil { + return nil, nil, errors.Errorf("Unable to parse nil Spec") + } + profileMap, err := kubernetes.MapObjects[*schedulerApi.ArangoProfileList, *schedulerApi.ArangoProfile](ctx, s.client.Arango().SchedulerV1alpha1().ArangoProfiles(s.namespace), func(result *schedulerApi.ArangoProfileList) []*schedulerApi.ArangoProfile { q := make([]*schedulerApi.ArangoProfile, len(result.Items)) @@ -62,6 +67,18 @@ func (s scheduler) Render(ctx context.Context, in Request, templates ...*schedul return nil, nil, err } + var labels map[string]string + var additionalProfiles []string + + if job := in.Job; job != nil { + labels = job.Labels + additionalProfiles = job.Profiles + } + + if len(in.Containers) == 0 { + return nil, nil, errors.Errorf("Required at least 1 container") + } + profiles := profileMap.AsList().Filter(func(a *schedulerApi.ArangoProfile) bool { return a != nil && a.Spec.Template != nil }).Filter(func(a *schedulerApi.ArangoProfile) bool { @@ -69,14 +86,14 @@ func (s scheduler) Render(ctx context.Context, in Request, templates ...*schedul return false } - if !a.Spec.Selectors.Select(in.Labels) { + if !a.Spec.Selectors.Select(labels) { return false } return true }) - for _, name := range in.Profiles { + for _, name := range additionalProfiles { p, ok := profileMap.ByName(name) if !ok { return nil, nil, errors.Errorf("Profile with name `%s` is missing", name) @@ -103,7 +120,7 @@ func (s scheduler) Render(ctx context.Context, in Request, templates ...*schedul extracted := schedulerApi.ProfileTemplates(kubernetes.Extract(profiles, func(in *schedulerApi.ArangoProfile) *schedulerApi.ProfileTemplate { return in.Spec.Template - }).Append(templates...).Append(in.AsTemplate())) + }).Append(templates...).Append(baseAsTemplate(in))) names := kubernetes.Extract(profiles, func(in *schedulerApi.ArangoProfile) string { return in.GetName() diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index e41ba0254..149382eb4 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -30,6 +30,7 @@ import ( meta "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/yaml" + pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition" schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1" schedulerContainerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1/container" schedulerContainerResourcesApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1/container/resources" @@ -38,6 +39,8 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/tests" ) +const DefaultContainerName = "job" + func newScheduler(t *testing.T, objects ...*schedulerApi.ArangoProfile) Scheduler { client := kclient.NewFakeClientBuilder().Client() @@ -55,7 +58,58 @@ type validatorExec func(in validator) type validator func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) -func render(t *testing.T, s Scheduler, in Request, templates ...*schedulerApi.ProfileTemplate) validatorExec { +func getRequest(in ...func(obj *pbSchedulerV1.Spec)) *pbSchedulerV1.Spec { + var r pbSchedulerV1.Spec + for _, i := range in { + i(&r) + } + + return &r +} + +func withProfiles(profiles ...string) func(obj *pbSchedulerV1.Spec) { + return func(obj *pbSchedulerV1.Spec) { + if obj.Job == nil { + obj.Job = &pbSchedulerV1.JobBase{} + } + + obj.Job.Profiles = append(obj.Job.Profiles, profiles...) + } +} + +func withLabels(labels map[string]string) func(obj *pbSchedulerV1.Spec) { + return func(obj *pbSchedulerV1.Spec) { + if obj.Job == nil { + obj.Job = &pbSchedulerV1.JobBase{} + } + + if obj.Job.Labels == nil { + obj.Job.Labels = make(map[string]string) + } + + for k, v := range labels { + obj.Job.Labels[k] = v + } + } +} + +func withDefaultContainer(in ...func(obj *pbSchedulerV1.ContainerBase)) func(obj *pbSchedulerV1.Spec) { + return func(obj *pbSchedulerV1.Spec) { + if obj.Containers == nil { + obj.Containers = make(map[string]*pbSchedulerV1.ContainerBase) + } + + var c pbSchedulerV1.ContainerBase + + for _, i := range in { + i(&c) + } + + obj.Containers[DefaultContainerName] = &c + } +} + +func render(t *testing.T, s Scheduler, in *pbSchedulerV1.Spec, templates ...*schedulerApi.ProfileTemplate) validatorExec { pod, accepted, err := s.Render(context.Background(), in, templates...) t.Logf("Accepted templates: %s", strings.Join(accepted, ", ")) if err != nil { @@ -79,20 +133,22 @@ func runValidate(t *testing.T, err error, template *core.PodTemplateSpec, accept } } +func Test_Nil(t *testing.T) { + render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")), nil)(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { + require.EqualError(t, err, "Unable to parse nil Spec") + }) +} + func Test_NoProfiles(t *testing.T) { - render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")), Request{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { - require.NoError(t, err) - - require.Len(t, accepted, 0) - - tests.GetContainerByNameT(t, template.Spec.Containers, DefaultContainerName) + render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")), &pbSchedulerV1.Spec{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { + require.EqualError(t, err, "Required at least 1 container") }) } func Test_MissingSelectedProfile(t *testing.T) { - render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")), Request{ - Profiles: []string{"missing"}, - })(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { + render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")), + getRequest(withProfiles("missing"), withDefaultContainer()), + )(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { require.EqualError(t, err, "Profile with name `missing` is missing") }) } @@ -110,7 +166,9 @@ func Test_SelectorWithoutSelector(t *testing.T) { }, }, } - })), Request{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { + })), getRequest(withDefaultContainer(func(obj *pbSchedulerV1.ContainerBase) { + obj.Image = util.NewType("") + })))(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { require.NoError(t, err) require.Len(t, accepted, 0) @@ -136,7 +194,7 @@ func Test_SelectorWithSelectorAll(t *testing.T) { }, }, } - })), Request{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { + })), getRequest(withDefaultContainer()))(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { require.NoError(t, err) require.Len(t, accepted, 1) @@ -172,7 +230,7 @@ func Test_SelectorWithSpecificSelector_MissingLabel(t *testing.T) { }, }, } - })), Request{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { + })), getRequest(withDefaultContainer()))(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { require.NoError(t, err) require.Len(t, accepted, 0) @@ -205,11 +263,9 @@ func Test_SelectorWithSpecificSelector_PresentLabel(t *testing.T) { }, }, } - })), Request{ - Labels: map[string]string{ - "ml.arangodb.com/type": "training", - }, - }, nil)(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { + })), getRequest(withDefaultContainer(), withLabels(map[string]string{ + "ml.arangodb.com/type": "training", + })), nil)(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { require.NoError(t, err) require.Len(t, accepted, 1) @@ -270,11 +326,10 @@ func Test_SelectorWithSpecificSelector_PresentLabel_ByPriority(t *testing.T) { }, }, } - })), Request{ - Labels: map[string]string{ - "ml.arangodb.com/type": "training", - }, - })(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { + })), getRequest(withDefaultContainer(), withLabels(map[string]string{ + "ml.arangodb.com/type": "training", + }, + )))(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) { require.NoError(t, err) require.Len(t, accepted, 2) diff --git a/pkg/util/k8sutil/resources/access.go b/pkg/util/k8sutil/resources/access.go new file mode 100644 index 000000000..3a26c40bf --- /dev/null +++ b/pkg/util/k8sutil/resources/access.go @@ -0,0 +1,126 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package resources + +import ( + "context" + "fmt" + "strings" + "sync" + + authorization "k8s.io/api/authorization/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + "github.com/arangodb/kube-arangodb/pkg/util/errors" +) + +type AccessRequest struct { + Verb, Group, Version, Resource, SubResource, Name, Namespace string +} + +func (a AccessRequest) Verify(ctx context.Context, client kubernetes.Interface) authorization.SubjectAccessReviewStatus { + return VerifyAccessRequestStatus(ctx, client, a.Verb, a.Group, a.Version, a.Resource, a.SubResource, a.Name, a.Namespace) +} + +func (a AccessRequest) VerifyErr(ctx context.Context, client kubernetes.Interface) error { + res := a.Verify(ctx, client) + if res.Allowed { + return nil + } + if res.Reason != "" { + return errors.Errorf("Unable to access %s: %s", a.String(), res.Reason) + } + + return errors.Errorf("Unable to access %s", a.String()) +} + +func (a AccessRequest) String() string { + gv := a.Version + if a.Group != "" { + gv = fmt.Sprintf("%s/%s", a.Group, a.Version) + } + + res := a.Resource + + if a.SubResource != "" { + res = fmt.Sprintf("%s/%s", a.Resource, a.SubResource) + } + + n := a.Name + + if a.Namespace != "" { + n = fmt.Sprintf("%s/%s", a.Namespace, a.Name) + } + + return fmt.Sprintf("%s/%s/%s %s", gv, res, n, strings.ToUpper(a.Verb)) +} + +func VerifyAll(ctx context.Context, client kubernetes.Interface, requests ...AccessRequest) error { + var wg sync.WaitGroup + + errs := make([]error, len(requests)) + + for id := range requests { + wg.Add(1) + + go func(id int) { + defer wg.Done() + + errs[id] = requests[id].VerifyErr(ctx, client) + }(id) + } + + wg.Wait() + + return errors.Errors(errs...) +} + +func VerifyAccessRequestStatus(ctx context.Context, client kubernetes.Interface, verb, group, version, resource, subResource, name, namespace string) authorization.SubjectAccessReviewStatus { + resp, err := VerifyAccessRequest(ctx, client, verb, group, version, resource, subResource, name, namespace) + + if err != nil { + return authorization.SubjectAccessReviewStatus{ + Allowed: false, + Reason: fmt.Sprintf("Unable to check access: %s", err.Error()), + } + } + + return resp.Status +} + +func VerifyAccessRequest(ctx context.Context, client kubernetes.Interface, verb, group, version, resource, subResource, name, namespace string) (*authorization.SelfSubjectAccessReview, error) { + review := authorization.SelfSubjectAccessReview{ + Spec: authorization.SelfSubjectAccessReviewSpec{ + ResourceAttributes: &authorization.ResourceAttributes{ + Namespace: namespace, + Verb: verb, + Group: group, + Version: version, + Resource: resource, + Subresource: subResource, + Name: name, + }, + }, + } + + return client.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, &review, meta.CreateOptions{}) +} diff --git a/pkg/util/refs.go b/pkg/util/refs.go index bb03b8708..c3234a8ab 100644 --- a/pkg/util/refs.go +++ b/pkg/util/refs.go @@ -63,6 +63,12 @@ func Default[T interface{}]() T { return d } +// DefaultInterface returns generic default value for type T as Interface +func DefaultInterface[T interface{}]() interface{} { + var d T + return d +} + // First returns first not nil value func First[T interface{}](input ...*T) *T { for _, i := range input { diff --git a/pkg/util/tests/kubernetes.go b/pkg/util/tests/kubernetes.go index eea972e4e..31a052a41 100644 --- a/pkg/util/tests/kubernetes.go +++ b/pkg/util/tests/kubernetes.go @@ -48,6 +48,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/operatorV2/operation" "github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors" + "github.com/arangodb/kube-arangodb/pkg/util/kclient" ) type handleFunc struct { @@ -223,6 +224,10 @@ func CreateObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSe } } +func UpdateObjectsC(t *testing.T, client kclient.Client, objects ...interface{}) func(t *testing.T) { + return UpdateObjects(t, client.Kubernetes(), client.Arango(), objects...) +} + func UpdateObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSet.Interface, objects ...interface{}) func(t *testing.T) { for _, object := range objects { switch v := object.(type) { @@ -349,7 +354,7 @@ func UpdateObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSe } } -func DeleteObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSet.Interface, objects ...interface{}) func(t *testing.T) { +func DeleteObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSet.Interface, objects ...interface{}) { for _, object := range objects { switch v := object.(type) { case **batch.CronJob: @@ -372,11 +377,21 @@ func DeleteObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSe vl := *v require.NoError(t, k8s.CoreV1().Secrets(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{})) + case **core.Service: + require.NotNil(t, v) + + vl := *v + require.NoError(t, k8s.CoreV1().Services(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{})) case **core.ServiceAccount: require.NotNil(t, v) vl := *v require.NoError(t, k8s.CoreV1().ServiceAccounts(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{})) + case **apps.StatefulSet: + require.NotNil(t, v) + vl := *v + err := k8s.AppsV1().StatefulSets(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{}) + require.NoError(t, err) case **api.ArangoDeployment: require.NotNil(t, v) @@ -432,14 +447,19 @@ func DeleteObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSe vl := *v require.NoError(t, k8s.RbacV1().RoleBindings(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{})) + case **schedulerApi.ArangoProfile: + require.NotNil(t, v) + + vl := *v + require.NoError(t, arango.SchedulerV1alpha1().ArangoProfiles(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{})) default: - require.Fail(t, fmt.Sprintf("Unable to create object: %s", reflect.TypeOf(v).String())) + require.Fail(t, fmt.Sprintf("Unable to delete object: %s", reflect.TypeOf(v).String())) } } +} - return func(t *testing.T) { - RefreshObjects(t, k8s, arango, objects...) - } +func RefreshObjectsC(t *testing.T, client kclient.Client, objects ...interface{}) { + RefreshObjects(t, client.Kubernetes(), client.Arango(), objects...) } func RefreshObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSet.Interface, objects ...interface{}) { diff --git a/pkg/util/tests/kubernetes_test.go b/pkg/util/tests/kubernetes_test.go index 421373982..f5c466d4f 100644 --- a/pkg/util/tests/kubernetes_test.go +++ b/pkg/util/tests/kubernetes_test.go @@ -57,6 +57,8 @@ func NewMetaObjectRun[T meta.Object](t *testing.T) { refresh(t) UpdateObjects(t, c.Kubernetes(), c.Arango(), &obj) + + DeleteObjects(t, c.Kubernetes(), c.Arango(), &obj) }) }) } diff --git a/pkg/util/tests/time.go b/pkg/util/tests/time.go index 330d07463..7b197807c 100644 --- a/pkg/util/tests/time.go +++ b/pkg/util/tests/time.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,6 +25,8 @@ import ( "time" "github.com/stretchr/testify/require" + + "github.com/arangodb/kube-arangodb/pkg/util/errors" ) func DurationBetween() func(t *testing.T, expected time.Duration, skew float64) { @@ -39,3 +41,48 @@ func DurationBetween() func(t *testing.T, expected time.Duration, skew float64) } } } + +func Interrupt() error { + return interrupt{} +} + +type interrupt struct { +} + +func (i interrupt) Error() string { + return "interrupt" +} + +func NewTimeout(in Timeout) Timeout { + return in +} + +type Timeout func() error + +func (t Timeout) WithTimeout(timeout, interval time.Duration) error { + timeoutT := time.NewTimer(timeout) + defer timeoutT.Stop() + + intervalT := time.NewTicker(interval) + defer intervalT.Stop() + + for { + select { + case <-timeoutT.C: + return errors.Errorf("Timeouted!") + case <-intervalT.C: + if err := t(); err != nil { + var interrupt interrupt + if errors.As(err, &interrupt) { + return nil + } + + return err + } + } + } +} + +func (t Timeout) WithTimeoutT(z *testing.T, timeout, interval time.Duration) { + require.NoError(z, t.WithTimeout(timeout, interval)) +}