2022-01-04 18:46:53 +00:00
//
// DISCLAIMER
//
2022-01-10 11:35:49 +00:00
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
2022-01-04 18:46:53 +00:00
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package cmd
import (
"context"
goflag "flag"
"fmt"
"net"
"net/http"
"os"
"strconv"
"strings"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
operatorHTTP "github.com/arangodb/kube-arangodb/pkg/util/http"
"github.com/gin-gonic/gin"
"github.com/arangodb/kube-arangodb/pkg/version"
"github.com/arangodb/kube-arangodb/pkg/operator/scope"
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
"github.com/rs/zerolog/log"
deploymentApi "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util"
utilsError "github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/pkg/errors"
"github.com/rs/zerolog"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"github.com/arangodb/kube-arangodb/pkg/client"
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned/scheme"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/operator"
"github.com/arangodb/kube-arangodb/pkg/server"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/arangodb/kube-arangodb/pkg/util/probe"
"github.com/arangodb/kube-arangodb/pkg/util/retry"
v1 "k8s.io/api/core/v1"
"k8s.io/klog"
)
const (
defaultServerHost = "0.0.0.0"
defaultServerPort = 8528
defaultLogLevel = "debug"
defaultAdminSecretName = "arangodb-operator-dashboard"
defaultAlpineImage = "alpine:3.7"
defaultMetricsExporterImage = "arangodb/arangodb-exporter:0.1.6"
defaultArangoImage = "arangodb/arangodb:latest"
UBIImageEnv util . EnvironmentVariable = "RELATED_IMAGE_UBI"
ArangoImageEnv util . EnvironmentVariable = "RELATED_IMAGE_DATABASE"
MetricsExporterImageEnv util . EnvironmentVariable = "RELATED_IMAGE_METRICSEXPORTER"
)
var (
maskAny = errors . WithStack
cmdMain = cobra . Command {
Use : "arangodb_operator" ,
Run : executeMain ,
}
logLevels [ ] string
cliLog = logging . NewRootLogger ( )
logService logging . Service
serverOptions struct {
host string
port int
tlsSecretName string
adminSecretName string // Name of basic authentication secret containing the admin username+password of the dashboard
allowAnonymous bool // If set, anonymous access to dashboard is allowed
}
operatorOptions struct {
enableDeployment bool // Run deployment operator
enableDeploymentReplication bool // Run deployment-replication operator
enableStorage bool // Run local-storage operator
enableBackup bool // Run backup operator
enableApps bool // Run apps operator
versionOnly bool // Run only version endpoint, explicitly disabled with other
2022-01-06 20:27:01 +00:00
enableK2KClusterSync bool // Run k2kClusterSync operator
2022-01-04 18:46:53 +00:00
scalingIntegrationEnabled bool
alpineImage , metricsExporterImage , arangoImage string
singleMode bool
scope string
}
operatorKubernetesOptions struct {
maxBatchSize int64
}
operatorBackup struct {
concurrentUploads int
}
operatorTimeouts struct {
k8s time . Duration
arangoD time . Duration
2022-02-03 23:03:12 +00:00
arangoDCheck time . Duration
2022-01-04 18:46:53 +00:00
reconciliation time . Duration
}
chaosOptions struct {
allowed bool
}
livenessProbe probe . LivenessProbe
deploymentProbe probe . ReadyProbe
deploymentReplicationProbe probe . ReadyProbe
storageProbe probe . ReadyProbe
backupProbe probe . ReadyProbe
appsProbe probe . ReadyProbe
2022-01-06 20:27:01 +00:00
k2KClusterSyncProbe probe . ReadyProbe
2022-01-04 18:46:53 +00:00
)
func init ( ) {
f := cmdMain . Flags ( )
f . StringVar ( & serverOptions . host , "server.host" , defaultServerHost , "Host to listen on" )
f . IntVar ( & serverOptions . port , "server.port" , defaultServerPort , "Port to listen on" )
f . StringVar ( & serverOptions . tlsSecretName , "server.tls-secret-name" , "" , "Name of secret containing tls.crt & tls.key for HTTPS server (if empty, self-signed certificate is used)" )
f . StringVar ( & serverOptions . adminSecretName , "server.admin-secret-name" , defaultAdminSecretName , "Name of secret containing username + password for login to the dashboard" )
f . BoolVar ( & serverOptions . allowAnonymous , "server.allow-anonymous-access" , false , "Allow anonymous access to the dashboard" )
f . StringArrayVar ( & logLevels , "log.level" , [ ] string { defaultLogLevel } , fmt . Sprintf ( "Set log levels in format <level> or <logger>=<level>. Possible loggers: %s" , strings . Join ( logging . LoggerNames ( ) , ", " ) ) )
f . BoolVar ( & operatorOptions . enableDeployment , "operator.deployment" , false , "Enable to run the ArangoDeployment operator" )
f . BoolVar ( & operatorOptions . enableDeploymentReplication , "operator.deployment-replication" , false , "Enable to run the ArangoDeploymentReplication operator" )
f . BoolVar ( & operatorOptions . enableStorage , "operator.storage" , false , "Enable to run the ArangoLocalStorage operator" )
f . BoolVar ( & operatorOptions . enableBackup , "operator.backup" , false , "Enable to run the ArangoBackup operator" )
f . BoolVar ( & operatorOptions . enableApps , "operator.apps" , false , "Enable to run the ArangoApps operator" )
2022-01-06 20:27:01 +00:00
f . BoolVar ( & operatorOptions . enableK2KClusterSync , "operator.k2k-cluster-sync" , false , "Enable to run the ArangoClusterSynchronizations operator" )
2022-01-04 18:46:53 +00:00
f . BoolVar ( & operatorOptions . versionOnly , "operator.version" , false , "Enable only version endpoint in Operator" )
f . StringVar ( & operatorOptions . alpineImage , "operator.alpine-image" , UBIImageEnv . GetOrDefault ( defaultAlpineImage ) , "Docker image used for alpine containers" )
f . MarkDeprecated ( "operator.alpine-image" , "Value is not used anymore" )
f . StringVar ( & operatorOptions . metricsExporterImage , "operator.metrics-exporter-image" , MetricsExporterImageEnv . GetOrDefault ( defaultMetricsExporterImage ) , "Docker image used for metrics containers by default" )
f . StringVar ( & operatorOptions . arangoImage , "operator.arango-image" , ArangoImageEnv . GetOrDefault ( defaultArangoImage ) , "Docker image used for arango by default" )
f . BoolVar ( & chaosOptions . allowed , "chaos.allowed" , false , "Set to allow chaos in deployments. Only activated when allowed and enabled in deployment" )
f . BoolVar ( & operatorOptions . singleMode , "mode.single" , false , "Enable single mode in Operator. WARNING: There should be only one replica of Operator, otherwise Operator can take unexpected actions" )
f . StringVar ( & operatorOptions . scope , "scope" , scope . DefaultScope . String ( ) , "Define scope on which Operator works. Legacy - pre 1.1.0 scope with limited cluster access" )
f . DurationVar ( & operatorTimeouts . k8s , "timeout.k8s" , globals . DefaultKubernetesTimeout , "The request timeout to the kubernetes" )
f . DurationVar ( & operatorTimeouts . arangoD , "timeout.arangod" , globals . DefaultArangoDTimeout , "The request timeout to the ArangoDB" )
2022-02-03 23:03:12 +00:00
f . DurationVar ( & operatorTimeouts . arangoDCheck , "timeout.arangod-check" , globals . DefaultArangoDCheckTimeout , "The version check request timeout to the ArangoDB" )
2022-01-04 18:46:53 +00:00
f . DurationVar ( & operatorTimeouts . reconciliation , "timeout.reconciliation" , globals . DefaultReconciliationTimeout , "The reconciliation timeout to the ArangoDB CR" )
f . BoolVar ( & operatorOptions . scalingIntegrationEnabled , "internal.scaling-integration" , true , "Enable Scaling Integration" )
f . Int64Var ( & operatorKubernetesOptions . maxBatchSize , "kubernetes.max-batch-size" , globals . DefaultKubernetesRequestBatchSize , "Size of batch during objects read" )
f . IntVar ( & operatorBackup . concurrentUploads , "backup-concurrent-uploads" , globals . DefaultBackupConcurrentUploads , "Number of concurrent uploads per deployment" )
features . Init ( & cmdMain )
}
func Execute ( ) int {
flag . CommandLine . AddGoFlagSet ( goflag . CommandLine )
if err := cmdMain . Execute ( ) ; err != nil {
return 1
}
return 0
}
// Show usage
func executeUsage ( cmd * cobra . Command , args [ ] string ) {
cmd . Usage ( )
}
// Run the operator
func executeMain ( cmd * cobra . Command , args [ ] string ) {
// Set global logger
log . Logger = logging . NewRootLogger ( )
// Get environment
namespace := os . Getenv ( constants . EnvOperatorPodNamespace )
name := os . Getenv ( constants . EnvOperatorPodName )
ip := os . Getenv ( constants . EnvOperatorPodIP )
deploymentApi . DefaultImage = operatorOptions . arangoImage
globals . GetGlobalTimeouts ( ) . Kubernetes ( ) . Set ( operatorTimeouts . k8s )
globals . GetGlobalTimeouts ( ) . ArangoD ( ) . Set ( operatorTimeouts . arangoD )
2022-02-03 23:03:12 +00:00
globals . GetGlobalTimeouts ( ) . ArangoDCheck ( ) . Set ( operatorTimeouts . arangoDCheck )
2022-01-04 18:46:53 +00:00
globals . GetGlobalTimeouts ( ) . Reconciliation ( ) . Set ( operatorTimeouts . reconciliation )
globals . GetGlobals ( ) . Kubernetes ( ) . RequestBatchSize ( ) . Set ( operatorKubernetesOptions . maxBatchSize )
globals . GetGlobals ( ) . Backup ( ) . ConcurrentUploads ( ) . Set ( operatorBackup . concurrentUploads )
// Prepare log service
var err error
logService , err = logging . NewService ( defaultLogLevel , logLevels )
if err != nil {
cliLog . Fatal ( ) . Err ( err ) . Msg ( "Failed to initialize log service" )
}
logService . ConfigureRootLogger ( func ( log zerolog . Logger ) zerolog . Logger {
podNameParts := strings . Split ( name , "-" )
operatorID := podNameParts [ len ( podNameParts ) - 1 ]
cliLog = cliLog . With ( ) . Str ( "operator-id" , operatorID ) . Logger ( )
return log . With ( ) . Str ( "operator-id" , operatorID ) . Logger ( )
} )
klog . SetOutput ( logService . MustGetLogger ( logging . LoggerNameKLog ) )
klog . Info ( "nice to meet you" )
klog . Flush ( )
// Check operating mode
2022-01-06 20:27:01 +00:00
if ! operatorOptions . enableDeployment && ! operatorOptions . enableDeploymentReplication && ! operatorOptions . enableStorage &&
! operatorOptions . enableBackup && ! operatorOptions . enableApps && ! operatorOptions . enableK2KClusterSync {
2022-01-04 18:46:53 +00:00
if ! operatorOptions . versionOnly {
2022-01-06 20:27:01 +00:00
cliLog . Fatal ( ) . Err ( err ) . Msg ( "Turn on --operator.deployment, --operator.deployment-replication, --operator.storage, --operator.backup, --operator.apps, --operator.k2k-cluster-sync or any combination of these" )
2022-01-04 18:46:53 +00:00
}
} else if operatorOptions . versionOnly {
2022-01-06 20:27:01 +00:00
cliLog . Fatal ( ) . Err ( err ) . Msg ( "Options --operator.deployment, --operator.deployment-replication, --operator.storage, --operator.backup, --operator.apps, --operator.k2k-cluster-sync cannot be enabled together with --operator.version" )
2022-01-04 18:46:53 +00:00
}
// Log version
cliLog . Info ( ) .
Str ( "pod-name" , name ) .
Str ( "pod-namespace" , namespace ) .
Msgf ( "Starting arangodb-operator (%s), version %s build %s" , version . GetVersionV1 ( ) . Edition . Title ( ) , version . GetVersionV1 ( ) . Version , version . GetVersionV1 ( ) . Build )
// Check environment
if ! operatorOptions . versionOnly {
if len ( namespace ) == 0 {
cliLog . Fatal ( ) . Msgf ( "%s environment variable missing" , constants . EnvOperatorPodNamespace )
}
if len ( name ) == 0 {
cliLog . Fatal ( ) . Msgf ( "%s environment variable missing" , constants . EnvOperatorPodName )
}
if len ( ip ) == 0 {
cliLog . Fatal ( ) . Msgf ( "%s environment variable missing" , constants . EnvOperatorPodIP )
}
// Get host name
id , err := os . Hostname ( )
if err != nil {
cliLog . Fatal ( ) . Err ( err ) . Msg ( "Failed to get hostname" )
}
// Create kubernetes client
kubecli , err := k8sutil . NewKubeClient ( )
if err != nil {
cliLog . Fatal ( ) . Err ( err ) . Msg ( "Failed to create Kubernetes client" )
}
secrets := kubecli . CoreV1 ( ) . Secrets ( namespace )
// Create operator
cfg , deps , err := newOperatorConfigAndDeps ( id + "-" + name , namespace , name )
if err != nil {
cliLog . Fatal ( ) . Err ( err ) . Msg ( "Failed to create operator config & deps" )
}
o , err := operator . NewOperator ( cfg , deps )
if err != nil {
cliLog . Fatal ( ) . Err ( err ) . Msg ( "Failed to create operator" )
}
listenAddr := net . JoinHostPort ( serverOptions . host , strconv . Itoa ( serverOptions . port ) )
if svr , err := server . NewServer ( kubecli . CoreV1 ( ) , server . Config {
Namespace : namespace ,
Address : listenAddr ,
TLSSecretName : serverOptions . tlsSecretName ,
TLSSecretNamespace : namespace ,
PodName : name ,
PodIP : ip ,
AdminSecretName : serverOptions . adminSecretName ,
AllowAnonymous : serverOptions . allowAnonymous ,
} , server . Dependencies {
Log : logService . MustGetLogger ( logging . LoggerNameServer ) ,
LivenessProbe : & livenessProbe ,
Deployment : server . OperatorDependency {
Enabled : cfg . EnableDeployment ,
Probe : & deploymentProbe ,
} ,
DeploymentReplication : server . OperatorDependency {
Enabled : cfg . EnableDeploymentReplication ,
Probe : & deploymentReplicationProbe ,
} ,
Storage : server . OperatorDependency {
Enabled : cfg . EnableStorage ,
Probe : & storageProbe ,
} ,
Backup : server . OperatorDependency {
Enabled : cfg . EnableBackup ,
Probe : & backupProbe ,
} ,
Apps : server . OperatorDependency {
Enabled : cfg . EnableApps ,
Probe : & appsProbe ,
} ,
2022-01-06 20:27:01 +00:00
ClusterSync : server . OperatorDependency {
Enabled : cfg . EnableK2KClusterSync ,
Probe : & k2KClusterSyncProbe ,
} ,
2022-01-04 18:46:53 +00:00
Operators : o ,
Secrets : secrets ,
} ) ; err != nil {
cliLog . Fatal ( ) . Err ( err ) . Msg ( "Failed to create HTTP server" )
} else {
go utilsError . LogError ( cliLog , "error while starting service" , svr . Run )
}
// startChaos(context.Background(), cfg.KubeCli, cfg.Namespace, chaosLevel)
// Start operator
o . Run ( )
} else {
if err := startVersionProcess ( ) ; err != nil {
cliLog . Fatal ( ) . Err ( err ) . Msg ( "Failed to create HTTP server" )
}
}
}
func startVersionProcess ( ) error {
// Just expose version
listenAddr := net . JoinHostPort ( serverOptions . host , strconv . Itoa ( serverOptions . port ) )
cliLog . Info ( ) . Str ( "addr" , listenAddr ) . Msgf ( "Starting version endpoint" )
gin . SetMode ( gin . ReleaseMode )
r := gin . New ( )
r . Use ( gin . Recovery ( ) )
versionV1Responser , err := operatorHTTP . NewSimpleJSONResponse ( version . GetVersionV1 ( ) )
if err != nil {
return errors . WithStack ( err )
}
r . GET ( "/_api/version" , gin . WrapF ( versionV1Responser . ServeHTTP ) )
r . GET ( "/api/v1/version" , gin . WrapF ( versionV1Responser . ServeHTTP ) )
s := http . Server {
Addr : listenAddr ,
Handler : r ,
}
return s . ListenAndServe ( )
}
// newOperatorConfigAndDeps creates operator config & dependencies.
func newOperatorConfigAndDeps ( id , namespace , name string ) ( operator . Config , operator . Dependencies , error ) {
kubecli , err := k8sutil . NewKubeClient ( )
if err != nil {
return operator . Config { } , operator . Dependencies { } , maskAny ( err )
}
kubeMonCli , err := k8sutil . NewKubeMonitoringV1Client ( )
if err != nil {
return operator . Config { } , operator . Dependencies { } , maskAny ( err )
}
image , serviceAccount , err := getMyPodInfo ( kubecli , namespace , name )
if err != nil {
return operator . Config { } , operator . Dependencies { } , maskAny ( fmt . Errorf ( "Failed to get my pod's service account: %s" , err ) )
}
kubeExtCli , err := k8sutil . NewKubeExtClient ( )
if err != nil {
return operator . Config { } , operator . Dependencies { } , maskAny ( fmt . Errorf ( "Failed to create k8b api extensions client: %s" , err ) )
}
crCli , err := client . NewClient ( )
if err != nil {
return operator . Config { } , operator . Dependencies { } , maskAny ( fmt . Errorf ( "Failed to created versioned client: %s" , err ) )
}
eventRecorder := createRecorder ( cliLog , kubecli , name , namespace )
scope , ok := scope . AsScope ( operatorOptions . scope )
if ! ok {
return operator . Config { } , operator . Dependencies { } , maskAny ( fmt . Errorf ( "Scope %s is not known by Operator" , operatorOptions . scope ) )
}
cfg := operator . Config {
ID : id ,
Namespace : namespace ,
PodName : name ,
ServiceAccount : serviceAccount ,
OperatorImage : image ,
EnableDeployment : operatorOptions . enableDeployment ,
EnableDeploymentReplication : operatorOptions . enableDeploymentReplication ,
EnableStorage : operatorOptions . enableStorage ,
EnableBackup : operatorOptions . enableBackup ,
EnableApps : operatorOptions . enableApps ,
2022-01-06 20:27:01 +00:00
EnableK2KClusterSync : operatorOptions . enableK2KClusterSync ,
2022-01-04 18:46:53 +00:00
AllowChaos : chaosOptions . allowed ,
ScalingIntegrationEnabled : operatorOptions . scalingIntegrationEnabled ,
ArangoImage : operatorOptions . arangoImage ,
SingleMode : operatorOptions . singleMode ,
Scope : scope ,
}
deps := operator . Dependencies {
LogService : logService ,
KubeCli : kubecli ,
KubeExtCli : kubeExtCli ,
KubeMonitoringCli : kubeMonCli ,
CRCli : crCli ,
EventRecorder : eventRecorder ,
LivenessProbe : & livenessProbe ,
DeploymentProbe : & deploymentProbe ,
DeploymentReplicationProbe : & deploymentReplicationProbe ,
StorageProbe : & storageProbe ,
BackupProbe : & backupProbe ,
AppsProbe : & appsProbe ,
2022-01-06 20:27:01 +00:00
K2KClusterSyncProbe : & k2KClusterSyncProbe ,
2022-01-04 18:46:53 +00:00
}
return cfg , deps , nil
}
// getMyPodInfo looks up the image & service account of the pod with given name in given namespace
// Returns image, serviceAccount, error.
func getMyPodInfo ( kubecli kubernetes . Interface , namespace , name string ) ( string , string , error ) {
var image , sa string
op := func ( ) error {
pod , err := kubecli . CoreV1 ( ) . Pods ( namespace ) . Get ( context . Background ( ) , name , metav1 . GetOptions { } )
if err != nil {
cliLog . Error ( ) .
Err ( err ) .
Str ( "name" , name ) .
Msg ( "Failed to get operator pod" )
return maskAny ( err )
}
sa = pod . Spec . ServiceAccountName
2022-01-21 08:22:49 +00:00
if image , err = k8sutil . GetArangoDBImageIDFromPod ( pod ) ; err != nil {
return errors . Wrap ( err , "failed to get image ID from pod" )
}
2022-01-04 18:46:53 +00:00
if image == "" {
// Fallback in case we don't know the id.
image = pod . Spec . Containers [ 0 ] . Image
}
return nil
}
if err := retry . Retry ( op , time . Minute * 5 ) ; err != nil {
return "" , "" , maskAny ( err )
}
return image , sa , nil
}
func createRecorder ( log zerolog . Logger , kubecli kubernetes . Interface , name , namespace string ) record . EventRecorder {
eventBroadcaster := record . NewBroadcaster ( )
eventBroadcaster . StartLogging ( func ( format string , args ... interface { } ) {
log . Info ( ) . Msgf ( format , args ... )
} )
eventBroadcaster . StartRecordingToSink ( & v1core . EventSinkImpl { Interface : v1core . New ( kubecli . CoreV1 ( ) . RESTClient ( ) ) . Events ( namespace ) } )
combinedScheme := runtime . NewScheme ( )
scheme . AddToScheme ( combinedScheme )
v1 . AddToScheme ( combinedScheme )
appsv1 . AddToScheme ( combinedScheme )
return eventBroadcaster . NewRecorder ( combinedScheme , v1 . EventSource { Component : name } )
}