diff --git a/CHANGELOG.md b/CHANGELOG.md index a24e273d9..efc7942b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ - (Feature) (Platform) Platform Requirements support - (Improvement) Drop slash requirement from ArangoRoute - (Feature) (Networking) Pass through Server Header +- (Feature) (Platform) Shutdown migration to CE ## [1.2.43](https://github.com/arangodb/kube-arangodb/tree/1.2.43) (2024-10-14) - (Feature) ArangoRoute CRD diff --git a/README.md b/README.md index 4de555f57..7819a885b 100644 --- a/README.md +++ b/README.md @@ -195,7 +195,7 @@ Flags: --kubernetes.max-batch-size int Size of batch during objects read (default 256) --kubernetes.qps float32 Number of queries per second for k8s API (default 15) --log.format string Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used (default "pretty") - --log.level stringArray Set log levels in format or =. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info]) + --log.level stringArray Set log levels in format or =. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info]) --log.sampling If true, operator will try to minimize duplication of logging events (default true) --memory-limit uint Define memory limit for hard shutdown and the dump of goroutines. Used for testing --metrics.excluded-prefixes stringArray List of the excluded metrics prefixes diff --git a/docs/cli/arangodb_operator.md b/docs/cli/arangodb_operator.md index 94f765a0f..38e4d48b8 100644 --- a/docs/cli/arangodb_operator.md +++ b/docs/cli/arangodb_operator.md @@ -80,7 +80,7 @@ Flags: --kubernetes.max-batch-size int Size of batch during objects read (default 256) --kubernetes.qps float32 Number of queries per second for k8s API (default 15) --log.format string Set log format. Allowed values: 'pretty', 'JSON'. If empty, default format is used (default "pretty") - --log.level stringArray Set log levels in format or =. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info]) + --log.level stringArray Set log levels in format or =. Possible loggers: action, agency, api-server, assertion, backup-operator, chaos-monkey, crd, deployment, deployment-ci, deployment-reconcile, deployment-replication, deployment-resilience, deployment-resources, deployment-storage, deployment-storage-pc, deployment-storage-service, generic-parent-operator, helm, http, inspector, integration-config-v1, integration-envoy-auth-v3, integration-scheduler-v2, integration-storage-v2, integrations, k8s-client, kubernetes-informer, monitor, networking-route-operator, operator, operator-arangojob-handler, operator-v2, operator-v2-event, operator-v2-worker, panics, platform-chart-operator, platform-pod-shutdown, platform-storage-operator, pod_compare, root, root-event-recorder, scheduler-batchjob-operator, scheduler-cronjob-operator, scheduler-deployment-operator, scheduler-pod-operator, scheduler-profile-operator, server, server-authentication (default [info]) --log.sampling If true, operator will try to minimize duplication of logging events (default true) --memory-limit uint Define memory limit for hard shutdown and the dump of goroutines. Used for testing --metrics.excluded-prefixes stringArray List of the excluded metrics prefixes diff --git a/pkg/handlers/platform/shutdown/handler.go b/pkg/handlers/platform/shutdown/handler.go new file mode 100644 index 000000000..19f8fefd9 --- /dev/null +++ b/pkg/handlers/platform/shutdown/handler.go @@ -0,0 +1,170 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package shutdown + +import ( + "context" + "fmt" + "strconv" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + core "k8s.io/api/core/v1" + apiErrors "k8s.io/apimachinery/pkg/api/errors" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + pbSharedV1 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition" + pbShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1/definition" + "github.com/arangodb/kube-arangodb/pkg/logging" + operator "github.com/arangodb/kube-arangodb/pkg/operatorV2" + "github.com/arangodb/kube-arangodb/pkg/operatorV2/event" + "github.com/arangodb/kube-arangodb/pkg/operatorV2/operation" + "github.com/arangodb/kube-arangodb/pkg/util" + "github.com/arangodb/kube-arangodb/pkg/util/constants" +) + +var logger = logging.Global().RegisterAndGetLogger("platform-pod-shutdown", logging.Info) + +type handler struct { + kubeClient kubernetes.Interface + + eventRecorder event.RecorderInstance + + operator operator.Operator +} + +func (h *handler) Name() string { + return Kind() +} + +func (h *handler) Handle(ctx context.Context, item operation.Item) error { + pod, err := util.WithKubernetesContextTimeoutP2A2(ctx, h.kubeClient.CoreV1().Pods(item.Namespace).Get, item.Name, meta.GetOptions{}) + if err != nil { + if apiErrors.IsNotFound(err) { + return nil + } + + return err + } + + // If not annotated, stop execution + if _, ok := pod.Annotations[constants.AnnotationShutdownManagedContainer]; !ok { + return nil + } + + for _, container := range pod.Status.ContainerStatuses { + v, ok := pod.Annotations[fmt.Sprintf("%s/%s", constants.AnnotationShutdownCoreContainer, container.Name)] + if !ok { + continue + } + + switch v { + case constants.AnnotationShutdownCoreContainerModeWait: + if container.State.Terminated == nil { + // Container is not yet stopped, skip shutdown + return nil + } + } + } + + // All containers, which are expected to shutdown, are down + + for _, container := range pod.Status.ContainerStatuses { + v, ok := pod.Annotations[fmt.Sprintf("%s/%s", constants.AnnotationShutdownContainer, container.Name)] + if !ok { + continue + } + + // We did not reach running state, nothing to do + if container.State.Running == nil { + continue + } + + port, ok := h.getContainerPort(pod.Spec.Containers, container.Name, v) + if !ok { + // We did not find port, continue + continue + } + + if port.ContainerPort == 0 { + continue + } + + if pod.Status.PodIP == "" { + continue + } + + if err := util.WithKubernetesContextTimeoutP1A1(ctx, h.invokeShutdown, fmt.Sprintf("%s:%d", pod.Status.PodIP, port.ContainerPort)); err != nil { + logger.WrapObj(item).Err(err).Str("container", container.Name).Debug("Unable to send shutdown request") + } + + logger.WrapObj(item).Str("container", container.Name).Debug("Shutdown request sent") + } + + // Always return nil + return nil +} + +func (h *handler) CanBeHandled(item operation.Item) bool { + return item.Group == Group() && + item.Version == Version() && + item.Kind == Kind() +} + +func (h *handler) getContainerPort(containers []core.Container, container, port string) (core.ContainerPort, bool) { + if v, err := strconv.Atoi(port); err == nil { + return core.ContainerPort{ + ContainerPort: int32(v), + }, true + } + + for _, c := range containers { + if c.Name != container { + continue + } + + for _, p := range c.Ports { + if p.Name == port { + return p, true + } + } + } + + return core.ContainerPort{}, false +} + +func (h *handler) invokeShutdown(ctx context.Context, addr string) error { + conn, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return err + } + + defer conn.Close() + + client := pbShutdownV1.NewShutdownV1Client(conn) + + if _, err := client.Shutdown(ctx, &pbSharedV1.Empty{}); err != nil { + return err + } + + return nil +} diff --git a/pkg/handlers/platform/shutdown/local.go b/pkg/handlers/platform/shutdown/local.go new file mode 100644 index 000000000..7544cff02 --- /dev/null +++ b/pkg/handlers/platform/shutdown/local.go @@ -0,0 +1,33 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package shutdown + +func Kind() string { + return "Pod" +} + +func Group() string { + return "" +} + +func Version() string { + return "v1" +} diff --git a/pkg/handlers/platform/shutdown/register.go b/pkg/handlers/platform/shutdown/register.go new file mode 100644 index 000000000..6169dc0bc --- /dev/null +++ b/pkg/handlers/platform/shutdown/register.go @@ -0,0 +1,62 @@ +// +// DISCLAIMER +// +// Copyright 2024 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// + +package shutdown + +import ( + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + + operator "github.com/arangodb/kube-arangodb/pkg/operatorV2" + "github.com/arangodb/kube-arangodb/pkg/operatorV2/event" + "github.com/arangodb/kube-arangodb/pkg/util/constants" +) + +// RegisterInformer into operator +func RegisterInformer(operator operator.Operator, recorder event.Recorder, kubeClient kubernetes.Interface, informer informers.SharedInformerFactory) error { + if err := operator.RegisterInformer(informer.Core().V1().Pods().Informer(), + Group(), + Version(), + Kind(), func(obj meta.Object) bool { + if anns := obj.GetAnnotations(); len(anns) != 0 { + if _, ok := anns[constants.AnnotationShutdownManagedContainer]; ok { + return true + } + } + return false + }); err != nil { + return err + } + + h := &handler{ + kubeClient: kubeClient, + + eventRecorder: recorder.NewInstance(Group(), Version(), Kind()), + + operator: operator, + } + + if err := operator.RegisterHandler(h); err != nil { + return err + } + + return nil +} diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 9434e90d2..5c2ed7677 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -50,6 +50,7 @@ import ( "github.com/arangodb/kube-arangodb/pkg/handlers/job" "github.com/arangodb/kube-arangodb/pkg/handlers/networking/route" platformChart "github.com/arangodb/kube-arangodb/pkg/handlers/platform/chart" + platformShutdown "github.com/arangodb/kube-arangodb/pkg/handlers/platform/shutdown" platformStorage "github.com/arangodb/kube-arangodb/pkg/handlers/platform/storage" "github.com/arangodb/kube-arangodb/pkg/handlers/policy" schedulerBatchJobHandler "github.com/arangodb/kube-arangodb/pkg/handlers/scheduler/batchjob" @@ -350,7 +351,7 @@ func (o *Operator) onStartOperatorV2(operatorType operatorV2type, stop <-chan st o.onStartOperatorV2Networking(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer) o.Dependencies.NetworkingProbe.SetReady() case platformOperator: - o.onStartOperatorV2Platform(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer) + o.onStartOperatorV2Platform(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer) o.Dependencies.PlatformProbe.SetReady() case schedulerOperator: o.onStartOperatorV2Scheduler(operator, eventRecorder, o.Client.Arango(), o.Client.Kubernetes(), arangoInformer, kubeInformer) @@ -398,7 +399,7 @@ func (o *Operator) onStartOperatorV2Networking(operator operatorV2.Operator, rec } } -func (o *Operator) onStartOperatorV2Platform(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory) { +func (o *Operator) onStartOperatorV2Platform(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory, kubeInformer informers.SharedInformerFactory) { checkFn := func() error { _, err := o.Client.Arango().PlatformV1alpha1().ArangoPlatformStorages(o.Namespace).List(context.Background(), meta.ListOptions{}) return err @@ -412,6 +413,10 @@ func (o *Operator) onStartOperatorV2Platform(operator operatorV2.Operator, recor if err := platformChart.RegisterInformer(operator, recorder, client, kubeClient, informer); err != nil { panic(err) } + + if err := platformShutdown.RegisterInformer(operator, recorder, kubeClient, kubeInformer); err != nil { + panic(err) + } } func (o *Operator) onStartOperatorV2Scheduler(operator operatorV2.Operator, recorder event.Recorder, client arangoClientSet.Interface, kubeClient kubernetes.Interface, informer arangoInformer.SharedInformerFactory, kubeInformer informers.SharedInformerFactory) {