1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] [ML] Shutdown Handler (#1529)

This commit is contained in:
Adam Janikowski 2023-12-08 10:39:19 +01:00 committed by GitHub
parent 244c362d3b
commit b2c88f6ad8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 641 additions and 50 deletions

View file

@ -30,6 +30,7 @@
- (Improvement) (ML) Switch to fsnotify for file watching for MacOS support
- (Feature) (ML) Unify Images, Resources and Lifecycle
- (Improvement) (ML) CronJob status update
- (Improvement) (ML) Job Sidecar Shutdown
## [1.2.35](https://github.com/arangodb/kube-arangodb/tree/1.2.35) (2023-11-06)
- (Maintenance) Update go-driver to v1.6.0, update IsNotFound() checks

View file

@ -21,14 +21,14 @@
package cmd
import (
"context"
"os"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
"github.com/arangodb/kube-arangodb/pkg/ml/storage"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/shutdown"
"github.com/arangodb/kube-arangodb/pkg/util/svc"
)
var (
@ -48,6 +48,10 @@ var (
cmdMLStorageS3Options struct {
storage.ServiceConfig
}
cmdMLShutdownOptions struct {
shutdown.ServiceConfig
}
)
func init() {
@ -55,6 +59,7 @@ func init() {
cmdMLStorage.AddCommand(cmdMLStorageS3)
f := cmdMLStorageS3.PersistentFlags()
f.StringVar(&cmdMLShutdownOptions.ListenAddress, "shutdown.address", "", "Address the GRPC shutdown service will listen on (IP:port)")
f.StringVar(&cmdMLStorageS3Options.ListenAddress, "server.address", "", "Address the GRPC service will listen on (IP:port)")
f.StringVar(&cmdMLStorageS3Options.S3.Endpoint, "s3.endpoint", "", "Endpoint of S3 API implementation")
@ -76,12 +81,10 @@ func cmdMLStorageS3Run(cmd *cobra.Command, _ []string) {
}
func cmdMLStorageS3RunE(_ *cobra.Command) error {
ctx := util.CreateSignalContext(context.Background())
svc, err := storage.NewService(ctx, storage.StorageTypeS3Proxy, cmdMLStorageS3Options.ServiceConfig)
service, err := storage.NewService(shutdown.Context(), storage.StorageTypeS3Proxy, cmdMLStorageS3Options.ServiceConfig)
if err != nil {
return err
}
return svc.Run(ctx)
return svc.RunServices(shutdown.Context(), service, shutdown.ServiceCentral(cmdMLShutdownOptions.ServiceConfig))
}

View file

@ -143,6 +143,16 @@ Resources holds resource requests & limits for container
Links:
* [Documentation of core.ResourceRequirements](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#resourcerequirements-v1-core)
***
### .spec.mode.sidecar.shutdownListenPort
Type: `integer` <sup>[\[ref\]](https://github.com/arangodb/kube-arangodb/blob/1.2.35/pkg/apis/ml/v1alpha1/storage_spec_mode_sidecar.go#L36)</sup>
ShutdownListenPort defines on which port the sidecar container will be listening for shutdown connections
Default Value: `9202`
## Status
### .status.conditions

96
pkg/api/shutdown/v1/operator.pb.go generated Normal file
View file

@ -0,0 +1,96 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.1
// source: pkg/api/shutdown/v1/operator.proto
package shutdown
import (
server "github.com/arangodb/kube-arangodb/pkg/api/server"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
var File_pkg_api_shutdown_v1_operator_proto protoreflect.FileDescriptor
var file_pkg_api_shutdown_v1_operator_proto_rawDesc = []byte{
0x0a, 0x22, 0x70, 0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x75, 0x74, 0x64, 0x6f,
0x77, 0x6e, 0x2f, 0x76, 0x31, 0x2f, 0x6f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70,
0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x1a, 0x1d, 0x70, 0x6b,
0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x2f, 0x6f, 0x70, 0x65,
0x72, 0x61, 0x74, 0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x32, 0x3c, 0x0a, 0x08, 0x53,
0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x12, 0x30, 0x0a, 0x0e, 0x53, 0x68, 0x75, 0x74, 0x64,
0x6f, 0x77, 0x6e, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x12, 0x0d, 0x2e, 0x73, 0x65, 0x72, 0x76,
0x65, 0x72, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x0d, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x65,
0x72, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x42, 0x34, 0x5a, 0x32, 0x67, 0x69, 0x74,
0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62,
0x2f, 0x6b, 0x75, 0x62, 0x65, 0x2d, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64, 0x62, 0x2f, 0x70,
0x6b, 0x67, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x73, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x62,
0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var file_pkg_api_shutdown_v1_operator_proto_goTypes = []interface{}{
(*server.Empty)(nil), // 0: server.Empty
}
var file_pkg_api_shutdown_v1_operator_proto_depIdxs = []int32{
0, // 0: server.Shutdown.ShutdownServer:input_type -> server.Empty
0, // 1: server.Shutdown.ShutdownServer:output_type -> server.Empty
1, // [1:2] is the sub-list for method output_type
0, // [0:1] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_pkg_api_shutdown_v1_operator_proto_init() }
func file_pkg_api_shutdown_v1_operator_proto_init() {
if File_pkg_api_shutdown_v1_operator_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_pkg_api_shutdown_v1_operator_proto_rawDesc,
NumEnums: 0,
NumMessages: 0,
NumExtensions: 0,
NumServices: 1,
},
GoTypes: file_pkg_api_shutdown_v1_operator_proto_goTypes,
DependencyIndexes: file_pkg_api_shutdown_v1_operator_proto_depIdxs,
}.Build()
File_pkg_api_shutdown_v1_operator_proto = out.File
file_pkg_api_shutdown_v1_operator_proto_rawDesc = nil
file_pkg_api_shutdown_v1_operator_proto_goTypes = nil
file_pkg_api_shutdown_v1_operator_proto_depIdxs = nil
}

31
pkg/api/shutdown/v1/operator.proto generated Normal file
View file

@ -0,0 +1,31 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
syntax = "proto3";
option go_package = "github.com/arangodb/kube-arangodb/pkg/api/shutdown";
package server;
import "pkg/api/server/operator.proto";
service Shutdown {
rpc ShutdownServer (Empty) returns (Empty) {}
}

106
pkg/api/shutdown/v1/operator_grpc.pb.go generated Normal file
View file

@ -0,0 +1,106 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc v3.21.1
// source: pkg/api/shutdown/v1/operator.proto
package shutdown
import (
context "context"
server "github.com/arangodb/kube-arangodb/pkg/api/server"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// ShutdownClient is the client API for Shutdown service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ShutdownClient interface {
ShutdownServer(ctx context.Context, in *server.Empty, opts ...grpc.CallOption) (*server.Empty, error)
}
type shutdownClient struct {
cc grpc.ClientConnInterface
}
func NewShutdownClient(cc grpc.ClientConnInterface) ShutdownClient {
return &shutdownClient{cc}
}
func (c *shutdownClient) ShutdownServer(ctx context.Context, in *server.Empty, opts ...grpc.CallOption) (*server.Empty, error) {
out := new(server.Empty)
err := c.cc.Invoke(ctx, "/server.Shutdown/ShutdownServer", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ShutdownServer is the server API for Shutdown service.
// All implementations must embed UnimplementedShutdownServer
// for forward compatibility
type ShutdownServer interface {
ShutdownServer(context.Context, *server.Empty) (*server.Empty, error)
mustEmbedUnimplementedShutdownServer()
}
// UnimplementedShutdownServer must be embedded to have forward compatible implementations.
type UnimplementedShutdownServer struct {
}
func (UnimplementedShutdownServer) ShutdownServer(context.Context, *server.Empty) (*server.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method ShutdownServer not implemented")
}
func (UnimplementedShutdownServer) mustEmbedUnimplementedShutdownServer() {}
// UnsafeShutdownServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ShutdownServer will
// result in compilation errors.
type UnsafeShutdownServer interface {
mustEmbedUnimplementedShutdownServer()
}
func RegisterShutdownServer(s grpc.ServiceRegistrar, srv ShutdownServer) {
s.RegisterService(&Shutdown_ServiceDesc, srv)
}
func _Shutdown_ShutdownServer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(server.Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ShutdownServer).ShutdownServer(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/server.Shutdown/ShutdownServer",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ShutdownServer).ShutdownServer(ctx, req.(*server.Empty))
}
return interceptor(ctx, in, info, handler)
}
// Shutdown_ServiceDesc is the grpc.ServiceDesc for Shutdown service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var Shutdown_ServiceDesc = grpc.ServiceDesc{
ServiceName: "server.Shutdown",
HandlerType: (*ShutdownServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "ShutdownServer",
Handler: _Shutdown_ShutdownServer_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "pkg/api/shutdown/v1/operator.proto",
}

View file

@ -107,9 +107,7 @@ func (s *ArangoMLExtensionSpecDeployment) Validate() error {
}
errs := []error{
shared.PrefixResourceErrors("service", shared.ValidateOptional(s.GetService(), func(service ArangoMLExtensionSpecDeploymentService) error {
return service.Validate()
})),
shared.PrefixResourceErrors("service", shared.ValidateOptional(s.GetService(), func(s ArangoMLExtensionSpecDeploymentService) error { return s.Validate() })),
}
if s.GetReplicas() < 0 || s.GetReplicas() > 10 {

View file

@ -31,6 +31,10 @@ type ArangoMLStorageSpecModeSidecar struct {
// +doc/default: 9201
ListenPort *uint16 `json:"listenPort,omitempty"`
// ShutdownListenPort defines on which port the sidecar container will be listening for shutdown connections
// +doc/default: 9202
ShutdownListenPort *uint16 `json:"shutdownListenPort,omitempty"`
// Image define default image used for the extension
*sharedApi.Image `json:",inline"`
@ -65,6 +69,10 @@ func (s *ArangoMLStorageSpecModeSidecar) Validate() error {
err = append(err, shared.PrefixResourceErrors("listenPort", errors.Newf("must be positive")))
}
if s.GetShutdownListenPort() < 1 {
err = append(err, shared.PrefixResourceErrors("shutdownListenPort", errors.Newf("must be positive")))
}
err = append(err, s.GetResources().Validate())
return shared.WithErrors(err...)
@ -76,3 +84,10 @@ func (s *ArangoMLStorageSpecModeSidecar) GetListenPort() uint16 {
}
return *s.ListenPort
}
func (s *ArangoMLStorageSpecModeSidecar) GetShutdownListenPort() uint16 {
if s == nil || s.ShutdownListenPort == nil {
return 9202
}
return *s.ShutdownListenPort
}

View file

@ -774,6 +774,11 @@ func (in *ArangoMLStorageSpecModeSidecar) DeepCopyInto(out *ArangoMLStorageSpecM
*out = new(uint16)
**out = **in
}
if in.ShutdownListenPort != nil {
in, out := &in.ShutdownListenPort, &out.ShutdownListenPort
*out = new(uint16)
**out = **in
}
if in.Image != nil {
in, out := &in.Image, &out.Image
*out = new(sharedv1.Image)

View file

@ -38,6 +38,14 @@ type Image struct {
PullSecrets []string `json:"pullSecrets,omitempty"`
}
func (i *Image) GetImage() string {
if i == nil || i.Image == nil {
return ""
}
return *i.Image
}
func (i *Image) Validate() error {
if i == nil {
return nil

View file

@ -76,6 +76,21 @@ func ValidatePullPolicy(in core.PullPolicy) error {
return errors.Newf("Unknown pull policy: '%s'", string(in))
}
type ValidateInterface interface {
Validate() error
}
func Validate[T interface{}](in T) error {
return validate(in)
}
func validate(in any) error {
if v, ok := in.(ValidateInterface); ok {
return v.Validate()
}
return nil
}
// ValidateOptional Validates object if is not nil
func ValidateOptional[T interface{}](in *T, validator func(T) error) error {
if in != nil {

View file

@ -95,6 +95,10 @@ v1alpha1:
type: string
type: object
type: object
shutdownListenPort:
description: ShutdownListenPort defines on which port the sidecar container will be listening for shutdown connections
format: int32
type: integer
type: object
type: object
type: object

View file

@ -25,8 +25,10 @@ package storage
import (
"context"
"errors"
"github.com/arangodb/kube-arangodb/pkg/util/svc"
)
func NewService(_ context.Context, _ StorageType, _ ServiceConfig) (Service, error) {
func NewService(_ context.Context, _ StorageType, _ ServiceConfig) (svc.Service, error) {
return nil, errors.New("this service is available only in enterprise edition of operator")
}

View file

@ -21,8 +21,6 @@
package storage
import (
"context"
"github.com/arangodb/kube-arangodb/pkg/ml/storage/s3"
)
@ -36,7 +34,3 @@ type ServiceConfig struct {
ListenAddress string
S3 s3.Config
}
type Service interface {
Run(ctx context.Context) error
}

View file

@ -23,6 +23,7 @@
package operator
import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
arangoClientSet "github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
@ -36,6 +37,6 @@ func (o *Operator) onStartML(stop <-chan struct{}) {
panic("Unable to start ML Operator in Community")
}
func (o *Operator) onStartOperatorV2ML(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory) {
func (o *Operator) onStartOperatorV2ML(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory, kubeInformer informers.SharedInformerFactory) {
panic("Unable to start ML Operator in Community")
}

View file

@ -29,8 +29,8 @@ import (
"github.com/rs/zerolog"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
kwatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"github.com/arangodb/kube-arangodb/pkg/apis/apps"
@ -261,38 +261,29 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st
zerolog.SetGlobalLevel(zerolog.DebugLevel)
restClient, err := rest.InClusterConfig()
if err != nil {
panic(err)
}
eventRecorder := event.NewEventRecorder(operatorName, o.Client.Kubernetes())
arangoClientSet, err := arangoClientSet.NewForConfig(restClient)
if err != nil {
panic(err)
}
arangoInformer := arangoInformer.NewSharedInformerFactoryWithOptions(o.Client.Arango(), 10*time.Second, arangoInformer.WithNamespace(o.Namespace))
kubeClientSet, err := kubernetes.NewForConfig(restClient)
if err != nil {
panic(err)
}
eventRecorder := event.NewEventRecorder(operatorName, kubeClientSet)
arangoInformer := arangoInformer.NewSharedInformerFactoryWithOptions(arangoClientSet, 10*time.Second, arangoInformer.WithNamespace(o.Namespace))
kubeInformer := informers.NewSharedInformerFactoryWithOptions(o.Client.Kubernetes(), 15*time.Second, informers.WithNamespace(o.Namespace))
switch operatorType {
case appsOperator:
o.onStartOperatorV2Apps(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer)
o.onStartOperatorV2Apps(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer)
o.Dependencies.AppsProbe.SetReady()
case backupOperator:
o.onStartOperatorV2Backup(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer)
o.onStartOperatorV2Backup(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer)
o.Dependencies.BackupProbe.SetReady()
case mlOperator:
o.onStartOperatorV2ML(operator, eventRecorder, arangoClientSet, kubeClientSet, arangoInformer)
o.onStartOperatorV2ML(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer)
o.Dependencies.MlProbe.SetReady()
}
if err = operator.RegisterStarter(arangoInformer); err != nil {
if err := operator.RegisterStarter(arangoInformer); err != nil {
panic(err)
}
if err := operator.RegisterStarter(kubeInformer); err != nil {
panic(err)
}

42
pkg/operatorV2/filter.go Normal file
View file

@ -0,0 +1,42 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package operator
import meta "k8s.io/apimachinery/pkg/apis/meta/v1"
type InformerFilter func(obj meta.Object) bool
func informerFilterMerge(filters ...InformerFilter) InformerFilter {
if len(filters) == 0 {
return func(obj meta.Object) bool {
return true
}
}
return func(obj meta.Object) bool {
for _, filter := range filters {
if !filter(obj) {
return false
}
}
return true
}
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -27,12 +27,13 @@ import (
"github.com/arangodb/kube-arangodb/pkg/operatorV2/operation"
)
func newResourceEventHandler(operator Operator, group, version, kind string) cache.ResourceEventHandler {
func newResourceEventHandler(operator Operator, group, version, kind string, filter InformerFilter) cache.ResourceEventHandler {
return &resourceEventWrapper{
Operator: operator,
Group: group,
Version: version,
Kind: kind,
filter: filter,
}
}
@ -40,6 +41,8 @@ type resourceEventWrapper struct {
Operator Operator
Group, Version, Kind string
filter InformerFilter
}
func (r *resourceEventWrapper) push(o operation.Operation, obj interface{}) {
@ -48,7 +51,13 @@ func (r *resourceEventWrapper) push(o operation.Operation, obj interface{}) {
}
if object, ok := obj.(meta.Object); ok {
if item, err := operation.NewItemFromObject(o, r.Group, r.Version, r.Kind, object); err == nil {
if f := r.filter; f != nil {
if !f(object) {
return
}
}
r.Operator.EnqueueItem(item)
}
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -23,6 +23,7 @@ package operation
import (
"strings"
"github.com/rs/zerolog"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
@ -132,3 +133,13 @@ func (i Item) Validate() error {
func (i Item) String() string {
return strings.Join([]string{string(i.Operation), i.Group, i.Version, i.Kind, i.Namespace, i.Name}, separator)
}
func (i Item) WrapLogger(in *zerolog.Event) *zerolog.Event {
return in.
Str("operation", string(i.Operation)).
Str("namespace", i.Namespace).
Str("name", i.Name).
Str("group", i.Group).
Str("version", i.Version).
Str("kind", i.Kind)
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -49,7 +49,7 @@ type Operator interface {
Start(threadiness int, stopCh <-chan struct{}) error
RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string) error
RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string, filters ...InformerFilter) error
RegisterStarter(starter Starter) error
RegisterHandler(handler Handler) error
@ -158,7 +158,7 @@ func (o *operator) EnqueueItem(item operation.Item) {
o.workqueue.Add(item.String())
}
func (o *operator) RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string) error {
func (o *operator) RegisterInformer(informer cache.SharedIndexInformer, group, version, kind string, filters ...InformerFilter) error {
o.lock.Lock()
defer o.lock.Unlock()
@ -174,7 +174,7 @@ func (o *operator) RegisterInformer(informer cache.SharedIndexInformer, group, v
o.informers = append(o.informers, informer)
informer.AddEventHandler(newResourceEventHandler(o, group, version, kind))
informer.AddEventHandler(newResourceEventHandler(o, group, version, kind, informerFilterMerge(filters...)))
return nil
}

View file

@ -58,6 +58,11 @@ const (
FinalizerPVCMemberExists = "pvc.database.arangodb.com/member-exists" // Finalizer added to PVCs, indicating the need to keep is as long as its member exists
FinalizerDelayPodTermination = "pod.database.arangodb.com/delay" // Finalizer added to Pod, delays termination
AnnotationShutdownManagedContainer = "shutdown.arangodb.com/managed"
AnnotationShutdownContainer = "container.shutdown.arangodb.com"
AnnotationShutdownCoreContainer = "core.shutdown.arangodb.com"
AnnotationShutdownCoreContainerModeWait = "wait"
AnnotationEnforceAntiAffinity = "database.arangodb.com/enforce-anti-affinity" // Key of annotation added to PVC. Value is a boolean "true" or "false"
BackupLabelRole = "backup/role"

View file

@ -31,6 +31,17 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/globals"
)
func WithKubernetesContextTimeoutP1A1[P1, A1 interface{}](ctx context.Context, f func(context.Context, A1) P1, a1 A1) P1 {
return WithContextTimeoutP1A1(ctx, globals.GetGlobals().Timeouts().Kubernetes().Get(), f, a1)
}
func WithContextTimeoutP1A1[P1, A1 interface{}](ctx context.Context, timeout time.Duration, f func(context.Context, A1) P1, a1 A1) P1 {
nCtx, c := context.WithTimeout(ctx, timeout)
defer c()
return f(nCtx, a1)
}
func WithKubernetesContextTimeoutP1A2[P1, A1, A2 interface{}](ctx context.Context, f func(context.Context, A1, A2) P1, a1 A1, a2 A2) P1 {
return WithContextTimeoutP1A2(ctx, globals.GetGlobals().Timeouts().Kubernetes().Get(), f, a1, a2)
}

75
pkg/util/shutdown/grpc.go Normal file
View file

@ -0,0 +1,75 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package shutdown
import (
"context"
"time"
"google.golang.org/grpc"
"github.com/arangodb/kube-arangodb/pkg/api/server"
pbShutdown "github.com/arangodb/kube-arangodb/pkg/api/shutdown/v1"
)
func RegisterCentral(pb grpc.ServiceRegistrar) {
pbShutdown.RegisterShutdownServer(pb, NewShutdownableShutdownCentralServer())
}
func Register(pb grpc.ServiceRegistrar, closer context.CancelFunc) {
pbShutdown.RegisterShutdownServer(pb, NewShutdownableShutdownServer(closer))
}
func NewShutdownableShutdownCentralServer() ShutdownableShutdownServer {
return NewShutdownableShutdownServer(stop)
}
func NewShutdownableShutdownServer(closer context.CancelFunc) ShutdownableShutdownServer {
return &impl{closer: closer}
}
type ShutdownableShutdownServer interface {
pbShutdown.ShutdownServer
Shutdown(cancelFunc context.CancelFunc)
}
var _ ShutdownableShutdownServer = &impl{}
type impl struct {
pbShutdown.UnimplementedShutdownServer
closer context.CancelFunc
}
func (i *impl) ShutdownServer(ctx context.Context, empty *server.Empty) (*server.Empty, error) {
go func() {
defer i.closer()
time.Sleep(50 * time.Millisecond)
}()
return &server.Empty{}, nil
}
func (i *impl) Shutdown(cancelFunc context.CancelFunc) {
cancelFunc()
}

View file

@ -0,0 +1,89 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package shutdown
import (
"context"
"net"
"google.golang.org/grpc"
"github.com/arangodb/kube-arangodb/pkg/util/svc"
)
type ServiceConfig struct {
ListenAddress string
}
func ServiceCentral(config ServiceConfig) svc.Service {
server := grpc.NewServer( /* currently no auth parameters required */ )
RegisterCentral(server)
return &service{
cfg: config,
grpcServer: server,
}
}
func Service(config ServiceConfig, closer context.CancelFunc) svc.Service {
server := grpc.NewServer( /* currently no auth parameters required */ )
Register(server, closer)
return &service{
cfg: config,
grpcServer: server,
}
}
type service struct {
grpcServer *grpc.Server
cfg ServiceConfig
}
func (s *service) Run(ctx context.Context) error {
ln, err := net.Listen("tcp", s.cfg.ListenAddress)
if err != nil {
return err
}
defer ln.Close()
errChan := make(chan error)
go func() {
if serveErr := s.grpcServer.Serve(ln); serveErr != nil && serveErr != grpc.ErrServerStopped {
errChan <- serveErr
}
}()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
select {
case <-ctx.Done():
s.grpcServer.GracefulStop()
case err = <-errChan:
s.grpcServer.Stop()
close(errChan)
}
return err
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -21,26 +21,38 @@
package shutdown
import (
"context"
"os"
"os/signal"
"syscall"
)
func init() {
shutdown = make(chan struct{})
ctx, stop = context.WithCancel(context.Background())
sigChannel := make(chan os.Signal, 2)
signal.Notify(sigChannel, os.Interrupt, syscall.SIGTERM)
go func() {
defer close(shutdown)
defer stop()
<-sigChannel
}()
}
var shutdown chan struct{}
var (
ctx context.Context
stop context.CancelFunc
)
func Context() context.Context {
return ctx
}
func Stop() {
defer stop()
}
func Channel() <-chan struct{} {
return shutdown
return ctx.Done()
}

57
pkg/util/svc/service.go Normal file
View file

@ -0,0 +1,57 @@
//
// DISCLAIMER
//
// Copyright 2023 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package svc
import (
"context"
"sync"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
)
type Service interface {
Run(ctx context.Context) error
}
func RunServices(ctx context.Context, services ...Service) error {
if len(services) == 0 {
<-ctx.Done()
return nil
}
errors := make([]error, len(services))
var wg sync.WaitGroup
for id := range services {
wg.Add(1)
go func(id int) {
defer wg.Done()
errors[id] = services[id].Run(ctx)
}(id)
}
wg.Wait()
return shared.WithErrors(errors...)
}