1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Integration Service Authentication (#1705)

This commit is contained in:
Adam Janikowski 2024-08-26 21:13:14 +02:00 committed by GitHub
parent 25d636e2b5
commit f2daa8a975
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 953 additions and 47 deletions

View file

@ -14,6 +14,7 @@
- (Feature) Gateway Group for ArangoDeployment - (Feature) Gateway Group for ArangoDeployment
- (Feature) Gateway config loader - (Feature) Gateway config loader
- (Feature) ConfigV1 Integration Service - (Feature) ConfigV1 Integration Service
- (Feature) Integration Service Authentication
## [1.2.42](https://github.com/arangodb/kube-arangodb/tree/1.2.42) (2024-07-23) ## [1.2.42](https://github.com/arangodb/kube-arangodb/tree/1.2.42) (2024-07-23)
- (Maintenance) Go 1.22.4 & Kubernetes 1.29.6 libraries - (Maintenance) Go 1.22.4 & Kubernetes 1.29.6 libraries

View file

@ -18,7 +18,7 @@
// Copyright holder is ArangoDB GmbH, Cologne, Germany // Copyright holder is ArangoDB GmbH, Cologne, Germany
// //
package v1 package definition
const ( const (
Name = "shutdown.v1" Name = "shutdown.v1"

View file

@ -45,7 +45,7 @@ type impl struct {
} }
func (i *impl) Name() string { func (i *impl) Name() string {
return Name return pbShutdownV1.Name
} }
func (i *impl) Health() svc.HealthState { func (i *impl) Health() svc.HealthState {

70
pkg/integrations/auth.go Normal file
View file

@ -0,0 +1,70 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package integrations
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"github.com/arangodb/kube-arangodb/pkg/util"
)
func basicTokenAuthAuthorize(ctx context.Context, token string) error {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return status.Errorf(codes.Unauthenticated, "metadata is not provided")
}
values := md[util.AuthorizationGRPCHeader]
if len(values) == 0 {
return status.Errorf(codes.Unauthenticated, "authorization token is not provided")
}
if token != values[0] {
return status.Errorf(codes.Unauthenticated, "invalid token")
}
return nil
}
func basicTokenAuthUnaryInterceptor(token string) grpc.ServerOption {
return grpc.UnaryInterceptor(func(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp any, err error) {
if err := basicTokenAuthAuthorize(ctx, token); err != nil {
return nil, err
}
return handler(ctx, req)
})
}
func basicTokenAuthStreamInterceptor(token string) grpc.ServerOption {
return grpc.StreamInterceptor(func(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
if err := basicTokenAuthAuthorize(ss.Context(), token); err != nil {
return err
}
return handler(srv, ss)
})
}

View file

@ -0,0 +1,132 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package integrations
import (
"fmt"
"testing"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"github.com/arangodb/kube-arangodb/pkg/util/shutdown"
"github.com/arangodb/kube-arangodb/pkg/util/tests/tgrpc"
)
func Test_AuthCases(t *testing.T) {
c, health, internal, external := startService(t,
"--health.auth.type=None",
"--services.external.auth.token=test1",
"--services.external.auth.type=Token",
"--services.auth.token=test2",
"--services.auth.type=Token",
)
defer c.Require(t)
t.Run("Without auth", func(t *testing.T) {
t.Run("health", func(t *testing.T) {
require.NoError(t, executeSync(t, shutdown.Context(),
fmt.Sprintf("--address=127.0.0.1:%d", health),
"--token=",
"client",
"health",
"v1"))
})
t.Run("internal", func(t *testing.T) {
tgrpc.AsGRPCError(t, executeSync(t, shutdown.Context(),
fmt.Sprintf("--address=127.0.0.1:%d", internal),
"--token=",
"client",
"health",
"v1")).
Code(t, codes.Unauthenticated).
Errorf(t, "authorization token is not provided")
})
t.Run("external", func(t *testing.T) {
tgrpc.AsGRPCError(t, executeSync(t, shutdown.Context(),
fmt.Sprintf("--address=127.0.0.1:%d", external),
"--token=",
"client",
"health",
"v1")).
Code(t, codes.Unauthenticated).
Errorf(t, "authorization token is not provided")
})
})
t.Run("With auth 1", func(t *testing.T) {
t.Run("health", func(t *testing.T) {
require.NoError(t, executeSync(t, shutdown.Context(),
fmt.Sprintf("--address=127.0.0.1:%d", health),
"--token=test1",
"client",
"health",
"v1"))
})
t.Run("internal", func(t *testing.T) {
tgrpc.AsGRPCError(t, executeSync(t, shutdown.Context(),
fmt.Sprintf("--address=127.0.0.1:%d", internal),
"--token=test1",
"client",
"health",
"v1")).
Code(t, codes.Unauthenticated).
Errorf(t, "invalid token")
})
t.Run("external", func(t *testing.T) {
require.NoError(t, executeSync(t, shutdown.Context(),
fmt.Sprintf("--address=127.0.0.1:%d", external),
"--token=test1",
"client",
"health",
"v1"))
})
})
t.Run("With auth 2", func(t *testing.T) {
t.Run("health", func(t *testing.T) {
require.NoError(t, executeSync(t, shutdown.Context(),
fmt.Sprintf("--address=127.0.0.1:%d", health),
"--token=test2",
"client",
"health",
"v1"))
})
t.Run("internal", func(t *testing.T) {
require.NoError(t, executeSync(t, shutdown.Context(),
fmt.Sprintf("--address=127.0.0.1:%d", internal),
"--token=test2",
"client",
"health",
"v1"))
})
t.Run("external", func(t *testing.T) {
tgrpc.AsGRPCError(t, executeSync(t, shutdown.Context(),
fmt.Sprintf("--address=127.0.0.1:%d", external),
"--token=test2",
"client",
"health",
"v1")).
Code(t, codes.Unauthenticated).
Errorf(t, "invalid token")
})
})
}

View file

@ -0,0 +1,83 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package clients
import (
"context"
"io"
"github.com/spf13/cobra"
"google.golang.org/grpc"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/shutdown"
)
type commandRun[T any] interface {
Register(name, desc string, in func(ctx context.Context, client T) error) commandRun[T]
}
type commandRunImpl[T any] struct {
cmd *cobra.Command
cfg *Config
in func(cc grpc.ClientConnInterface) T
}
func (c commandRunImpl[T]) Register(name, desc string, in func(ctx context.Context, client T) error) commandRun[T] {
c.cmd.AddCommand(&cobra.Command{
Use: name,
Short: desc,
RunE: func(cmd *cobra.Command, args []string) error {
client, closer, err := client(shutdown.Context(), c.cfg, c.in)
if err != nil {
return err
}
defer closer.Close()
return in(shutdown.Context(), client)
},
})
return c
}
func withCommandRun[T any](cmd *cobra.Command, cfg *Config, in func(cc grpc.ClientConnInterface) T) commandRun[T] {
return &commandRunImpl[T]{
cmd: cmd,
cfg: cfg,
in: in,
}
}
func client[T any](ctx context.Context, cfg *Config, in func(cc grpc.ClientConnInterface) T) (T, io.Closer, error) {
var opts []grpc.DialOption
if token := cfg.Token; token != "" {
opts = append(opts, util.TokenAuthInterceptors(token)...)
}
client, closer, err := util.NewGRPCClient(ctx, in, cfg.Address, opts...)
if err != nil {
return util.Default[T](), nil, err
}
return client, closer, nil
}

View file

@ -0,0 +1,73 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package clients
import (
"github.com/spf13/cobra"
pbHealth "google.golang.org/grpc/health/grpc_health_v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/shutdown"
)
func init() {
registerer.MustRegister("health/v1", func(cfg *Config) Client {
return &healthV1{
cfg: cfg,
}
})
}
type healthV1 struct {
cfg *Config
}
func (s *healthV1) Name() string {
return "health"
}
func (s *healthV1) Version() string {
return "v1"
}
func (s *healthV1) Register(cmd *cobra.Command) error {
cmd.RunE = func(cmd *cobra.Command, args []string) error {
client, c, err := client(shutdown.Context(), s.cfg, pbHealth.NewHealthClient)
if err != nil {
return err
}
defer c.Close()
res, err := client.Check(shutdown.Context(), &pbHealth.HealthCheckRequest{})
if err != nil {
return err
}
switch s := res.GetStatus(); s {
case pbHealth.HealthCheckResponse_SERVING:
println("OK")
return nil
default:
return errors.Errorf("Not healthy: %s", s.String())
}
}
return nil
}

View file

@ -0,0 +1,98 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package clients
import (
"github.com/spf13/cobra"
"github.com/arangodb/kube-arangodb/pkg/util"
)
var registerer = util.NewRegisterer[string, Factory]()
type Factory func(c *Config) Client
type Config struct {
Address string
Token string
}
func (c *Config) Register(cmd *cobra.Command) error {
f := cmd.PersistentFlags()
f.StringVar(&c.Address, "address", "127.0.0.1:8080", "GRPC Service Address")
f.StringVar(&c.Token, "token", "", "GRPC Token")
return nil
}
type Client interface {
Name() string
Version() string
Register(cmd *cobra.Command) error
}
func Register(cmd *cobra.Command) error {
client := &cobra.Command{Use: "client"}
cmd.AddCommand(client)
var cfg config
return cfg.Register(client)
}
type config struct {
cfg Config
}
func (c *config) Register(cmd *cobra.Command) error {
if err := c.cfg.Register(cmd); err != nil {
return err
}
cmds := map[string]*cobra.Command{}
for _, command := range registerer.Items() {
r := command.V(&c.cfg)
v, ok := cmds[r.Name()]
if !ok {
v = &cobra.Command{
Use: r.Name(),
}
cmd.AddCommand(v)
cmds[r.Name()] = v
}
p := &cobra.Command{
Use: r.Version(),
}
if err := r.Register(p); err != nil {
return err
}
v.AddCommand(p)
}
return nil
}

View file

@ -0,0 +1,60 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package clients
import (
"context"
"github.com/spf13/cobra"
pbSharedV1 "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition"
pbShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1/definition"
)
func init() {
registerer.MustRegister("shutdown/v1", func(cfg *Config) Client {
return &shutdownV1{
cfg: cfg,
}
})
}
type shutdownV1 struct {
cfg *Config
}
func (s *shutdownV1) Name() string {
return "shutdown"
}
func (s *shutdownV1) Version() string {
return "v1"
}
func (s *shutdownV1) Register(cmd *cobra.Command) error {
withCommandRun(cmd, s.cfg, pbShutdownV1.NewShutdownV1Client).
Register("shutdown", "Runs the Shutdown GRPC Call", func(ctx context.Context, client pbShutdownV1.ShutdownV1Client) error {
_, err := client.Shutdown(ctx, &pbSharedV1.Empty{})
return err
})
return nil
}

View file

@ -40,3 +40,17 @@ type Integration interface {
Handler(ctx context.Context) (svc.Handler, error) Handler(ctx context.Context) (svc.Handler, error)
} }
type IntegrationEnablement interface {
Integration
EnabledTypes() (internal, external bool)
}
func GetIntegrationEnablement(in Integration) (internal, external bool) {
if v, ok := in.(IntegrationEnablement); ok {
return v.EnabledTypes()
}
return true, false
}

View file

@ -21,11 +21,17 @@
package integrations package integrations
import ( import (
"context"
"fmt" "fmt"
"sort" "sort"
"strings"
"sync"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"google.golang.org/grpc"
pbImplShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1"
"github.com/arangodb/kube-arangodb/pkg/integrations/clients"
"github.com/arangodb/kube-arangodb/pkg/util" "github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors" "github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/shutdown" "github.com/arangodb/kube-arangodb/pkg/util/shutdown"
@ -40,20 +46,62 @@ func Register(cmd *cobra.Command) error {
return c.Register(cmd) return c.Register(cmd)
} }
type configurationTest struct {
ctx context.Context
cancel context.CancelFunc
}
type configuration struct { type configuration struct {
// Only for testing
test *configurationTest
registered []Integration registered []Integration
health struct { health struct {
serviceConfiguration
shutdownEnabled bool shutdownEnabled bool
config svc.Configuration
} }
services struct { services struct {
config svc.Configuration internal, external serviceConfiguration
} }
} }
type serviceConfiguration struct {
enabled bool
address string
auth struct {
t string
token string
}
}
func (s *serviceConfiguration) Config() (svc.Configuration, error) {
var opts []grpc.ServerOption
switch strings.ToLower(s.auth.t) {
case "none":
break
case "token":
if s.auth.token == "" {
return util.Default[svc.Configuration](), errors.Errorf("Token is empty")
}
opts = append(opts,
basicTokenAuthUnaryInterceptor(s.auth.token),
basicTokenAuthStreamInterceptor(s.auth.token),
)
}
return svc.Configuration{
Options: opts,
Address: s.address,
}, nil
}
func (c *configuration) Register(cmd *cobra.Command) error { func (c *configuration) Register(cmd *cobra.Command) error {
c.registered = util.FormatList(registerer.Items(), func(a util.KV[string, Factory]) Integration { c.registered = util.FormatList(registerer.Items(), func(a util.KV[string, Factory]) Integration {
return a.V() return a.V()
@ -67,14 +115,28 @@ func (c *configuration) Register(cmd *cobra.Command) error {
f := cmd.Flags() f := cmd.Flags()
f.StringVar(&c.health.config.Address, "health.address", "0.0.0.0:9091", "Address to expose health service") f.StringVar(&c.health.address, "health.address", "0.0.0.0:9091", "Address to expose health service")
f.BoolVar(&c.health.shutdownEnabled, "health.shutdown.enabled", true, "Determines if shutdown service should be enabled and exposed") f.BoolVar(&c.health.shutdownEnabled, "health.shutdown.enabled", true, "Determines if shutdown service should be enabled and exposed")
f.StringVar(&c.services.config.Address, "services.address", "127.0.0.1:9092", "Address to expose services") f.StringVar(&c.health.auth.t, "health.auth.type", "None", "Auth type for health service")
f.StringVar(&c.health.auth.token, "health.auth.token", "", "Token for health service (when auth service is token)")
f.BoolVar(&c.services.internal.enabled, "services.enabled", true, "Defines if internal access is enabled")
f.StringVar(&c.services.internal.address, "services.address", "127.0.0.1:9092", "Address to expose internal services")
f.StringVar(&c.services.internal.auth.t, "services.auth.type", "None", "Auth type for internal service")
f.StringVar(&c.services.internal.auth.token, "services.auth.token", "", "Token for internal service (when auth service is token)")
f.BoolVar(&c.services.external.enabled, "services.external.enabled", false, "Defines if external access is enabled")
f.StringVar(&c.services.external.address, "services.external.address", "0.0.0.0:9093", "Address to expose external services")
f.StringVar(&c.services.external.auth.t, "services.external.auth.type", "None", "Auth type for external service")
f.StringVar(&c.services.external.auth.token, "services.external.auth.token", "", "Token for external service (when auth service is token)")
for _, service := range c.registered { for _, service := range c.registered {
prefix := fmt.Sprintf("integration.%s", service.Name()) prefix := fmt.Sprintf("integration.%s", service.Name())
f.Bool(prefix, false, service.Description()) f.Bool(prefix, false, service.Description())
internal, external := GetIntegrationEnablement(service)
f.Bool(fmt.Sprintf("%s.internal", prefix), internal, fmt.Sprintf("Defones if Internal access to service %s is enabled", service.Name()))
f.Bool(fmt.Sprintf("%s.external", prefix), external, fmt.Sprintf("Defones if External access to service %s is enabled", service.Name()))
if err := service.Register(cmd, func(name string) string { if err := service.Register(cmd, func(name string) string {
return fmt.Sprintf("%s.%s", prefix, name) return fmt.Sprintf("%s.%s", prefix, name)
@ -83,22 +145,65 @@ func (c *configuration) Register(cmd *cobra.Command) error {
} }
} }
return nil return clients.Register(cmd)
} }
func (c *configuration) run(cmd *cobra.Command, args []string) error { func (c *configuration) run(cmd *cobra.Command, args []string) error {
handlers := make([]svc.Handler, 0, len(c.registered)) if t := c.test; t == nil {
return c.runWithContext(shutdown.Context(), shutdown.Stop, cmd)
} else {
return c.runWithContext(t.ctx, t.cancel, cmd)
}
}
func (c *configuration) runWithContext(ctx context.Context, cancel context.CancelFunc, cmd *cobra.Command) error {
healthConfig, err := c.health.Config()
if err != nil {
return errors.Wrapf(err, "Unable to parse health config")
}
internalConfig, err := c.services.internal.Config()
if err != nil {
return errors.Wrapf(err, "Unable to parse internal config")
}
externalConfig, err := c.services.external.Config()
if err != nil {
return errors.Wrapf(err, "Unable to parse external config")
}
var internalHandlers, externalHandlers []svc.Handler
for _, handler := range c.registered { for _, handler := range c.registered {
if ok, err := cmd.Flags().GetBool(fmt.Sprintf("integration.%s", handler.Name())); err != nil { if ok, err := cmd.Flags().GetBool(fmt.Sprintf("integration.%s", handler.Name())); err != nil {
return err return err
} else { } else {
logger.Str("service", handler.Name()).Bool("enabled", ok).Info("Service discovered") internalEnabled, err := cmd.Flags().GetBool(fmt.Sprintf("integration.%s.internal", handler.Name()))
if ok { if err != nil {
if svc, err := handler.Handler(shutdown.Context()); err != nil { return err
}
externalEnabled, err := cmd.Flags().GetBool(fmt.Sprintf("integration.%s.external", handler.Name()))
if err != nil {
return err
}
logger.
Str("service", handler.Name()).
Bool("enabled", ok).
Bool("internal", internalEnabled).
Bool("external", externalEnabled).
Info("Service discovered")
if ok && (internalEnabled || externalEnabled) {
if svc, err := handler.Handler(ctx); err != nil {
return err return err
} else { } else {
handlers = append(handlers, svc) if internalEnabled {
internalHandlers = append(internalHandlers, svc)
}
if externalEnabled {
externalHandlers = append(externalHandlers, svc)
}
} }
} }
} }
@ -107,18 +212,57 @@ func (c *configuration) run(cmd *cobra.Command, args []string) error {
var healthServices []svc.Handler var healthServices []svc.Handler
if c.health.shutdownEnabled { if c.health.shutdownEnabled {
healthServices = append(healthServices, shutdown.NewGlobalShutdownServer()) healthServices = append(healthServices, pbImplShutdownV1.New(cancel))
} }
health := svc.NewHealthService(c.health.config, svc.Readiness, healthServices...) health := svc.NewHealthService(healthConfig, svc.Readiness, healthServices...)
healthHandler := health.Start(shutdown.Context()) internalHandlers = append(internalHandlers, health)
externalHandlers = append(externalHandlers, health)
healthHandler := health.Start(ctx)
logger.Str("address", healthHandler.Address()).Info("Health handler started") logger.Str("address", healthHandler.Address()).Info("Health handler started")
s := svc.NewService(c.services.config, handlers...).StartWithHealth(shutdown.Context(), health) var wg sync.WaitGroup
logger.Str("address", s.Address()).Info("Service handler started") var internal, external error
return s.Wait() if c.services.internal.enabled {
wg.Add(1)
go func() {
defer wg.Done()
s := svc.NewService(internalConfig, internalHandlers...).StartWithHealth(ctx, health)
logger.Str("address", s.Address()).Str("type", "internal").Info("Service handler started")
internal = s.Wait()
if internal != nil {
logger.Err(internal).Str("address", s.Address()).Str("type", "internal").Error("Service handler failed")
}
}()
}
if c.services.external.enabled {
wg.Add(1)
go func() {
defer wg.Done()
s := svc.NewService(externalConfig, externalHandlers...).StartWithHealth(ctx, health)
logger.Str("address", s.Address()).Str("type", "external").Info("Service handler started")
external = s.Wait()
if external != nil {
logger.Err(external).Str("address", s.Address()).Str("type", "external").Error("Service handler failed")
}
}()
}
wg.Wait()
return errors.Errors(internal, external)
} }

View file

@ -26,12 +26,13 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
pbImplShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1" pbImplShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1"
pbShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1/definition"
"github.com/arangodb/kube-arangodb/pkg/util/shutdown" "github.com/arangodb/kube-arangodb/pkg/util/shutdown"
"github.com/arangodb/kube-arangodb/pkg/util/svc" "github.com/arangodb/kube-arangodb/pkg/util/svc"
) )
func init() { func init() {
registerer.Register(pbImplShutdownV1.Name, func() Integration { registerer.Register(pbShutdownV1.Name, func() Integration {
return &shutdownV1{} return &shutdownV1{}
}) })
} }
@ -40,11 +41,11 @@ type shutdownV1 struct {
} }
func (s *shutdownV1) Handler(ctx context.Context) (svc.Handler, error) { func (s *shutdownV1) Handler(ctx context.Context) (svc.Handler, error) {
return shutdown.NewGlobalShutdownServer(), nil return pbImplShutdownV1.New(shutdown.Stop), nil
} }
func (s *shutdownV1) Name() string { func (s *shutdownV1) Name() string {
return pbImplShutdownV1.Name return pbShutdownV1.Name
} }
func (s *shutdownV1) Description() string { func (s *shutdownV1) Description() string {

View file

@ -0,0 +1,111 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package integrations
import (
"context"
"fmt"
"os"
"testing"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
"github.com/arangodb/kube-arangodb/pkg/util/shutdown"
"github.com/arangodb/kube-arangodb/pkg/util/tests"
)
type waitFunc func() error
func (w waitFunc) Require(t *testing.T) {
require.NoError(t, w())
}
func executeSync(t *testing.T, ctx context.Context, args ...string) error {
var c configuration
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
defer cancel()
<-shutdown.Channel()
}()
c.test = &configurationTest{
ctx: ctx,
cancel: cancel,
}
cmd := &cobra.Command{}
tCmd := &cobra.Command{
Use: "test",
}
require.NoError(t, c.Register(tCmd))
cmd.AddCommand(tCmd)
cmd.SetOut(os.Stdout)
cmd.SetArgs(append([]string{"test"}, args...))
return cmd.Execute()
}
func executeAsync(t *testing.T, ctx context.Context, args ...string) waitFunc {
ctx, cancel := context.WithCancel(ctx)
var err error
done := make(chan struct{})
go func() {
defer close(done)
err = executeSync(t, ctx, args...)
}()
return func() error {
cancel()
<-done
return err
}
}
func startService(t *testing.T, args ...string) (waitFunc, int, int, int) {
_, health := tests.ResolveAddress(t, "127.0.0.1:0")
_, internal := tests.ResolveAddress(t, "127.0.0.1:0")
_, external := tests.ResolveAddress(t, "127.0.0.1:0")
cancel := executeAsync(t, shutdown.Context(), append([]string{
fmt.Sprintf("--health.address=127.0.0.1:%d", health),
fmt.Sprintf("--services.address=127.0.0.1:%d", internal),
fmt.Sprintf("--services.external.address=127.0.0.1:%d", external),
"--services.external.enabled",
}, args...)...)
tests.WaitForAddress(t, "127.0.0.1", health)
tests.WaitForAddress(t, "127.0.0.1", internal)
tests.WaitForAddress(t, "127.0.0.1", external)
return cancel, health, internal, external
}

71
pkg/util/grpc.go Normal file
View file

@ -0,0 +1,71 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package util
import (
"context"
"io"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
)
const AuthorizationGRPCHeader = "adb-authorization"
func NewGRPCClient[T any](ctx context.Context, in func(cc grpc.ClientConnInterface) T, addr string, opts ...grpc.DialOption) (T, io.Closer, error) {
con, err := NewGRPCConn(ctx, addr, opts...)
if err != nil {
return Default[T](), nil, err
}
return in(con), con, nil
}
func NewGRPCConn(ctx context.Context, addr string, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
var z []grpc.DialOption
z = append(z, grpc.WithTransportCredentials(insecure.NewCredentials()))
z = append(z, opts...)
conn, err := grpc.DialContext(ctx, addr, z...)
if err != nil {
return nil, err
}
return conn, nil
}
func TokenAuthInterceptors(token string) []grpc.DialOption {
return []grpc.DialOption{
grpc.WithUnaryInterceptor(func(ctx context.Context, method string, req, reply any, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
return invoker(attachTokenAuthToInterceptors(ctx, token), method, req, reply, cc, opts...)
}),
grpc.WithStreamInterceptor(func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
return streamer(attachTokenAuthToInterceptors(ctx, token), desc, cc, method, opts...)
}),
}
}
func attachTokenAuthToInterceptors(ctx context.Context, token string) context.Context {
return metadata.AppendToOutgoingContext(ctx, AuthorizationGRPCHeader, token)
}

View file

@ -55,9 +55,6 @@ func (r *registerer[K, V]) Register(key K, value V) bool {
} }
func (r *registerer[K, V]) MustRegister(key K, value V) { func (r *registerer[K, V]) MustRegister(key K, value V) {
r.lock.Lock()
defer r.lock.Unlock()
if !r.Register(key, value) { if !r.Register(key, value) {
panic("Unable to register item") panic("Unable to register item")
} }

View file

@ -25,9 +25,6 @@ import (
"os" "os"
"os/signal" "os/signal"
"syscall" "syscall"
pbImplShutdownV1 "github.com/arangodb/kube-arangodb/integrations/shutdown/v1"
"github.com/arangodb/kube-arangodb/pkg/util/svc"
) )
func init() { func init() {
@ -43,10 +40,6 @@ func init() {
}() }()
} }
func NewGlobalShutdownServer() svc.Handler {
return pbImplShutdownV1.New(stop)
}
var ( var (
ctx context.Context ctx context.Context
stop context.CancelFunc stop context.CancelFunc

View file

@ -47,6 +47,8 @@ type Health interface {
} }
type HealthService interface { type HealthService interface {
Handler
Service Service
Health Health

67
pkg/util/tests/network.go Normal file
View file

@ -0,0 +1,67 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package tests
import (
"fmt"
"net"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func ResolveAddress(t *testing.T, addr string) (string, int) {
ln, err := net.Listen("tcp", addr)
require.NoError(t, err)
pr, ok := ln.Addr().(*net.TCPAddr)
require.True(t, ok)
addr = pr.IP.String()
port := pr.Port
require.NoError(t, ln.Close())
return addr, port
}
func WaitForAddress(t *testing.T, addr string, port int) {
tickerT := time.NewTicker(125 * time.Millisecond)
defer tickerT.Stop()
timerT := time.NewTimer(1 * time.Second)
defer timerT.Stop()
for {
select {
case <-timerT.C:
require.Fail(t, "Timeouted")
case <-tickerT.C:
conn, err := net.DialTimeout("tcp", fmt.Sprintf("%s:%d", addr, port), 125*time.Millisecond)
if err != nil {
continue
}
require.NoError(t, conn.Close())
return
}
}
}

View file

@ -28,33 +28,22 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/svc" "github.com/arangodb/kube-arangodb/pkg/util/svc"
) )
func NewGRPCClient[T any](t *testing.T, ctx context.Context, in func(cc grpc.ClientConnInterface) T, addr string, opts ...grpc.DialOption) T { func NewGRPCClient[T any](t *testing.T, ctx context.Context, in func(cc grpc.ClientConnInterface) T, addr string, opts ...grpc.DialOption) T {
return in(NewGRPCConn(t, ctx, addr, opts...)) client, closer, err := util.NewGRPCClient(ctx, in, addr, opts...)
}
func NewGRPCConn(t *testing.T, ctx context.Context, addr string, opts ...grpc.DialOption) *grpc.ClientConn {
var z []grpc.DialOption
z = append(z, grpc.WithTransportCredentials(insecure.NewCredentials()))
z = append(z, opts...)
conn, err := grpc.DialContext(ctx, addr, z...)
require.NoError(t, err) require.NoError(t, err)
go func() { go func() {
<-ctx.Done() <-ctx.Done()
require.NoError(t, conn.Close()) require.NoError(t, closer.Close())
}() }()
return conn return client
} }
type ErrorStatusValidator interface { type ErrorStatusValidator interface {