From b2c88f6ad8f481b41911b2e4fa73d87222cde910 Mon Sep 17 00:00:00 2001 From: Adam Janikowski <12255597+ajanikow@users.noreply.github.com> Date: Fri, 8 Dec 2023 10:39:19 +0100 Subject: [PATCH] [Feature] [ML] Shutdown Handler (#1529) --- CHANGELOG.md | 1 + cmd/ml_storage.go | 15 ++- docs/api/ArangoMLStorage.V1Alpha1.md | 10 ++ pkg/api/shutdown/v1/operator.pb.go | 96 ++++++++++++++++ pkg/api/shutdown/v1/operator.proto | 31 +++++ pkg/api/shutdown/v1/operator_grpc.pb.go | 106 ++++++++++++++++++ .../ml/v1alpha1/extension_spec_deployment.go | 4 +- .../ml/v1alpha1/storage_spec_mode_sidecar.go | 15 +++ pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go | 5 + pkg/apis/shared/v1/image.go | 8 ++ pkg/apis/shared/validate.go | 15 +++ pkg/crd/crds/ml-storage.schema.generated.yaml | 4 + pkg/ml/storage/service.community.go | 4 +- pkg/ml/storage/service.go | 6 - pkg/operator/operator.community.go | 3 +- pkg/operator/operator.go | 33 ++---- pkg/operatorV2/filter.go | 42 +++++++ pkg/operatorV2/informer.go | 13 ++- pkg/operatorV2/operation/item.go | 13 ++- pkg/operatorV2/operator.go | 8 +- pkg/util/constants/constants.go | 5 + pkg/util/context.go | 11 ++ pkg/util/shutdown/grpc.go | 75 +++++++++++++ pkg/util/shutdown/server.go | 89 +++++++++++++++ pkg/util/shutdown/shutdown.go | 22 +++- pkg/util/svc/service.go | 57 ++++++++++ 26 files changed, 641 insertions(+), 50 deletions(-) create mode 100644 pkg/api/shutdown/v1/operator.pb.go create mode 100644 pkg/api/shutdown/v1/operator.proto create mode 100644 pkg/api/shutdown/v1/operator_grpc.pb.go create mode 100644 pkg/operatorV2/filter.go create mode 100644 pkg/util/shutdown/grpc.go create mode 100644 pkg/util/shutdown/server.go create mode 100644 pkg/util/svc/service.go diff --git a/CHANGELOG.md b/CHANGELOG.md index c74677641..63e386e5e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ - (Improvement) (ML) Switch to fsnotify for file watching for MacOS support - (Feature) (ML) Unify Images, Resources and Lifecycle - (Improvement) (ML) CronJob status update +- (Improvement) (ML) Job Sidecar Shutdown ## [1.2.35](https://github.com/arangodb/kube-arangodb/tree/1.2.35) (2023-11-06) - (Maintenance) Update go-driver to v1.6.0, update IsNotFound() checks diff --git a/cmd/ml_storage.go b/cmd/ml_storage.go index 2cbdc304e..e57e6a7e0 100644 --- a/cmd/ml_storage.go +++ b/cmd/ml_storage.go @@ -21,14 +21,14 @@ package cmd import ( - "context" "os" "github.com/rs/zerolog/log" "github.com/spf13/cobra" "github.com/arangodb/kube-arangodb/pkg/ml/storage" - "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/shutdown" + "github.com/arangodb/kube-arangodb/pkg/util/svc" ) var ( @@ -48,6 +48,10 @@ var ( cmdMLStorageS3Options struct { storage.ServiceConfig } + + cmdMLShutdownOptions struct { + shutdown.ServiceConfig + } ) func init() { @@ -55,6 +59,7 @@ func init() { cmdMLStorage.AddCommand(cmdMLStorageS3) f := cmdMLStorageS3.PersistentFlags() + f.StringVar(&cmdMLShutdownOptions.ListenAddress, "shutdown.address", "", "Address the GRPC shutdown service will listen on (IP:port)") f.StringVar(&cmdMLStorageS3Options.ListenAddress, "server.address", "", "Address the GRPC service will listen on (IP:port)") f.StringVar(&cmdMLStorageS3Options.S3.Endpoint, "s3.endpoint", "", "Endpoint of S3 API implementation") @@ -76,12 +81,10 @@ func cmdMLStorageS3Run(cmd *cobra.Command, _ []string) { } func cmdMLStorageS3RunE(_ *cobra.Command) error { - ctx := util.CreateSignalContext(context.Background()) - - svc, err := storage.NewService(ctx, storage.StorageTypeS3Proxy, cmdMLStorageS3Options.ServiceConfig) + service, err := storage.NewService(shutdown.Context(), storage.StorageTypeS3Proxy, cmdMLStorageS3Options.ServiceConfig) if err != nil { return err } - return svc.Run(ctx) + return svc.RunServices(shutdown.Context(), service, shutdown.ServiceCentral(cmdMLShutdownOptions.ServiceConfig)) } diff --git a/docs/api/ArangoMLStorage.V1Alpha1.md b/docs/api/ArangoMLStorage.V1Alpha1.md index b065d0ae6..371485ec8 100644 --- a/docs/api/ArangoMLStorage.V1Alpha1.md +++ b/docs/api/ArangoMLStorage.V1Alpha1.md @@ -143,6 +143,16 @@ Resources holds resource requests & limits for container Links: * [Documentation of core.ResourceRequirements](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#resourcerequirements-v1-core) +*** + +### .spec.mode.sidecar.shutdownListenPort + +Type: `integer` [\[ref\]](https://github.com/arangodb/kube-arangodb/blob/1.2.35/pkg/apis/ml/v1alpha1/storage_spec_mode_sidecar.go#L36) + +ShutdownListenPort defines on which port the sidecar container will be listening for shutdown connections + +Default Value: `9202` + ## Status ### .status.conditions diff --git a/pkg/api/shutdown/v1/operator.pb.go b/pkg/api/shutdown/v1/operator.pb.go new file mode 100644 index 000000000..385a3371b --- /dev/null +++ b/pkg/api/shutdown/v1/operator.pb.go @@ -0,0 +1,96 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.21.1 +// source: pkg/api/shutdown/v1/operator.proto + +package shutdown + +import ( + server "github.com/arangodb/kube-arangodb/pkg/api/server" + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +var File_pkg_api_shutdown_v1_operator_proto protoreflect.FileDescriptor + +var file_pkg_api_shutdown_v1_operator_proto_rawDesc = []byte{ + 0x0a, 0x22, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x75, 0x74, 0x64, 0x6f, + 0x77, 0x6e, 0x2f, 0x76, 0x31, 0x2f, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x1a, 0x1d, 0x70, 0x6b, + 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x6f, 0x70, 0x65, + 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x3c, 0x0a, 0x08, 0x53, + 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x12, 0x30, 0x0a, 0x0e, 0x53, 0x68, 0x75, 0x74, 0x64, + 0x6f, 0x77, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x0d, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x65, 0x72, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0d, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, + 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x2d, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x70, + 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var file_pkg_api_shutdown_v1_operator_proto_goTypes = []interface{}{ + (*server.Empty)(nil), // 0: server.Empty +} +var file_pkg_api_shutdown_v1_operator_proto_depIdxs = []int32{ + 0, // 0: server.Shutdown.ShutdownServer:input_type -> server.Empty + 0, // 1: server.Shutdown.ShutdownServer:output_type -> server.Empty + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pkg_api_shutdown_v1_operator_proto_init() } +func file_pkg_api_shutdown_v1_operator_proto_init() { + if File_pkg_api_shutdown_v1_operator_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_pkg_api_shutdown_v1_operator_proto_rawDesc, + NumEnums: 0, + NumMessages: 0, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_pkg_api_shutdown_v1_operator_proto_goTypes, + DependencyIndexes: file_pkg_api_shutdown_v1_operator_proto_depIdxs, + }.Build() + File_pkg_api_shutdown_v1_operator_proto = out.File + file_pkg_api_shutdown_v1_operator_proto_rawDesc = nil + file_pkg_api_shutdown_v1_operator_proto_goTypes = nil + file_pkg_api_shutdown_v1_operator_proto_depIdxs = nil +} diff --git a/pkg/api/shutdown/v1/operator.proto b/pkg/api/shutdown/v1/operator.proto new file mode 100644 index 000000000..eb02af2cf --- /dev/null +++ b/pkg/api/shutdown/v1/operator.proto @@ -0,0 +1,31 @@ +// +// DISCLAIMER +// +// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +syntax = "proto3"; + +option go_package = "github.com/arangodb/kube-arangodb/pkg/api/shutdown"; + +package server; + +import "pkg/api/server/operator.proto"; + +service Shutdown { + rpc ShutdownServer (Empty) returns (Empty) {} +} \ No newline at end of file diff --git a/pkg/api/shutdown/v1/operator_grpc.pb.go b/pkg/api/shutdown/v1/operator_grpc.pb.go new file mode 100644 index 000000000..9e561476d --- /dev/null +++ b/pkg/api/shutdown/v1/operator_grpc.pb.go @@ -0,0 +1,106 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.1 +// source: pkg/api/shutdown/v1/operator.proto + +package shutdown + +import ( + context "context" + server "github.com/arangodb/kube-arangodb/pkg/api/server" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// ShutdownClient is the client API for Shutdown service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ShutdownClient interface { + ShutdownServer(ctx context.Context, in *server.Empty, opts ...grpc.CallOption) (*server.Empty, error) +} + +type shutdownClient struct { + cc grpc.ClientConnInterface +} + +func NewShutdownClient(cc grpc.ClientConnInterface) ShutdownClient { + return &shutdownClient{cc} +} + +func (c *shutdownClient) ShutdownServer(ctx context.Context, in *server.Empty, opts ...grpc.CallOption) (*server.Empty, error) { + out := new(server.Empty) + err := c.cc.Invoke(ctx, "/server.Shutdown/ShutdownServer", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ShutdownServer is the server API for Shutdown service. +// All implementations must embed UnimplementedShutdownServer +// for forward compatibility +type ShutdownServer interface { + ShutdownServer(context.Context, *server.Empty) (*server.Empty, error) + mustEmbedUnimplementedShutdownServer() +} + +// UnimplementedShutdownServer must be embedded to have forward compatible implementations. +type UnimplementedShutdownServer struct { +} + +func (UnimplementedShutdownServer) ShutdownServer(context.Context, *server.Empty) (*server.Empty, error) { + return nil, status.Errorf(codes.Unimplemented, "method ShutdownServer not implemented") +} +func (UnimplementedShutdownServer) mustEmbedUnimplementedShutdownServer() {} + +// UnsafeShutdownServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ShutdownServer will +// result in compilation errors. +type UnsafeShutdownServer interface { + mustEmbedUnimplementedShutdownServer() +} + +func RegisterShutdownServer(s grpc.ServiceRegistrar, srv ShutdownServer) { + s.RegisterService(&Shutdown_ServiceDesc, srv) +} + +func _Shutdown_ShutdownServer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(server.Empty) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ShutdownServer).ShutdownServer(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/server.Shutdown/ShutdownServer", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ShutdownServer).ShutdownServer(ctx, req.(*server.Empty)) + } + return interceptor(ctx, in, info, handler) +} + +// Shutdown_ServiceDesc is the grpc.ServiceDesc for Shutdown service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Shutdown_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "server.Shutdown", + HandlerType: (*ShutdownServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "ShutdownServer", + Handler: _Shutdown_ShutdownServer_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "pkg/api/shutdown/v1/operator.proto", +} diff --git a/pkg/apis/ml/v1alpha1/extension_spec_deployment.go b/pkg/apis/ml/v1alpha1/extension_spec_deployment.go index 4b06fea84..076c23ee8 100644 --- a/pkg/apis/ml/v1alpha1/extension_spec_deployment.go +++ b/pkg/apis/ml/v1alpha1/extension_spec_deployment.go @@ -107,9 +107,7 @@ func (s *ArangoMLExtensionSpecDeployment) Validate() error { } errs := []error{ - shared.PrefixResourceErrors("service", shared.ValidateOptional(s.GetService(), func(service ArangoMLExtensionSpecDeploymentService) error { - return service.Validate() - })), + shared.PrefixResourceErrors("service", shared.ValidateOptional(s.GetService(), func(s ArangoMLExtensionSpecDeploymentService) error { return s.Validate() })), } if s.GetReplicas() < 0 || s.GetReplicas() > 10 { diff --git a/pkg/apis/ml/v1alpha1/storage_spec_mode_sidecar.go b/pkg/apis/ml/v1alpha1/storage_spec_mode_sidecar.go index 8438852cb..e858d0070 100644 --- a/pkg/apis/ml/v1alpha1/storage_spec_mode_sidecar.go +++ b/pkg/apis/ml/v1alpha1/storage_spec_mode_sidecar.go @@ -31,6 +31,10 @@ type ArangoMLStorageSpecModeSidecar struct { // +doc/default: 9201 ListenPort *uint16 `json:"listenPort,omitempty"` + // ShutdownListenPort defines on which port the sidecar container will be listening for shutdown connections + // +doc/default: 9202 + ShutdownListenPort *uint16 `json:"shutdownListenPort,omitempty"` + // Image define default image used for the extension *sharedApi.Image `json:",inline"` @@ -65,6 +69,10 @@ func (s *ArangoMLStorageSpecModeSidecar) Validate() error { err = append(err, shared.PrefixResourceErrors("listenPort", errors.Newf("must be positive"))) } + if s.GetShutdownListenPort() < 1 { + err = append(err, shared.PrefixResourceErrors("shutdownListenPort", errors.Newf("must be positive"))) + } + err = append(err, s.GetResources().Validate()) return shared.WithErrors(err...) @@ -76,3 +84,10 @@ func (s *ArangoMLStorageSpecModeSidecar) GetListenPort() uint16 { } return *s.ListenPort } + +func (s *ArangoMLStorageSpecModeSidecar) GetShutdownListenPort() uint16 { + if s == nil || s.ShutdownListenPort == nil { + return 9202 + } + return *s.ShutdownListenPort +} diff --git a/pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go index 76eaf9e8e..6a97c20ef 100644 --- a/pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/ml/v1alpha1/zz_generated.deepcopy.go @@ -774,6 +774,11 @@ func (in *ArangoMLStorageSpecModeSidecar) DeepCopyInto(out *ArangoMLStorageSpecM *out = new(uint16) **out = **in } + if in.ShutdownListenPort != nil { + in, out := &in.ShutdownListenPort, &out.ShutdownListenPort + *out = new(uint16) + **out = **in + } if in.Image != nil { in, out := &in.Image, &out.Image *out = new(sharedv1.Image) diff --git a/pkg/apis/shared/v1/image.go b/pkg/apis/shared/v1/image.go index 4a4e910c3..e40de8043 100644 --- a/pkg/apis/shared/v1/image.go +++ b/pkg/apis/shared/v1/image.go @@ -38,6 +38,14 @@ type Image struct { PullSecrets []string `json:"pullSecrets,omitempty"` } +func (i *Image) GetImage() string { + if i == nil || i.Image == nil { + return "" + } + + return *i.Image +} + func (i *Image) Validate() error { if i == nil { return nil diff --git a/pkg/apis/shared/validate.go b/pkg/apis/shared/validate.go index ab629d59a..8aaf5369b 100644 --- a/pkg/apis/shared/validate.go +++ b/pkg/apis/shared/validate.go @@ -76,6 +76,21 @@ func ValidatePullPolicy(in core.PullPolicy) error { return errors.Newf("Unknown pull policy: '%s'", string(in)) } +type ValidateInterface interface { + Validate() error +} + +func Validate[T interface{}](in T) error { + return validate(in) +} + +func validate(in any) error { + if v, ok := in.(ValidateInterface); ok { + return v.Validate() + } + return nil +} + // ValidateOptional Validates object if is not nil func ValidateOptional[T interface{}](in *T, validator func(T) error) error { if in != nil { diff --git a/pkg/crd/crds/ml-storage.schema.generated.yaml b/pkg/crd/crds/ml-storage.schema.generated.yaml index 0db20e409..addb482bc 100644 --- a/pkg/crd/crds/ml-storage.schema.generated.yaml +++ b/pkg/crd/crds/ml-storage.schema.generated.yaml @@ -95,6 +95,10 @@ v1alpha1: type: string type: object type: object + shutdownListenPort: + description: ShutdownListenPort defines on which port the sidecar container will be listening for shutdown connections + format: int32 + type: integer type: object type: object type: object diff --git a/pkg/ml/storage/service.community.go b/pkg/ml/storage/service.community.go index 52d660d8d..29c574730 100644 --- a/pkg/ml/storage/service.community.go +++ b/pkg/ml/storage/service.community.go @@ -25,8 +25,10 @@ package storage import ( "context" "errors" + + "github.com/arangodb/kube-arangodb/pkg/util/svc" ) -func NewService(_ context.Context, _ StorageType, _ ServiceConfig) (Service, error) { +func NewService(_ context.Context, _ StorageType, _ ServiceConfig) (svc.Service, error) { return nil, errors.New("this service is available only in enterprise edition of operator") } diff --git a/pkg/ml/storage/service.go b/pkg/ml/storage/service.go index 99237dc11..5548a2822 100644 --- a/pkg/ml/storage/service.go +++ b/pkg/ml/storage/service.go @@ -21,8 +21,6 @@ package storage import ( - "context" - "github.com/arangodb/kube-arangodb/pkg/ml/storage/s3" ) @@ -36,7 +34,3 @@ type ServiceConfig struct { ListenAddress string S3 s3.Config } - -type Service interface { - Run(ctx context.Context) error -} diff --git a/pkg/operator/operator.community.go b/pkg/operator/operator.community.go index 6558d1c80..b8b6b9870 100644 --- a/pkg/operator/operator.community.go +++ b/pkg/operator/operator.community.go @@ -23,6 +23,7 @@ package operator import ( + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" arangoClientSet "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned" @@ -36,6 +37,6 @@ func (o *Operator) onStartML(stop <-chan struct{}) { panic("Unable to start ML Operator in Community") } -func (o *Operator) onStartOperatorV2ML(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory) { +func (o *Operator) onStartOperatorV2ML(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory, kubeInformer informers.SharedInformerFactory) { panic("Unable to start ML Operator in Community") } diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 97df06724..a050423cc 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -29,8 +29,8 @@ import ( "github.com/rs/zerolog" meta "k8s.io/apimachinery/pkg/apis/meta/v1" kwatch "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" "github.com/arangodb/kube-arangodb/pkg/apis/apps" @@ -261,38 +261,29 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st zerolog.SetGlobalLevel(zerolog.DebugLevel) - restClient, err := rest.InClusterConfig() - if err != nil { - panic(err) - } + eventRecorder := event.NewEventRecorder(operatorName, o.Client.Kubernetes()) - arangoClientSet, err := arangoClientSet.NewForConfig(restClient) - if err != nil { - panic(err) - } + arangoInformer := arangoInformer.NewSharedInformerFactoryWithOptions(o.Client.Arango(), 10*time.Second, arangoInformer.WithNamespace(o.Namespace)) - kubeClientSet, err := kubernetes.NewForConfig(restClient) - if err != nil { - panic(err) - } - - eventRecorder := event.NewEventRecorder(operatorName, kubeClientSet) - - arangoInformer := arangoInformer.NewSharedInformerFactoryWithOptions(arangoClientSet, 10*time.Second, arangoInformer.WithNamespace(o.Namespace)) + kubeInformer := informers.NewSharedInformerFactoryWithOptions(o.Client.Kubernetes(), 15*time.Second, informers.WithNamespace(o.Namespace)) switch operatorType { case appsOperator: - o.onStartOperatorV2Apps(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer) + o.onStartOperatorV2Apps(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer) o.Dependencies.AppsProbe.SetReady() case backupOperator: - o.onStartOperatorV2Backup(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer) + o.onStartOperatorV2Backup(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer) o.Dependencies.BackupProbe.SetReady() case mlOperator: - o.onStartOperatorV2ML(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer) + o.onStartOperatorV2ML(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer) o.Dependencies.MlProbe.SetReady() } - if err = operator.RegisterStarter(arangoInformer); err != nil { + if err := operator.RegisterStarter(arangoInformer); err != nil { + panic(err) + } + + if err := operator.RegisterStarter(kubeInformer); err != nil { panic(err) } diff --git a/pkg/operatorV2/filter.go b/pkg/operatorV2/filter.go new file mode 100644 index 000000000..a70e79315 --- /dev/null +++ b/pkg/operatorV2/filter.go @@ -0,0 +1,42 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package operator + +import meta "k8s.io/apimachinery/pkg/apis/meta/v1" + +type InformerFilter func(obj meta.Object) bool + +func informerFilterMerge(filters ...InformerFilter) InformerFilter { + if len(filters) == 0 { + return func(obj meta.Object) bool { + return true + } + } + return func(obj meta.Object) bool { + for _, filter := range filters { + if !filter(obj) { + return false + } + } + + return true + } +} diff --git a/pkg/operatorV2/informer.go b/pkg/operatorV2/informer.go index 9f1372a85..13e2f8ff5 100644 --- a/pkg/operatorV2/informer.go +++ b/pkg/operatorV2/informer.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -27,12 +27,13 @@ import ( "github.com/arangodb/kube-arangodb/pkg/operatorV2/operation" ) -func newResourceEventHandler(operator Operator, group, version, kind string) cache.ResourceEventHandler { +func newResourceEventHandler(operator Operator, group, version, kind string, filter InformerFilter) cache.ResourceEventHandler { return &resourceEventWrapper{ Operator: operator, Group: group, Version: version, Kind: kind, + filter: filter, } } @@ -40,6 +41,8 @@ type resourceEventWrapper struct { Operator Operator Group, Version, Kind string + + filter InformerFilter } func (r *resourceEventWrapper) push(o operation.Operation, obj interface{}) { @@ -48,7 +51,13 @@ func (r *resourceEventWrapper) push(o operation.Operation, obj interface{}) { } if object, ok := obj.(meta.Object); ok { + if item, err := operation.NewItemFromObject(o, r.Group, r.Version, r.Kind, object); err == nil { + if f := r.filter; f != nil { + if !f(object) { + return + } + } r.Operator.EnqueueItem(item) } } diff --git a/pkg/operatorV2/operation/item.go b/pkg/operatorV2/operation/item.go index e21b6f763..78da7c123 100644 --- a/pkg/operatorV2/operation/item.go +++ b/pkg/operatorV2/operation/item.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -23,6 +23,7 @@ package operation import ( "strings" + "github.com/rs/zerolog" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/arangodb/kube-arangodb/pkg/util/errors" @@ -132,3 +133,13 @@ func (i Item) Validate() error { func (i Item) String() string { return strings.Join([]string{string(i.Operation), i.Group, i.Version, i.Kind, i.Namespace, i.Name}, separator) } + +func (i Item) WrapLogger(in *zerolog.Event) *zerolog.Event { + return in. + Str("operation", string(i.Operation)). + Str("namespace", i.Namespace). + Str("name", i.Name). + Str("group", i.Group). + Str("version", i.Version). + Str("kind", i.Kind) +} diff --git a/pkg/operatorV2/operator.go b/pkg/operatorV2/operator.go index 68f709023..4010a5851 100644 --- a/pkg/operatorV2/operator.go +++ b/pkg/operatorV2/operator.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -49,7 +49,7 @@ type Operator interface { Start(threadiness int, stopCh <-chan struct{}) error - RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string) error + RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string, filters ...InformerFilter) error RegisterStarter(starter Starter) error RegisterHandler(handler Handler) error @@ -158,7 +158,7 @@ func (o *operator) EnqueueItem(item operation.Item) { o.workqueue.Add(item.String()) } -func (o *operator) RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string) error { +func (o *operator) RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string, filters ...InformerFilter) error { o.lock.Lock() defer o.lock.Unlock() @@ -174,7 +174,7 @@ func (o *operator) RegisterInformer(informer cache.SharedIndexInformer, group, v o.informers = append(o.informers, informer) - informer.AddEventHandler(newResourceEventHandler(o, group, version, kind)) + informer.AddEventHandler(newResourceEventHandler(o, group, version, kind, informerFilterMerge(filters...))) return nil } diff --git a/pkg/util/constants/constants.go b/pkg/util/constants/constants.go index f53d5a0ae..b0b45f70c 100644 --- a/pkg/util/constants/constants.go +++ b/pkg/util/constants/constants.go @@ -58,6 +58,11 @@ const ( FinalizerPVCMemberExists = "pvc.database.arangodb.com/member-exists" // Finalizer added to PVCs, indicating the need to keep is as long as its member exists FinalizerDelayPodTermination = "pod.database.arangodb.com/delay" // Finalizer added to Pod, delays termination + AnnotationShutdownManagedContainer = "shutdown.arangodb.com/managed" + AnnotationShutdownContainer = "container.shutdown.arangodb.com" + AnnotationShutdownCoreContainer = "core.shutdown.arangodb.com" + AnnotationShutdownCoreContainerModeWait = "wait" + AnnotationEnforceAntiAffinity = "database.arangodb.com/enforce-anti-affinity" // Key of annotation added to PVC. Value is a boolean "true" or "false" BackupLabelRole = "backup/role" diff --git a/pkg/util/context.go b/pkg/util/context.go index a4da3761d..380d1dca4 100644 --- a/pkg/util/context.go +++ b/pkg/util/context.go @@ -31,6 +31,17 @@ import ( "github.com/arangodb/kube-arangodb/pkg/util/globals" ) +func WithKubernetesContextTimeoutP1A1[P1, A1 interface{}](ctx context.Context, f func(context.Context, A1) P1, a1 A1) P1 { + return WithContextTimeoutP1A1(ctx, globals.GetGlobals().Timeouts().Kubernetes().Get(), f, a1) +} + +func WithContextTimeoutP1A1[P1, A1 interface{}](ctx context.Context, timeout time.Duration, f func(context.Context, A1) P1, a1 A1) P1 { + nCtx, c := context.WithTimeout(ctx, timeout) + defer c() + + return f(nCtx, a1) +} + func WithKubernetesContextTimeoutP1A2[P1, A1, A2 interface{}](ctx context.Context, f func(context.Context, A1, A2) P1, a1 A1, a2 A2) P1 { return WithContextTimeoutP1A2(ctx, globals.GetGlobals().Timeouts().Kubernetes().Get(), f, a1, a2) } diff --git a/pkg/util/shutdown/grpc.go b/pkg/util/shutdown/grpc.go new file mode 100644 index 000000000..e41c10438 --- /dev/null +++ b/pkg/util/shutdown/grpc.go @@ -0,0 +1,75 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package shutdown + +import ( + "context" + "time" + + "google.golang.org/grpc" + + "github.com/arangodb/kube-arangodb/pkg/api/server" + pbShutdown "github.com/arangodb/kube-arangodb/pkg/api/shutdown/v1" +) + +func RegisterCentral(pb grpc.ServiceRegistrar) { + pbShutdown.RegisterShutdownServer(pb, NewShutdownableShutdownCentralServer()) +} + +func Register(pb grpc.ServiceRegistrar, closer context.CancelFunc) { + pbShutdown.RegisterShutdownServer(pb, NewShutdownableShutdownServer(closer)) +} + +func NewShutdownableShutdownCentralServer() ShutdownableShutdownServer { + return NewShutdownableShutdownServer(stop) +} + +func NewShutdownableShutdownServer(closer context.CancelFunc) ShutdownableShutdownServer { + return &impl{closer: closer} +} + +type ShutdownableShutdownServer interface { + pbShutdown.ShutdownServer + + Shutdown(cancelFunc context.CancelFunc) +} + +var _ ShutdownableShutdownServer = &impl{} + +type impl struct { + pbShutdown.UnimplementedShutdownServer + + closer context.CancelFunc +} + +func (i *impl) ShutdownServer(ctx context.Context, empty *server.Empty) (*server.Empty, error) { + go func() { + defer i.closer() + + time.Sleep(50 * time.Millisecond) + }() + + return &server.Empty{}, nil +} + +func (i *impl) Shutdown(cancelFunc context.CancelFunc) { + cancelFunc() +} diff --git a/pkg/util/shutdown/server.go b/pkg/util/shutdown/server.go new file mode 100644 index 000000000..a311a7d76 --- /dev/null +++ b/pkg/util/shutdown/server.go @@ -0,0 +1,89 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package shutdown + +import ( + "context" + "net" + + "google.golang.org/grpc" + + "github.com/arangodb/kube-arangodb/pkg/util/svc" +) + +type ServiceConfig struct { + ListenAddress string +} + +func ServiceCentral(config ServiceConfig) svc.Service { + server := grpc.NewServer( /* currently no auth parameters required */ ) + + RegisterCentral(server) + + return &service{ + cfg: config, + grpcServer: server, + } +} + +func Service(config ServiceConfig, closer context.CancelFunc) svc.Service { + server := grpc.NewServer( /* currently no auth parameters required */ ) + + Register(server, closer) + + return &service{ + cfg: config, + grpcServer: server, + } +} + +type service struct { + grpcServer *grpc.Server + cfg ServiceConfig +} + +func (s *service) Run(ctx context.Context) error { + ln, err := net.Listen("tcp", s.cfg.ListenAddress) + if err != nil { + return err + } + defer ln.Close() + + errChan := make(chan error) + go func() { + if serveErr := s.grpcServer.Serve(ln); serveErr != nil && serveErr != grpc.ErrServerStopped { + errChan <- serveErr + } + }() + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + select { + case <-ctx.Done(): + s.grpcServer.GracefulStop() + case err = <-errChan: + s.grpcServer.Stop() + close(errChan) + } + + return err +} diff --git a/pkg/util/shutdown/shutdown.go b/pkg/util/shutdown/shutdown.go index 6ed28944c..957a5766f 100644 --- a/pkg/util/shutdown/shutdown.go +++ b/pkg/util/shutdown/shutdown.go @@ -1,7 +1,7 @@ // // DISCLAIMER // -// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany +// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -21,26 +21,38 @@ package shutdown import ( + "context" "os" "os/signal" "syscall" ) func init() { - shutdown = make(chan struct{}) + ctx, stop = context.WithCancel(context.Background()) sigChannel := make(chan os.Signal, 2) signal.Notify(sigChannel, os.Interrupt, syscall.SIGTERM) go func() { - defer close(shutdown) + defer stop() <-sigChannel }() } -var shutdown chan struct{} +var ( + ctx context.Context + stop context.CancelFunc +) + +func Context() context.Context { + return ctx +} + +func Stop() { + defer stop() +} func Channel() <-chan struct{} { - return shutdown + return ctx.Done() } diff --git a/pkg/util/svc/service.go b/pkg/util/svc/service.go new file mode 100644 index 000000000..5471815b3 --- /dev/null +++ b/pkg/util/svc/service.go @@ -0,0 +1,57 @@ +// +// DISCLAIMER +// +// Copyright 2023 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package svc + +import ( + "context" + "sync" + + "github.com/arangodb/kube-arangodb/pkg/apis/shared" +) + +type Service interface { + Run(ctx context.Context) error +} + +func RunServices(ctx context.Context, services ...Service) error { + if len(services) == 0 { + <-ctx.Done() + return nil + } + + errors := make([]error, len(services)) + + var wg sync.WaitGroup + + for id := range services { + wg.Add(1) + + go func(id int) { + defer wg.Done() + + errors[id] = services[id].Run(ctx) + }(id) + } + + wg.Wait() + + return shared.WithErrors(errors...) +}