1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Scheduler BatchJob Integration Service (#1633)

This commit is contained in:
Adam Janikowski 2024-04-01 12:12:08 +02:00 committed by GitHub
parent 0a37f52b8d
commit e10ac2662c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 1480 additions and 324 deletions

View file

@ -16,6 +16,7 @@
- (Feature) Parametrize ForceDelete timeout
- (Feature) Scheduler BatchJob Integration Definition
- (Feature) Scheduler CronJob Integration Definition
- (Feature) Scheduler BatchJob Integration Service
## [1.2.39](https://github.com/arangodb/kube-arangodb/tree/1.2.39) (2024-03-11)
- (Feature) Extract Scheduler API

View file

@ -0,0 +1,204 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1
import (
"context"
"testing"
"github.com/stretchr/testify/require"
batch "k8s.io/api/batch/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition"
schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/arangodb/kube-arangodb/pkg/util/tests"
)
func Test_BatchJob(t *testing.T) {
ctx, c := context.WithCancel(context.Background())
defer c()
client := kclient.NewFakeClientBuilder().Add(
tests.NewMetaObject(t, tests.FakeNamespace, "test", func(t *testing.T, obj *schedulerApi.ArangoProfile) {
obj.Spec = schedulerApi.ProfileSpec{}
}),
tests.NewMetaObject(t, tests.FakeNamespace, "test-select-all", func(t *testing.T, obj *schedulerApi.ArangoProfile) {
obj.Spec = schedulerApi.ProfileSpec{
Selectors: &schedulerApi.ProfileSelectors{
Label: &meta.LabelSelector{},
},
Template: &schedulerApi.ProfileTemplate{},
}
}),
tests.NewMetaObject(t, tests.FakeNamespace, "test-select-specific", func(t *testing.T, obj *schedulerApi.ArangoProfile) {
obj.Spec = schedulerApi.ProfileSpec{
Selectors: &schedulerApi.ProfileSelectors{
Label: &meta.LabelSelector{
MatchLabels: map[string]string{
"A": "B",
},
},
},
Template: &schedulerApi.ProfileTemplate{},
}
}),
).Client()
scheduler := Client(t, ctx, client, func(c Configuration) Configuration {
c.Namespace = tests.FakeNamespace
c.VerifyAccess = false
return c
})
t.Run("Ensure job does not exist - get", func(t *testing.T) {
resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{
Name: "test",
})
require.NoError(t, err)
require.False(t, resp.GetExists())
})
t.Run("Ensure job does not exist - list", func(t *testing.T) {
resp, err := scheduler.ListBatchJob(context.Background(), &pbSchedulerV1.ListBatchJobRequest{})
require.NoError(t, err)
require.Len(t, resp.GetBatchJobs(), 0)
})
t.Run("Schedule Job", func(t *testing.T) {
resp, err := scheduler.CreateBatchJob(context.Background(), &pbSchedulerV1.CreateBatchJobRequest{
Spec: &pbSchedulerV1.Spec{
Metadata: &pbSchedulerV1.Metadata{
Name: "test",
},
Job: &pbSchedulerV1.JobBase{
Labels: nil,
Profiles: []string{
"test",
},
},
Containers: map[string]*pbSchedulerV1.ContainerBase{
"example": {
Image: util.NewType("ubuntu:20.04"),
Args: []string{
"/bin/bash",
"-c",
"true",
},
},
},
},
BatchJob: &pbSchedulerV1.BatchJobSpec{
Completions: util.NewType[int32](1),
},
})
require.NoError(t, err)
require.EqualValues(t, "test", resp.GetName())
require.Len(t, resp.Profiles, 2)
require.Contains(t, resp.Profiles, "test")
require.Contains(t, resp.Profiles, "test-select-all")
require.NotContains(t, resp.Profiles, "test-select-specific")
})
t.Run("Ensure job exist - get", func(t *testing.T) {
resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
})
t.Run("Ensure job exist - list", func(t *testing.T) {
resp, err := scheduler.ListBatchJob(context.Background(), &pbSchedulerV1.ListBatchJobRequest{})
require.NoError(t, err)
require.Len(t, resp.GetBatchJobs(), 1)
require.Contains(t, resp.GetBatchJobs(), "test")
})
t.Run("Ensure job details - pre", func(t *testing.T) {
resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
require.EqualValues(t, 0, resp.GetBatchJob().GetStatus().GetSucceeded())
})
t.Run("Ensure job details - update", func(t *testing.T) {
job := tests.NewMetaObject[*batch.Job](t, tests.FakeNamespace, "test")
tests.RefreshObjectsC(t, client, &job)
job.Status.Succeeded = 1
tests.UpdateObjectsC(t, client, &job)
})
t.Run("Ensure job details - post", func(t *testing.T) {
resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
require.EqualValues(t, 1, resp.GetBatchJob().GetStatus().GetSucceeded())
})
t.Run("Delete Job", func(t *testing.T) {
resp, err := scheduler.DeleteBatchJob(context.Background(), &pbSchedulerV1.DeleteBatchJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
})
t.Run("Re-Delete Job", func(t *testing.T) {
resp, err := scheduler.DeleteBatchJob(context.Background(), &pbSchedulerV1.DeleteBatchJobRequest{
Name: "test",
})
require.NoError(t, err)
require.False(t, resp.GetExists())
})
t.Run("Ensure job does not exist after deletion - get", func(t *testing.T) {
resp, err := scheduler.GetBatchJob(context.Background(), &pbSchedulerV1.GetBatchJobRequest{
Name: "test",
})
require.NoError(t, err)
require.False(t, resp.GetExists())
})
t.Run("Ensure job does not exist after deletion - list", func(t *testing.T) {
resp, err := scheduler.ListBatchJob(context.Background(), &pbSchedulerV1.ListBatchJobRequest{})
require.NoError(t, err)
require.Len(t, resp.GetBatchJobs(), 0)
})
}

View file

@ -18,22 +18,29 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package cmd
package v1
import (
"github.com/spf13/cobra"
type Mod func(c Configuration) Configuration
"github.com/arangodb/kube-arangodb/pkg/scheduler"
)
func init() {
cmd := &cobra.Command{
Use: "scheduler",
func NewConfiguration() Configuration {
return Configuration{
Namespace: "default",
VerifyAccess: true,
}
if err := scheduler.InitCommand(cmd); err != nil {
panic(err.Error())
}
cmdMain.AddCommand(cmd)
}
type Configuration struct {
Namespace string
VerifyAccess bool
}
func (c Configuration) With(mods ...Mod) Configuration {
n := c
for _, mod := range mods {
n = mod(n)
}
return n
}

View file

@ -0,0 +1,250 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1
import (
"context"
"testing"
"github.com/stretchr/testify/require"
batch "k8s.io/api/batch/v1"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition"
schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/arangodb/kube-arangodb/pkg/util/tests"
)
func Test_CronJob(t *testing.T) {
ctx, c := context.WithCancel(context.Background())
defer c()
client := kclient.NewFakeClientBuilder().Add(
tests.NewMetaObject(t, tests.FakeNamespace, "test", func(t *testing.T, obj *schedulerApi.ArangoProfile) {
obj.Spec = schedulerApi.ProfileSpec{}
}),
tests.NewMetaObject(t, tests.FakeNamespace, "test-select-all", func(t *testing.T, obj *schedulerApi.ArangoProfile) {
obj.Spec = schedulerApi.ProfileSpec{
Selectors: &schedulerApi.ProfileSelectors{
Label: &meta.LabelSelector{},
},
Template: &schedulerApi.ProfileTemplate{},
}
}),
tests.NewMetaObject(t, tests.FakeNamespace, "test-select-specific", func(t *testing.T, obj *schedulerApi.ArangoProfile) {
obj.Spec = schedulerApi.ProfileSpec{
Selectors: &schedulerApi.ProfileSelectors{
Label: &meta.LabelSelector{
MatchLabels: map[string]string{
"A": "B",
},
},
},
Template: &schedulerApi.ProfileTemplate{},
}
}),
).Client()
scheduler := Client(t, ctx, client, func(c Configuration) Configuration {
c.Namespace = tests.FakeNamespace
c.VerifyAccess = false
return c
})
t.Run("Ensure job does not exist - get", func(t *testing.T) {
resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{
Name: "test",
})
require.NoError(t, err)
require.False(t, resp.GetExists())
})
t.Run("Ensure job does not exist - list", func(t *testing.T) {
resp, err := scheduler.ListCronJob(context.Background(), &pbSchedulerV1.ListCronJobRequest{})
require.NoError(t, err)
require.Len(t, resp.GetCronJobs(), 0)
})
t.Run("Schedule Job", func(t *testing.T) {
resp, err := scheduler.CreateCronJob(context.Background(), &pbSchedulerV1.CreateCronJobRequest{
Spec: &pbSchedulerV1.Spec{
Metadata: &pbSchedulerV1.Metadata{
Name: "test",
},
Job: &pbSchedulerV1.JobBase{
Labels: nil,
Profiles: []string{
"test",
},
},
Containers: map[string]*pbSchedulerV1.ContainerBase{
"example": {
Image: util.NewType("ubuntu:20.04"),
Args: []string{
"/bin/bash",
"-c",
"true",
},
},
},
},
CronJob: &pbSchedulerV1.CronJobSpec{
Schedule: "* * * * *",
Job: &pbSchedulerV1.BatchJobSpec{
Parallelism: util.NewType[int32](1),
Completions: util.NewType[int32](1),
BackoffLimit: util.NewType[int32](1),
},
},
})
require.NoError(t, err)
require.EqualValues(t, "test", resp.GetName())
require.Len(t, resp.Profiles, 2)
require.Contains(t, resp.Profiles, "test")
require.Contains(t, resp.Profiles, "test-select-all")
require.NotContains(t, resp.Profiles, "test-select-specific")
})
t.Run("Ensure job exist - get", func(t *testing.T) {
resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
})
t.Run("Ensure job exist - list", func(t *testing.T) {
resp, err := scheduler.ListCronJob(context.Background(), &pbSchedulerV1.ListCronJobRequest{})
require.NoError(t, err)
require.Len(t, resp.GetCronJobs(), 1)
require.Contains(t, resp.GetCronJobs(), "test")
})
t.Run("Ensure job details - pre", func(t *testing.T) {
resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
require.Len(t, resp.GetBatchJobs(), 0)
})
t.Run("Ensure job details - update", func(t *testing.T) {
job := tests.NewMetaObject[*batch.CronJob](t, tests.FakeNamespace, "test")
tests.RefreshObjectsC(t, client, &job)
job.Status.Active = []core.ObjectReference{
{
Name: "test-job",
},
}
tests.UpdateObjectsC(t, client, &job)
})
t.Run("Ensure job details - post", func(t *testing.T) {
resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
require.Len(t, resp.GetBatchJobs(), 1)
})
t.Run("Update Job - Pre", func(t *testing.T) {
resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
require.EqualValues(t, "* * * * *", resp.GetCronJob().GetSpec().GetSchedule())
require.EqualValues(t, 1, resp.GetCronJob().GetSpec().GetJob().GetCompletions())
})
t.Run("Update Job", func(t *testing.T) {
resp, err := scheduler.UpdateCronJob(context.Background(), &pbSchedulerV1.UpdateCronJobRequest{
Name: "test",
Spec: &pbSchedulerV1.CronJobSpec{
Schedule: "1 * * * *",
Job: &pbSchedulerV1.BatchJobSpec{
Completions: util.NewType[int32](5),
},
},
})
require.NoError(t, err)
require.True(t, resp.GetExists())
require.EqualValues(t, "1 * * * *", resp.GetCronJob().GetSpec().GetSchedule())
require.EqualValues(t, 5, resp.GetCronJob().GetSpec().GetJob().GetCompletions())
})
t.Run("Update Job - Post", func(t *testing.T) {
resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
require.EqualValues(t, "1 * * * *", resp.GetCronJob().GetSpec().GetSchedule())
require.EqualValues(t, 5, resp.GetCronJob().GetSpec().GetJob().GetCompletions())
})
t.Run("Delete Job", func(t *testing.T) {
resp, err := scheduler.DeleteCronJob(context.Background(), &pbSchedulerV1.DeleteCronJobRequest{
Name: "test",
})
require.NoError(t, err)
require.True(t, resp.GetExists())
})
t.Run("Re-Delete Job", func(t *testing.T) {
resp, err := scheduler.DeleteCronJob(context.Background(), &pbSchedulerV1.DeleteCronJobRequest{
Name: "test",
})
require.NoError(t, err)
require.False(t, resp.GetExists())
})
t.Run("Ensure job does not exist after deletion - get", func(t *testing.T) {
resp, err := scheduler.GetCronJob(context.Background(), &pbSchedulerV1.GetCronJobRequest{
Name: "test",
})
require.NoError(t, err)
require.False(t, resp.GetExists())
})
t.Run("Ensure job does not exist after deletion - list", func(t *testing.T) {
resp, err := scheduler.ListCronJob(context.Background(), &pbSchedulerV1.ListCronJobRequest{})
require.NoError(t, err)
require.Len(t, resp.GetCronJobs(), 0)
})
}

View file

@ -46,10 +46,8 @@ type CronJob struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Keeps BatchJob settings
Job *BatchJobSpec `protobuf:"bytes,1,opt,name=job,proto3" json:"job,omitempty"`
// Keeps the CronJob Settings
Spec *CronJobSpec `protobuf:"bytes,2,opt,name=spec,proto3" json:"spec,omitempty"`
Spec *CronJobSpec `protobuf:"bytes,1,opt,name=spec,proto3" json:"spec,omitempty"`
}
func (x *CronJob) Reset() {
@ -84,13 +82,6 @@ func (*CronJob) Descriptor() ([]byte, []int) {
return file_integrations_scheduler_v1_definition_cronjob_proto_rawDescGZIP(), []int{0}
}
func (x *CronJob) GetJob() *BatchJobSpec {
if x != nil {
return x.Job
}
return nil
}
func (x *CronJob) GetSpec() *CronJobSpec {
if x != nil {
return x.Spec
@ -106,6 +97,8 @@ type CronJobSpec struct {
// Schedule definition
Schedule string `protobuf:"bytes,1,opt,name=schedule,proto3" json:"schedule,omitempty"`
// Keeps BatchJob settings
Job *BatchJobSpec `protobuf:"bytes,2,opt,name=job,proto3" json:"job,omitempty"`
}
func (x *CronJobSpec) Reset() {
@ -147,6 +140,13 @@ func (x *CronJobSpec) GetSchedule() string {
return ""
}
func (x *CronJobSpec) GetJob() *BatchJobSpec {
if x != nil {
return x.Job
}
return nil
}
var File_integrations_scheduler_v1_definition_cronjob_proto protoreflect.FileDescriptor
var file_integrations_scheduler_v1_definition_cronjob_proto_rawDesc = []byte{
@ -157,16 +157,16 @@ var file_integrations_scheduler_v1_definition_cronjob_proto_rawDesc = []byte{
0x33, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x73, 0x63,
0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e,
0x69, 0x74, 0x69, 0x6f, 0x6e, 0x2f, 0x62, 0x61, 0x74, 0x63, 0x68, 0x6a, 0x6f, 0x62, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0x60, 0x0a, 0x07, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12,
0x29, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x73,
0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2e, 0x42, 0x61, 0x74, 0x63, 0x68, 0x4a, 0x6f,
0x62, 0x53, 0x70, 0x65, 0x63, 0x52, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x2a, 0x0a, 0x04, 0x73, 0x70,
0x65, 0x63, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x64,
0x75, 0x6c, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x70, 0x65, 0x63,
0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x22, 0x29, 0x0a, 0x0b, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f,
0x62, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c,
0x65, 0x42, 0x48, 0x5a, 0x46, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
0x72, 0x6f, 0x74, 0x6f, 0x22, 0x35, 0x0a, 0x07, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x12,
0x2a, 0x0a, 0x04, 0x73, 0x70, 0x65, 0x63, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x16, 0x2e,
0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2e, 0x43, 0x72, 0x6f, 0x6e, 0x4a, 0x6f,
0x62, 0x53, 0x70, 0x65, 0x63, 0x52, 0x04, 0x73, 0x70, 0x65, 0x63, 0x22, 0x54, 0x0a, 0x0b, 0x43,
0x72, 0x6f, 0x6e, 0x4a, 0x6f, 0x62, 0x53, 0x70, 0x65, 0x63, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x63,
0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x73, 0x63,
0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x12, 0x29, 0x0a, 0x03, 0x6a, 0x6f, 0x62, 0x18, 0x02, 0x20,
0x01, 0x28, 0x0b, 0x32, 0x17, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2e,
0x42, 0x61, 0x74, 0x63, 0x68, 0x4a, 0x6f, 0x62, 0x53, 0x70, 0x65, 0x63, 0x52, 0x03, 0x6a, 0x6f,
0x62, 0x42, 0x48, 0x5a, 0x46, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f,
0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x2d, 0x61, 0x72,
0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x73, 0x2f, 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2f, 0x76, 0x31,
@ -193,8 +193,8 @@ var file_integrations_scheduler_v1_definition_cronjob_proto_goTypes = []interfac
(*BatchJobSpec)(nil), // 2: scheduler.BatchJobSpec
}
var file_integrations_scheduler_v1_definition_cronjob_proto_depIdxs = []int32{
2, // 0: scheduler.CronJob.job:type_name -> scheduler.BatchJobSpec
1, // 1: scheduler.CronJob.spec:type_name -> scheduler.CronJobSpec
1, // 0: scheduler.CronJob.spec:type_name -> scheduler.CronJobSpec
2, // 1: scheduler.CronJobSpec.job:type_name -> scheduler.BatchJobSpec
2, // [2:2] is the sub-list for method output_type
2, // [2:2] is the sub-list for method input_type
2, // [2:2] is the sub-list for extension type_name

View file

@ -28,15 +28,15 @@ option go_package = "github.com/arangodb/kube-arangodb/integrations/scheduler/v1
// Keeps information about Kubernetes Batch/V1 CronJob
message CronJob {
// Keeps BatchJob settings
BatchJobSpec job = 1;
// Keeps the CronJob Settings
CronJobSpec spec = 2;
CronJobSpec spec = 1;
}
// Information about CronJob run settings
message CronJobSpec {
// Schedule definition
string schedule = 1;
// Keeps BatchJob settings
BatchJobSpec job = 2;
}

View file

@ -0,0 +1,504 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1
import (
"context"
"google.golang.org/grpc"
batch "k8s.io/api/batch/v1"
core "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition"
"github.com/arangodb/kube-arangodb/pkg/debug_package/generators/kubernetes"
"github.com/arangodb/kube-arangodb/pkg/scheduler"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
kresources "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/resources"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/arangodb/kube-arangodb/pkg/util/svc"
)
var _ pbSchedulerV1.SchedulerV1Server = &implementation{}
var _ svc.Handler = &implementation{}
func New(ctx context.Context, client kclient.Client, cfg Configuration) (svc.Handler, error) {
return newInternal(ctx, client, cfg)
}
func newInternal(ctx context.Context, client kclient.Client, cfg Configuration) (*implementation, error) {
if cfg.VerifyAccess {
// Lets Verify Access
if err := kresources.VerifyAll(ctx, client.Kubernetes(),
kresources.AccessRequest{
Verb: "create",
Group: "batch",
Version: "v1",
Resource: "jobs",
Namespace: cfg.Namespace,
},
kresources.AccessRequest{
Verb: "list",
Group: "batch",
Version: "v1",
Resource: "jobs",
Namespace: cfg.Namespace,
},
kresources.AccessRequest{
Verb: "delete",
Group: "batch",
Version: "v1",
Resource: "jobs",
Namespace: cfg.Namespace,
},
kresources.AccessRequest{
Verb: "get",
Group: "batch",
Version: "v1",
Resource: "jobs",
Namespace: cfg.Namespace,
},
kresources.AccessRequest{
Verb: "create",
Group: "batch",
Version: "v1",
Resource: "cronjobs",
Namespace: cfg.Namespace,
},
kresources.AccessRequest{
Verb: "list",
Group: "batch",
Version: "v1",
Resource: "cronjobs",
Namespace: cfg.Namespace,
},
kresources.AccessRequest{
Verb: "delete",
Group: "batch",
Version: "v1",
Resource: "cronjobs",
Namespace: cfg.Namespace,
},
kresources.AccessRequest{
Verb: "get",
Group: "batch",
Version: "v1",
Resource: "cronjobs",
Namespace: cfg.Namespace,
},
); err != nil {
return nil, errors.WithMessagef(err, "Unable to access API")
}
}
return &implementation{
cfg: cfg,
client: client,
scheduler: scheduler.NewScheduler(client, cfg.Namespace),
}, nil
}
type implementation struct {
cfg Configuration
client kclient.Client
scheduler scheduler.Scheduler
pbSchedulerV1.UnimplementedSchedulerV1Server
}
func (i *implementation) Name() string {
return pbSchedulerV1.Name
}
func (i *implementation) Register(registrar *grpc.Server) {
pbSchedulerV1.RegisterSchedulerV1Server(registrar, i)
}
func (i *implementation) Health() svc.HealthState {
return svc.Healthy
}
func (i *implementation) CreateBatchJob(ctx context.Context, request *pbSchedulerV1.CreateBatchJobRequest) (*pbSchedulerV1.CreateBatchJobResponse, error) {
if request == nil {
return nil, errors.Errorf("Request is nil")
}
rendered, profiles, err := i.scheduler.Render(ctx, request.GetSpec())
if err != nil {
return nil, err
}
rendered.Spec.RestartPolicy = core.RestartPolicyNever
var spec batch.Job
spec.Namespace = i.cfg.Namespace
if meta := request.GetSpec().GetMetadata(); meta != nil {
if util.TypeOrDefault(meta.GenerateName, false) {
spec.GenerateName = meta.Name
} else {
spec.Name = meta.Name
}
}
spec.Spec.Template = *rendered
if batchJob := request.GetBatchJob(); batchJob != nil {
if v := batchJob.Completions; v != nil {
spec.Spec.Completions = v
}
if v := batchJob.Parallelism; v != nil {
spec.Spec.Parallelism = v
}
if v := batchJob.BackoffLimit; v != nil {
spec.Spec.BackoffLimit = v
}
}
if batchJobSpec := request.GetSpec(); batchJobSpec != nil {
if job := batchJobSpec.Job; job != nil {
spec.Labels = job.Labels
}
}
job, err := i.client.Kubernetes().BatchV1().Jobs(i.cfg.Namespace).Create(ctx, &spec, meta.CreateOptions{})
if err != nil {
return nil, err
}
return &pbSchedulerV1.CreateBatchJobResponse{
Name: job.Name,
Profiles: profiles,
}, nil
}
func (i *implementation) GetBatchJob(ctx context.Context, request *pbSchedulerV1.GetBatchJobRequest) (*pbSchedulerV1.GetBatchJobResponse, error) {
if request == nil {
return nil, errors.Errorf("Request is nil")
}
job, err := i.client.Kubernetes().BatchV1().Jobs(i.cfg.Namespace).Get(ctx, request.GetName(), meta.GetOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
return &pbSchedulerV1.GetBatchJobResponse{
Exists: false,
}, nil
}
return nil, err
}
return &pbSchedulerV1.GetBatchJobResponse{
Exists: true,
BatchJob: &pbSchedulerV1.BatchJob{
Spec: &pbSchedulerV1.BatchJobSpec{
Parallelism: job.Spec.Parallelism,
Completions: job.Spec.Completions,
BackoffLimit: job.Spec.BackoffLimit,
},
Status: &pbSchedulerV1.BatchJobStatus{
Active: job.Status.Active,
Succeeded: job.Status.Succeeded,
Failed: job.Status.Failed,
},
},
}, nil
}
func (i *implementation) DeleteBatchJob(ctx context.Context, request *pbSchedulerV1.DeleteBatchJobRequest) (*pbSchedulerV1.DeleteBatchJobResponse, error) {
if request == nil {
return nil, errors.Errorf("Request is nil")
}
var d meta.DeleteOptions
if v := request.DeleteChildPods; v != nil {
if *v {
d.PropagationPolicy = util.NewType(meta.DeletePropagationBackground)
} else {
d.PropagationPolicy = util.NewType(meta.DeletePropagationOrphan)
}
}
err := i.client.Kubernetes().BatchV1().Jobs(i.cfg.Namespace).Delete(ctx, request.GetName(), d)
if err != nil {
if kerrors.IsNotFound(err) {
return &pbSchedulerV1.DeleteBatchJobResponse{
Exists: false,
}, nil
}
return nil, err
}
return &pbSchedulerV1.DeleteBatchJobResponse{Exists: true}, nil
}
func (i *implementation) ListBatchJob(ctx context.Context, request *pbSchedulerV1.ListBatchJobRequest) (*pbSchedulerV1.ListBatchJobResponse, error) {
if request == nil {
return nil, errors.Errorf("Request is nil")
}
objects, err := kubernetes.ListObjects[*batch.JobList, *batch.Job](ctx, i.client.Kubernetes().BatchV1().Jobs(i.cfg.Namespace), func(result *batch.JobList) []*batch.Job {
r := make([]*batch.Job, len(result.Items))
for id := range result.Items {
r[id] = result.Items[id].DeepCopy()
}
return r
})
if err != nil {
return nil, err
}
return &pbSchedulerV1.ListBatchJobResponse{
BatchJobs: kubernetes.Extract(objects, func(in *batch.Job) string {
return in.GetName()
}),
}, nil
}
func (i *implementation) CreateCronJob(ctx context.Context, request *pbSchedulerV1.CreateCronJobRequest) (*pbSchedulerV1.CreateCronJobResponse, error) {
if request == nil {
return nil, errors.Errorf("Request is nil")
}
rendered, profiles, err := i.scheduler.Render(ctx, request.GetSpec())
if err != nil {
return nil, err
}
rendered.Spec.RestartPolicy = core.RestartPolicyNever
var spec batch.CronJob
spec.Namespace = i.cfg.Namespace
if meta := request.GetSpec().GetMetadata(); meta != nil {
if util.TypeOrDefault(meta.GenerateName, false) {
spec.GenerateName = meta.Name
} else {
spec.Name = meta.Name
}
}
spec.Spec.JobTemplate.Spec.Template = *rendered
if cronJob := request.GetCronJob(); cronJob != nil {
spec.Spec.Schedule = cronJob.Schedule
if batchJob := cronJob.GetJob(); batchJob != nil {
if v := batchJob.Completions; v != nil {
spec.Spec.JobTemplate.Spec.Completions = v
}
if v := batchJob.Parallelism; v != nil {
spec.Spec.JobTemplate.Spec.Parallelism = v
}
if v := batchJob.BackoffLimit; v != nil {
spec.Spec.JobTemplate.Spec.BackoffLimit = v
}
}
}
if batchJobSpec := request.GetSpec(); batchJobSpec != nil {
if job := batchJobSpec.Job; job != nil {
spec.Labels = job.Labels
spec.Spec.JobTemplate.Labels = job.Labels
}
}
job, err := i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Create(ctx, &spec, meta.CreateOptions{})
if err != nil {
return nil, err
}
return &pbSchedulerV1.CreateCronJobResponse{
Name: job.Name,
Profiles: profiles,
}, nil
}
func (i *implementation) GetCronJob(ctx context.Context, request *pbSchedulerV1.GetCronJobRequest) (*pbSchedulerV1.GetCronJobResponse, error) {
if request == nil {
return nil, errors.Errorf("Request is nil")
}
job, err := i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Get(ctx, request.GetName(), meta.GetOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
return &pbSchedulerV1.GetCronJobResponse{
Exists: false,
}, nil
}
return nil, err
}
return &pbSchedulerV1.GetCronJobResponse{
Exists: true,
CronJob: &pbSchedulerV1.CronJob{
Spec: &pbSchedulerV1.CronJobSpec{
Schedule: job.Spec.Schedule,
Job: &pbSchedulerV1.BatchJobSpec{
Parallelism: job.Spec.JobTemplate.Spec.Parallelism,
Completions: job.Spec.JobTemplate.Spec.Completions,
BackoffLimit: job.Spec.JobTemplate.Spec.BackoffLimit,
},
},
},
BatchJobs: kubernetes.Extract(job.Status.Active, func(in core.ObjectReference) string {
return in.Name
}),
}, nil
}
func (i *implementation) UpdateCronJob(ctx context.Context, request *pbSchedulerV1.UpdateCronJobRequest) (*pbSchedulerV1.UpdateCronJobResponse, error) {
if request == nil {
return nil, errors.Errorf("Request is nil")
}
job, err := i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Get(ctx, request.GetName(), meta.GetOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
return &pbSchedulerV1.UpdateCronJobResponse{
Exists: false,
}, nil
}
return nil, err
}
if cronJob := request.GetSpec(); cronJob != nil {
job.Spec.Schedule = cronJob.Schedule
if batchJob := cronJob.GetJob(); batchJob != nil {
if v := batchJob.Completions; v != nil {
job.Spec.JobTemplate.Spec.Completions = v
}
if v := batchJob.Parallelism; v != nil {
job.Spec.JobTemplate.Spec.Parallelism = v
}
if v := batchJob.BackoffLimit; v != nil {
job.Spec.JobTemplate.Spec.BackoffLimit = v
}
}
}
job, err = i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Update(ctx, job, meta.UpdateOptions{})
if err != nil {
if kerrors.IsNotFound(err) {
return &pbSchedulerV1.UpdateCronJobResponse{
Exists: false,
}, nil
}
return nil, err
}
return &pbSchedulerV1.UpdateCronJobResponse{
Exists: true,
CronJob: &pbSchedulerV1.CronJob{
Spec: &pbSchedulerV1.CronJobSpec{
Schedule: job.Spec.Schedule,
Job: &pbSchedulerV1.BatchJobSpec{
Parallelism: job.Spec.JobTemplate.Spec.Parallelism,
Completions: job.Spec.JobTemplate.Spec.Completions,
BackoffLimit: job.Spec.JobTemplate.Spec.BackoffLimit,
},
},
},
}, nil
}
func (i *implementation) ListCronJob(ctx context.Context, request *pbSchedulerV1.ListCronJobRequest) (*pbSchedulerV1.ListCronJobResponse, error) {
if request == nil {
return nil, errors.Errorf("Request is nil")
}
objects, err := kubernetes.ListObjects[*batch.CronJobList, *batch.CronJob](ctx, i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace), func(result *batch.CronJobList) []*batch.CronJob {
r := make([]*batch.CronJob, len(result.Items))
for id := range result.Items {
r[id] = result.Items[id].DeepCopy()
}
return r
})
if err != nil {
return nil, err
}
return &pbSchedulerV1.ListCronJobResponse{
CronJobs: kubernetes.Extract(objects, func(in *batch.CronJob) string {
return in.GetName()
}),
}, nil
}
func (i *implementation) DeleteCronJob(ctx context.Context, request *pbSchedulerV1.DeleteCronJobRequest) (*pbSchedulerV1.DeleteCronJobResponse, error) {
if request == nil {
return nil, errors.Errorf("Request is nil")
}
var d meta.DeleteOptions
if v := request.DeleteChildPods; v != nil {
if *v {
d.PropagationPolicy = util.NewType(meta.DeletePropagationBackground)
} else {
d.PropagationPolicy = util.NewType(meta.DeletePropagationOrphan)
}
}
err := i.client.Kubernetes().BatchV1().CronJobs(i.cfg.Namespace).Delete(ctx, request.GetName(), d)
if err != nil {
if kerrors.IsNotFound(err) {
return &pbSchedulerV1.DeleteCronJobResponse{
Exists: false,
}, nil
}
return nil, err
}
return &pbSchedulerV1.DeleteCronJobResponse{Exists: true}, nil
}

View file

@ -0,0 +1,50 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v1
import (
"context"
"testing"
"github.com/stretchr/testify/require"
pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/arangodb/kube-arangodb/pkg/util/svc"
"github.com/arangodb/kube-arangodb/pkg/util/tests/tgrpc"
)
func Handler(t *testing.T, ctx context.Context, client kclient.Client, mods ...Mod) svc.Handler {
handler, err := New(ctx, client, NewConfiguration().With(mods...))
require.NoError(t, err)
return handler
}
func Client(t *testing.T, ctx context.Context, client kclient.Client, mods ...Mod) pbSchedulerV1.SchedulerV1Client {
local := svc.NewService(svc.Configuration{
Address: "127.0.0.1:0",
}, Handler(t, ctx, client, mods...))
start := local.Start(ctx)
return tgrpc.NewGRPCClient(t, ctx, pbSchedulerV1.NewSchedulerV1Client, start.Address())
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -22,7 +22,6 @@ package crd
import (
"context"
"fmt"
authorization "k8s.io/api/authorization/v1"
apiextensions "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@ -33,6 +32,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/crd/crds"
"github.com/arangodb/kube-arangodb/pkg/logging"
kresources "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/resources"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
)
@ -150,31 +150,7 @@ func verifyCRDAccess(ctx context.Context, client kclient.Client, crd string, ver
return *c
}
r, err := verifyCRDAccessRequest(ctx, client, crd, verb)
if err != nil {
return authorization.SubjectAccessReviewStatus{
Allowed: false,
Reason: fmt.Sprintf("Unable to check access: %s", err.Error()),
}
}
return r.Status
return kresources.VerifyAccessRequestStatus(ctx, client.Kubernetes(), verb, "apiextensions.k8s.io", "v1", "customresourcedefinitions", "", crd, "")
}
var verifyCRDAccessForTests *authorization.SubjectAccessReviewStatus
func verifyCRDAccessRequest(ctx context.Context, client kclient.Client, crd string, verb string) (*authorization.SelfSubjectAccessReview, error) {
review := authorization.SelfSubjectAccessReview{
Spec: authorization.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorization.ResourceAttributes{
Verb: verb,
Group: "apiextensions.k8s.io",
Version: "v1",
Resource: "customresourcedefinitions",
Name: crd,
},
},
}
return client.Kubernetes().AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, &review, meta.CreateOptions{})
}

View file

@ -0,0 +1,69 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package integrations
import (
"context"
"github.com/spf13/cobra"
pbImplSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/arangodb/kube-arangodb/pkg/util/svc"
)
func init() {
register(func() Integration {
return &schedulerV1{}
})
}
type schedulerV1 struct {
Configuration pbImplSchedulerV1.Configuration
}
func (b *schedulerV1) Name() string {
return "scheduler.v1"
}
func (b *schedulerV1) Description() string {
return "SchedulerV1 Integration"
}
func (b *schedulerV1) Register(cmd *cobra.Command, arg ArgGen) error {
f := cmd.Flags()
f.StringVar(&b.Configuration.Namespace, arg("namespace"), constants.NamespaceWithDefault("default"), "Kubernetes Namespace")
f.BoolVar(&b.Configuration.VerifyAccess, arg("verify-access"), true, "Verify the CRD Access")
return nil
}
func (b *schedulerV1) Handler(ctx context.Context) (svc.Handler, error) {
client, ok := kclient.GetDefaultFactory().Client()
if !ok {
return nil, errors.Errorf("Unable to create Kubernetes Client")
}
return pbImplSchedulerV1.New(ctx, client, b.Configuration)
}

View file

@ -1,150 +0,0 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package scheduler
import (
"context"
"os"
"strings"
"github.com/spf13/cobra"
"sigs.k8s.io/yaml"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
)
func InitCommand(cmd *cobra.Command) error {
var c cli
return c.register(cmd)
}
type cli struct {
Namespace string
Labels []string
Envs []string
Profiles []string
Container string
Image string
}
func (c *cli) asRequest(args ...string) (Request, error) {
var r = Request{
Labels: map[string]string{},
Envs: map[string]string{},
}
for _, l := range c.Labels {
p := strings.SplitN(l, "=", 2)
if len(p) == 1 {
r.Labels[p[0]] = ""
logger.Debug("Label Discovered: %s", p[0])
} else {
r.Labels[p[0]] = p[1]
logger.Debug("Label Discovered: %s=%s", p[0], p[1])
}
}
for _, l := range c.Envs {
p := strings.SplitN(l, "=", 2)
if len(p) == 1 {
return r, errors.Errorf("Missing value for env: %s", p[0])
} else {
r.Envs[p[0]] = p[1]
logger.Debug("Env Discovered: %s=%s", p[0], p[1])
}
}
if len(c.Profiles) > 0 {
r.Profiles = c.Profiles
logger.Debug("Enabling profiles: %s", strings.Join(c.Profiles, ", "))
}
r.Container = util.NewType(c.Container)
if c.Image != "" {
r.Image = util.NewType(c.Image)
}
r.Args = args
return r, nil
}
func (c *cli) register(cmd *cobra.Command) error {
if err := logging.Init(cmd); err != nil {
return err
}
cmd.RunE = c.run
f := cmd.PersistentFlags()
f.StringVarP(&c.Namespace, "namespace", "n", constants.NamespaceWithDefault("default"), "Kubernetes namespace")
f.StringSliceVarP(&c.Labels, "label", "l", nil, "Scheduler Render Labels in format <key>=<value>")
f.StringSliceVarP(&c.Envs, "env", "e", nil, "Scheduler Render Envs in format <key>=<value>")
f.StringSliceVarP(&c.Profiles, "profile", "p", nil, "Scheduler Render Profiles")
f.StringVar(&c.Container, "container", DefaultContainerName, "Container Name")
f.StringVar(&c.Image, "image", "", "Image")
return nil
}
func (c *cli) run(cmd *cobra.Command, args []string) error {
if err := logging.Enable(); err != nil {
return err
}
r, err := c.asRequest()
if err != nil {
return err
}
k, ok := kclient.GetDefaultFactory().Client()
if !ok {
return errors.Errorf("Unable to create Kubernetes Client")
}
s := NewScheduler(k, c.Namespace)
rendered, profiles, err := s.Render(context.Background(), r)
if err != nil {
return err
}
logger.Debug("Enabled profiles: %s", strings.Join(profiles, ", "))
data, err := yaml.Marshal(rendered)
if err != nil {
return err
}
if _, err := util.WriteAll(os.Stdout, data); err != nil {
return err
}
return nil
}

View file

@ -25,6 +25,7 @@ import (
core "k8s.io/api/core/v1"
pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition"
schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1"
schedulerContainerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1/container"
schedulerContainerResourcesApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1/container/resources"
@ -33,59 +34,55 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util"
)
const DefaultContainerName = "job"
func baseAsTemplate(in *pbSchedulerV1.Spec) *schedulerApi.ProfileTemplate {
containers := schedulerContainerApi.Containers{}
type Request struct {
Labels map[string]string
Profiles []string
Envs map[string]string
Container *string
Image *string
Args []string
}
func (r Request) AsTemplate() *schedulerApi.ProfileTemplate {
var container schedulerContainerApi.Container
if len(r.Envs) > 0 {
container.Environments = &schedulerContainerResourcesApi.Environments{}
for k, v := range r.Envs {
container.Environments.Env = append(container.Environments.Env, core.EnvVar{
Name: k,
Value: v,
})
for n, c := range in.Containers {
if c == nil {
continue
}
var container schedulerContainerApi.Container
if image := c.Image; image != nil {
container.Image = &schedulerContainerResourcesApi.Image{
Image: c.Image,
}
}
if len(c.Args) > 0 {
container.Core = &schedulerContainerResourcesApi.Core{
Args: c.Args,
}
}
if len(c.EnvironmentVariables) > 0 {
container.Environments = &schedulerContainerResourcesApi.Environments{}
for k, v := range c.EnvironmentVariables {
container.Env = append(container.Env, core.EnvVar{
Name: k,
Value: v,
})
}
}
containers[n] = container
}
if len(r.Args) > 0 {
container.Core = &schedulerContainerResourcesApi.Core{
Args: r.Args,
}
}
if r.Image != nil {
container.Image = &schedulerContainerResourcesApi.Image{
Image: util.NewType(util.TypeOrDefault(r.Image)),
}
}
return &schedulerApi.ProfileTemplate{
var t = schedulerApi.ProfileTemplate{
Priority: util.NewType(math.MaxInt),
Pod: &schedulerPodApi.Pod{
Metadata: &schedulerPodResourcesApi.Metadata{
Labels: util.MergeMaps(true, r.Labels),
},
},
Pod: &schedulerPodApi.Pod{},
Container: &schedulerApi.ProfileContainerTemplate{
Containers: map[string]schedulerContainerApi.Container{
util.TypeOrDefault(r.Container, DefaultContainerName): container,
},
Containers: containers,
},
}
if job := in.Job; job != nil {
t.Pod.Metadata = &schedulerPodResourcesApi.Metadata{
Labels: job.Labels,
}
}
return &t
}

View file

@ -1,25 +0,0 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package scheduler
import "github.com/arangodb/kube-arangodb/pkg/logging"
var logger = logging.Global().RegisterAndGetLogger("scheduler", logging.Info)

View file

@ -25,6 +25,7 @@ import (
core "k8s.io/api/core/v1"
pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition"
schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1"
"github.com/arangodb/kube-arangodb/pkg/debug_package/generators/kubernetes"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
@ -39,7 +40,7 @@ func NewScheduler(client kclient.Client, namespace string) Scheduler {
}
type Scheduler interface {
Render(ctx context.Context, in Request, templates ...*schedulerApi.ProfileTemplate) (*core.PodTemplateSpec, []string, error)
Render(ctx context.Context, in *pbSchedulerV1.Spec, templates ...*schedulerApi.ProfileTemplate) (*core.PodTemplateSpec, []string, error)
}
type scheduler struct {
@ -47,7 +48,11 @@ type scheduler struct {
namespace string
}
func (s scheduler) Render(ctx context.Context, in Request, templates ...*schedulerApi.ProfileTemplate) (*core.PodTemplateSpec, []string, error) {
func (s scheduler) Render(ctx context.Context, in *pbSchedulerV1.Spec, templates ...*schedulerApi.ProfileTemplate) (*core.PodTemplateSpec, []string, error) {
if in == nil {
return nil, nil, errors.Errorf("Unable to parse nil Spec")
}
profileMap, err := kubernetes.MapObjects[*schedulerApi.ArangoProfileList, *schedulerApi.ArangoProfile](ctx, s.client.Arango().SchedulerV1alpha1().ArangoProfiles(s.namespace), func(result *schedulerApi.ArangoProfileList) []*schedulerApi.ArangoProfile {
q := make([]*schedulerApi.ArangoProfile, len(result.Items))
@ -62,6 +67,18 @@ func (s scheduler) Render(ctx context.Context, in Request, templates ...*schedul
return nil, nil, err
}
var labels map[string]string
var additionalProfiles []string
if job := in.Job; job != nil {
labels = job.Labels
additionalProfiles = job.Profiles
}
if len(in.Containers) == 0 {
return nil, nil, errors.Errorf("Required at least 1 container")
}
profiles := profileMap.AsList().Filter(func(a *schedulerApi.ArangoProfile) bool {
return a != nil && a.Spec.Template != nil
}).Filter(func(a *schedulerApi.ArangoProfile) bool {
@ -69,14 +86,14 @@ func (s scheduler) Render(ctx context.Context, in Request, templates ...*schedul
return false
}
if !a.Spec.Selectors.Select(in.Labels) {
if !a.Spec.Selectors.Select(labels) {
return false
}
return true
})
for _, name := range in.Profiles {
for _, name := range additionalProfiles {
p, ok := profileMap.ByName(name)
if !ok {
return nil, nil, errors.Errorf("Profile with name `%s` is missing", name)
@ -103,7 +120,7 @@ func (s scheduler) Render(ctx context.Context, in Request, templates ...*schedul
extracted := schedulerApi.ProfileTemplates(kubernetes.Extract(profiles, func(in *schedulerApi.ArangoProfile) *schedulerApi.ProfileTemplate {
return in.Spec.Template
}).Append(templates...).Append(in.AsTemplate()))
}).Append(templates...).Append(baseAsTemplate(in)))
names := kubernetes.Extract(profiles, func(in *schedulerApi.ArangoProfile) string {
return in.GetName()

View file

@ -30,6 +30,7 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
pbSchedulerV1 "github.com/arangodb/kube-arangodb/integrations/scheduler/v1/definition"
schedulerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1"
schedulerContainerApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1/container"
schedulerContainerResourcesApi "github.com/arangodb/kube-arangodb/pkg/apis/scheduler/v1alpha1/container/resources"
@ -38,6 +39,8 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/tests"
)
const DefaultContainerName = "job"
func newScheduler(t *testing.T, objects ...*schedulerApi.ArangoProfile) Scheduler {
client := kclient.NewFakeClientBuilder().Client()
@ -55,7 +58,58 @@ type validatorExec func(in validator)
type validator func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string)
func render(t *testing.T, s Scheduler, in Request, templates ...*schedulerApi.ProfileTemplate) validatorExec {
func getRequest(in ...func(obj *pbSchedulerV1.Spec)) *pbSchedulerV1.Spec {
var r pbSchedulerV1.Spec
for _, i := range in {
i(&r)
}
return &r
}
func withProfiles(profiles ...string) func(obj *pbSchedulerV1.Spec) {
return func(obj *pbSchedulerV1.Spec) {
if obj.Job == nil {
obj.Job = &pbSchedulerV1.JobBase{}
}
obj.Job.Profiles = append(obj.Job.Profiles, profiles...)
}
}
func withLabels(labels map[string]string) func(obj *pbSchedulerV1.Spec) {
return func(obj *pbSchedulerV1.Spec) {
if obj.Job == nil {
obj.Job = &pbSchedulerV1.JobBase{}
}
if obj.Job.Labels == nil {
obj.Job.Labels = make(map[string]string)
}
for k, v := range labels {
obj.Job.Labels[k] = v
}
}
}
func withDefaultContainer(in ...func(obj *pbSchedulerV1.ContainerBase)) func(obj *pbSchedulerV1.Spec) {
return func(obj *pbSchedulerV1.Spec) {
if obj.Containers == nil {
obj.Containers = make(map[string]*pbSchedulerV1.ContainerBase)
}
var c pbSchedulerV1.ContainerBase
for _, i := range in {
i(&c)
}
obj.Containers[DefaultContainerName] = &c
}
}
func render(t *testing.T, s Scheduler, in *pbSchedulerV1.Spec, templates ...*schedulerApi.ProfileTemplate) validatorExec {
pod, accepted, err := s.Render(context.Background(), in, templates...)
t.Logf("Accepted templates: %s", strings.Join(accepted, ", "))
if err != nil {
@ -79,20 +133,22 @@ func runValidate(t *testing.T, err error, template *core.PodTemplateSpec, accept
}
}
func Test_Nil(t *testing.T) {
render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")), nil)(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
require.EqualError(t, err, "Unable to parse nil Spec")
})
}
func Test_NoProfiles(t *testing.T) {
render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")), Request{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
require.NoError(t, err)
require.Len(t, accepted, 0)
tests.GetContainerByNameT(t, template.Spec.Containers, DefaultContainerName)
render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")), &pbSchedulerV1.Spec{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
require.EqualError(t, err, "Required at least 1 container")
})
}
func Test_MissingSelectedProfile(t *testing.T) {
render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")), Request{
Profiles: []string{"missing"},
})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
render(t, newScheduler(t, tests.NewMetaObjectInDefaultNamespace[*schedulerApi.ArangoProfile](t, "test")),
getRequest(withProfiles("missing"), withDefaultContainer()),
)(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
require.EqualError(t, err, "Profile with name `missing` is missing")
})
}
@ -110,7 +166,9 @@ func Test_SelectorWithoutSelector(t *testing.T) {
},
},
}
})), Request{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
})), getRequest(withDefaultContainer(func(obj *pbSchedulerV1.ContainerBase) {
obj.Image = util.NewType("")
})))(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
require.NoError(t, err)
require.Len(t, accepted, 0)
@ -136,7 +194,7 @@ func Test_SelectorWithSelectorAll(t *testing.T) {
},
},
}
})), Request{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
})), getRequest(withDefaultContainer()))(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
require.NoError(t, err)
require.Len(t, accepted, 1)
@ -172,7 +230,7 @@ func Test_SelectorWithSpecificSelector_MissingLabel(t *testing.T) {
},
},
}
})), Request{})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
})), getRequest(withDefaultContainer()))(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
require.NoError(t, err)
require.Len(t, accepted, 0)
@ -205,11 +263,9 @@ func Test_SelectorWithSpecificSelector_PresentLabel(t *testing.T) {
},
},
}
})), Request{
Labels: map[string]string{
"ml.arangodb.com/type": "training",
},
}, nil)(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
})), getRequest(withDefaultContainer(), withLabels(map[string]string{
"ml.arangodb.com/type": "training",
})), nil)(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
require.NoError(t, err)
require.Len(t, accepted, 1)
@ -270,11 +326,10 @@ func Test_SelectorWithSpecificSelector_PresentLabel_ByPriority(t *testing.T) {
},
},
}
})), Request{
Labels: map[string]string{
"ml.arangodb.com/type": "training",
},
})(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
})), getRequest(withDefaultContainer(), withLabels(map[string]string{
"ml.arangodb.com/type": "training",
},
)))(func(t *testing.T, err error, template *core.PodTemplateSpec, accepted []string) {
require.NoError(t, err)
require.Len(t, accepted, 2)

View file

@ -0,0 +1,126 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package resources
import (
"context"
"fmt"
"strings"
"sync"
authorization "k8s.io/api/authorization/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)
type AccessRequest struct {
Verb, Group, Version, Resource, SubResource, Name, Namespace string
}
func (a AccessRequest) Verify(ctx context.Context, client kubernetes.Interface) authorization.SubjectAccessReviewStatus {
return VerifyAccessRequestStatus(ctx, client, a.Verb, a.Group, a.Version, a.Resource, a.SubResource, a.Name, a.Namespace)
}
func (a AccessRequest) VerifyErr(ctx context.Context, client kubernetes.Interface) error {
res := a.Verify(ctx, client)
if res.Allowed {
return nil
}
if res.Reason != "" {
return errors.Errorf("Unable to access %s: %s", a.String(), res.Reason)
}
return errors.Errorf("Unable to access %s", a.String())
}
func (a AccessRequest) String() string {
gv := a.Version
if a.Group != "" {
gv = fmt.Sprintf("%s/%s", a.Group, a.Version)
}
res := a.Resource
if a.SubResource != "" {
res = fmt.Sprintf("%s/%s", a.Resource, a.SubResource)
}
n := a.Name
if a.Namespace != "" {
n = fmt.Sprintf("%s/%s", a.Namespace, a.Name)
}
return fmt.Sprintf("%s/%s/%s %s", gv, res, n, strings.ToUpper(a.Verb))
}
func VerifyAll(ctx context.Context, client kubernetes.Interface, requests ...AccessRequest) error {
var wg sync.WaitGroup
errs := make([]error, len(requests))
for id := range requests {
wg.Add(1)
go func(id int) {
defer wg.Done()
errs[id] = requests[id].VerifyErr(ctx, client)
}(id)
}
wg.Wait()
return errors.Errors(errs...)
}
func VerifyAccessRequestStatus(ctx context.Context, client kubernetes.Interface, verb, group, version, resource, subResource, name, namespace string) authorization.SubjectAccessReviewStatus {
resp, err := VerifyAccessRequest(ctx, client, verb, group, version, resource, subResource, name, namespace)
if err != nil {
return authorization.SubjectAccessReviewStatus{
Allowed: false,
Reason: fmt.Sprintf("Unable to check access: %s", err.Error()),
}
}
return resp.Status
}
func VerifyAccessRequest(ctx context.Context, client kubernetes.Interface, verb, group, version, resource, subResource, name, namespace string) (*authorization.SelfSubjectAccessReview, error) {
review := authorization.SelfSubjectAccessReview{
Spec: authorization.SelfSubjectAccessReviewSpec{
ResourceAttributes: &authorization.ResourceAttributes{
Namespace: namespace,
Verb: verb,
Group: group,
Version: version,
Resource: resource,
Subresource: subResource,
Name: name,
},
},
}
return client.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, &review, meta.CreateOptions{})
}

View file

@ -63,6 +63,12 @@ func Default[T interface{}]() T {
return d
}
// DefaultInterface returns generic default value for type T as Interface
func DefaultInterface[T interface{}]() interface{} {
var d T
return d
}
// First returns first not nil value
func First[T interface{}](input ...*T) *T {
for _, i := range input {

View file

@ -48,6 +48,7 @@ import (
"github.com/arangodb/kube-arangodb/pkg/operatorV2/operation"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/kerrors"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
)
type handleFunc struct {
@ -223,6 +224,10 @@ func CreateObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSe
}
}
func UpdateObjectsC(t *testing.T, client kclient.Client, objects ...interface{}) func(t *testing.T) {
return UpdateObjects(t, client.Kubernetes(), client.Arango(), objects...)
}
func UpdateObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSet.Interface, objects ...interface{}) func(t *testing.T) {
for _, object := range objects {
switch v := object.(type) {
@ -349,7 +354,7 @@ func UpdateObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSe
}
}
func DeleteObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSet.Interface, objects ...interface{}) func(t *testing.T) {
func DeleteObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSet.Interface, objects ...interface{}) {
for _, object := range objects {
switch v := object.(type) {
case **batch.CronJob:
@ -372,11 +377,21 @@ func DeleteObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSe
vl := *v
require.NoError(t, k8s.CoreV1().Secrets(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{}))
case **core.Service:
require.NotNil(t, v)
vl := *v
require.NoError(t, k8s.CoreV1().Services(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{}))
case **core.ServiceAccount:
require.NotNil(t, v)
vl := *v
require.NoError(t, k8s.CoreV1().ServiceAccounts(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{}))
case **apps.StatefulSet:
require.NotNil(t, v)
vl := *v
err := k8s.AppsV1().StatefulSets(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{})
require.NoError(t, err)
case **api.ArangoDeployment:
require.NotNil(t, v)
@ -432,14 +447,19 @@ func DeleteObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSe
vl := *v
require.NoError(t, k8s.RbacV1().RoleBindings(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{}))
case **schedulerApi.ArangoProfile:
require.NotNil(t, v)
vl := *v
require.NoError(t, arango.SchedulerV1alpha1().ArangoProfiles(vl.GetNamespace()).Delete(context.Background(), vl.GetName(), meta.DeleteOptions{}))
default:
require.Fail(t, fmt.Sprintf("Unable to create object: %s", reflect.TypeOf(v).String()))
require.Fail(t, fmt.Sprintf("Unable to delete object: %s", reflect.TypeOf(v).String()))
}
}
}
return func(t *testing.T) {
RefreshObjects(t, k8s, arango, objects...)
}
func RefreshObjectsC(t *testing.T, client kclient.Client, objects ...interface{}) {
RefreshObjects(t, client.Kubernetes(), client.Arango(), objects...)
}
func RefreshObjects(t *testing.T, k8s kubernetes.Interface, arango arangoClientSet.Interface, objects ...interface{}) {

View file

@ -57,6 +57,8 @@ func NewMetaObjectRun[T meta.Object](t *testing.T) {
refresh(t)
UpdateObjects(t, c.Kubernetes(), c.Arango(), &obj)
DeleteObjects(t, c.Kubernetes(), c.Arango(), &obj)
})
})
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -25,6 +25,8 @@ import (
"time"
"github.com/stretchr/testify/require"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)
func DurationBetween() func(t *testing.T, expected time.Duration, skew float64) {
@ -39,3 +41,48 @@ func DurationBetween() func(t *testing.T, expected time.Duration, skew float64)
}
}
}
func Interrupt() error {
return interrupt{}
}
type interrupt struct {
}
func (i interrupt) Error() string {
return "interrupt"
}
func NewTimeout(in Timeout) Timeout {
return in
}
type Timeout func() error
func (t Timeout) WithTimeout(timeout, interval time.Duration) error {
timeoutT := time.NewTimer(timeout)
defer timeoutT.Stop()
intervalT := time.NewTicker(interval)
defer intervalT.Stop()
for {
select {
case <-timeoutT.C:
return errors.Errorf("Timeouted!")
case <-intervalT.C:
if err := t(); err != nil {
var interrupt interrupt
if errors.As(err, &interrupt) {
return nil
}
return err
}
}
}
}
func (t Timeout) WithTimeoutT(z *testing.T, timeout, interval time.Duration) {
require.NoError(z, t.WithTimeout(timeout, interval))
}