mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
Merge pull request #218 from arangodb/feature/load-balancing-tests
Load balancing tests
This commit is contained in:
commit
8c9fee8c71
17 changed files with 393 additions and 49 deletions
|
@ -83,7 +83,8 @@ func (cc *clientCache) GetDatabase(ctx context.Context) (driver.Client, error) {
|
|||
}
|
||||
|
||||
// Not found, create a new client
|
||||
c, err := arangod.CreateArangodDatabaseClient(ctx, cc.kubecli.CoreV1(), cc.apiObject)
|
||||
shortTimeout := false
|
||||
c, err := arangod.CreateArangodDatabaseClient(ctx, cc.kubecli.CoreV1(), cc.apiObject, shortTimeout)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ var (
|
|||
Proxy: nhttp.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
KeepAlive: 90 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 100,
|
||||
|
@ -77,7 +77,7 @@ var (
|
|||
Proxy: nhttp.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
KeepAlive: 90 * time.Second,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 100,
|
||||
|
@ -86,13 +86,38 @@ var (
|
|||
ExpectContinueTimeout: 1 * time.Second,
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
}
|
||||
sharedHTTPTransportShortTimeout = &nhttp.Transport{
|
||||
Proxy: nhttp.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 100 * time.Millisecond,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 100,
|
||||
IdleConnTimeout: 100 * time.Millisecond,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
}
|
||||
sharedHTTPSTransportShortTimeout = &nhttp.Transport{
|
||||
Proxy: nhttp.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: 30 * time.Second,
|
||||
KeepAlive: 100 * time.Millisecond,
|
||||
DualStack: true,
|
||||
}).DialContext,
|
||||
MaxIdleConns: 100,
|
||||
IdleConnTimeout: 100 * time.Millisecond,
|
||||
TLSHandshakeTimeout: 10 * time.Second,
|
||||
ExpectContinueTimeout: 1 * time.Second,
|
||||
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
|
||||
}
|
||||
)
|
||||
|
||||
// CreateArangodClient creates a go-driver client for a specific member in the given group.
|
||||
func CreateArangodClient(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, group api.ServerGroup, id string) (driver.Client, error) {
|
||||
// Create connection
|
||||
dnsName := k8sutil.CreatePodDNSName(apiObject, group.AsRole(), id)
|
||||
c, err := createArangodClientForDNSName(ctx, cli, apiObject, dnsName)
|
||||
c, err := createArangodClientForDNSName(ctx, cli, apiObject, dnsName, false)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
|
@ -100,10 +125,10 @@ func CreateArangodClient(ctx context.Context, cli corev1.CoreV1Interface, apiObj
|
|||
}
|
||||
|
||||
// CreateArangodDatabaseClient creates a go-driver client for accessing the entire cluster (or single server).
|
||||
func CreateArangodDatabaseClient(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment) (driver.Client, error) {
|
||||
func CreateArangodDatabaseClient(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, shortTimeout bool) (driver.Client, error) {
|
||||
// Create connection
|
||||
dnsName := k8sutil.CreateDatabaseClientServiceDNSName(apiObject)
|
||||
c, err := createArangodClientForDNSName(ctx, cli, apiObject, dnsName)
|
||||
c, err := createArangodClientForDNSName(ctx, cli, apiObject, dnsName, shortTimeout)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
|
@ -117,7 +142,8 @@ func CreateArangodAgencyClient(ctx context.Context, cli corev1.CoreV1Interface,
|
|||
dnsName := k8sutil.CreatePodDNSName(apiObject, api.ServerGroupAgents.AsRole(), m.ID)
|
||||
dnsNames = append(dnsNames, dnsName)
|
||||
}
|
||||
connConfig, err := createArangodHTTPConfigForDNSNames(ctx, cli, apiObject, dnsNames)
|
||||
shortTimeout := false
|
||||
connConfig, err := createArangodHTTPConfigForDNSNames(ctx, cli, apiObject, dnsNames, shortTimeout)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
|
@ -147,7 +173,7 @@ func CreateArangodAgencyClient(ctx context.Context, cli corev1.CoreV1Interface,
|
|||
func CreateArangodImageIDClient(ctx context.Context, deployment k8sutil.APIObject, role, id string) (driver.Client, error) {
|
||||
// Create connection
|
||||
dnsName := k8sutil.CreatePodDNSName(deployment, role, id)
|
||||
c, err := createArangodClientForDNSName(ctx, nil, nil, dnsName)
|
||||
c, err := createArangodClientForDNSName(ctx, nil, nil, dnsName, false)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
|
@ -155,8 +181,8 @@ func CreateArangodImageIDClient(ctx context.Context, deployment k8sutil.APIObjec
|
|||
}
|
||||
|
||||
// CreateArangodClientForDNSName creates a go-driver client for a given DNS name.
|
||||
func createArangodClientForDNSName(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsName string) (driver.Client, error) {
|
||||
connConfig, err := createArangodHTTPConfigForDNSNames(ctx, cli, apiObject, []string{dnsName})
|
||||
func createArangodClientForDNSName(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsName string, shortTimeout bool) (driver.Client, error) {
|
||||
connConfig, err := createArangodHTTPConfigForDNSNames(ctx, cli, apiObject, []string{dnsName}, shortTimeout)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
|
@ -183,12 +209,18 @@ func createArangodClientForDNSName(ctx context.Context, cli corev1.CoreV1Interfa
|
|||
}
|
||||
|
||||
// createArangodHTTPConfigForDNSNames creates a go-driver HTTP connection config for a given DNS names.
|
||||
func createArangodHTTPConfigForDNSNames(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsNames []string) (http.ConnectionConfig, error) {
|
||||
func createArangodHTTPConfigForDNSNames(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsNames []string, shortTimeout bool) (http.ConnectionConfig, error) {
|
||||
scheme := "http"
|
||||
transport := sharedHTTPTransport
|
||||
if shortTimeout {
|
||||
transport = sharedHTTPTransportShortTimeout
|
||||
}
|
||||
if apiObject != nil && apiObject.Spec.IsSecure() {
|
||||
scheme = "https"
|
||||
transport = sharedHTTPSTransport
|
||||
if shortTimeout {
|
||||
transport = sharedHTTPSTransportShortTimeout
|
||||
}
|
||||
}
|
||||
connConfig := http.ConnectionConfig{
|
||||
Transport: transport,
|
||||
|
|
|
@ -69,7 +69,7 @@ func TestAuthenticationSingleDefaultSecret(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := arangod.WithRequireAuthentication(context.Background())
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -118,7 +118,7 @@ func TestAuthenticationSingleCustomSecret(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := arangod.WithRequireAuthentication(context.Background())
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -165,7 +165,7 @@ func TestAuthenticationNoneSingle(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := arangod.WithSkipAuthentication(context.Background())
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -208,7 +208,7 @@ func TestAuthenticationClusterDefaultSecret(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := arangod.WithRequireAuthentication(context.Background())
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -257,7 +257,7 @@ func TestAuthenticationClusterCustomSecret(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := arangod.WithRequireAuthentication(context.Background())
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -304,7 +304,7 @@ func TestAuthenticationNoneCluster(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := arangod.WithSkipAuthentication(context.Background())
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
|
|
@ -66,7 +66,7 @@ func TestChangeArgsAgents(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -145,7 +145,7 @@ func TestChangeArgsDBServer(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
|
|
@ -63,7 +63,7 @@ func TestCursorSingle(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -107,7 +107,7 @@ func TestCursorActiveFailover(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -151,7 +151,7 @@ func TestCursorCluster(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
|
|
@ -92,7 +92,7 @@ func deploymentSubTest(t *testing.T, mode api.DeploymentMode, engine api.Storage
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
DBClient := mustNewArangodDatabaseClient(ctx, kubecli, deployment, t)
|
||||
DBClient := mustNewArangodDatabaseClient(ctx, kubecli, deployment, t, nil)
|
||||
require.NoError(t, waitUntilArangoDeploymentHealthy(deployment, DBClient, kubecli, ""), fmt.Sprintf("Deployment not healthy in time: %v", err))
|
||||
|
||||
// Cleanup
|
||||
|
@ -142,9 +142,9 @@ func TestMultiDeployment(t *testing.T) {
|
|||
|
||||
// Create a database clients
|
||||
ctx := context.Background()
|
||||
DBClient1 := mustNewArangodDatabaseClient(ctx, kubecli, deployment1, t)
|
||||
DBClient1 := mustNewArangodDatabaseClient(ctx, kubecli, deployment1, t, nil)
|
||||
require.NoError(t, waitUntilArangoDeploymentHealthy(deployment1, DBClient1, kubecli, ""), fmt.Sprintf("Deployment not healthy in time: %v", err))
|
||||
DBClient2 := mustNewArangodDatabaseClient(ctx, kubecli, deployment2, t)
|
||||
DBClient2 := mustNewArangodDatabaseClient(ctx, kubecli, deployment2, t, nil)
|
||||
require.NoError(t, waitUntilArangoDeploymentHealthy(deployment1, DBClient1, kubecli, ""), fmt.Sprintf("Deployment not healthy in time: %v", err))
|
||||
|
||||
// Test if we are able to create a collections in both deployments.
|
||||
|
|
|
@ -63,7 +63,7 @@ func TestImmutableFields(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server to be completely ready
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
|
226
tests/load_balancer_test.go
Normal file
226
tests/load_balancer_test.go
Normal file
|
@ -0,0 +1,226 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Author Ewout Prangsma
|
||||
//
|
||||
|
||||
package tests
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dchest/uniuri"
|
||||
|
||||
driver "github.com/arangodb/go-driver"
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
|
||||
"github.com/arangodb/kube-arangodb/pkg/client"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util"
|
||||
)
|
||||
|
||||
func TestLoadBalancingCursorVST(t *testing.T) {
|
||||
longOrSkip(t)
|
||||
// run with VST
|
||||
loadBalancingCursorSubtest(t, true)
|
||||
}
|
||||
|
||||
func TestLoadBalancingCursorHTTP(t *testing.T) {
|
||||
longOrSkip(t)
|
||||
// run with HTTP
|
||||
loadBalancingCursorSubtest(t, false)
|
||||
}
|
||||
|
||||
func wasForwarded(r driver.Response) bool {
|
||||
h := r.Header("x-arango-request-forwarded-to")
|
||||
return h != ""
|
||||
}
|
||||
|
||||
// tests cursor forwarding with load-balanced conn.
|
||||
func loadBalancingCursorSubtest(t *testing.T, useVst bool) {
|
||||
c := client.MustNewInCluster()
|
||||
kubecli := mustNewKubeClient(t)
|
||||
ns := getNamespace(t)
|
||||
|
||||
// Prepare deployment config
|
||||
namePrefix := "test-lb-"
|
||||
if useVst {
|
||||
namePrefix += "vst-"
|
||||
} else {
|
||||
namePrefix += "http-"
|
||||
}
|
||||
depl := newDeployment(namePrefix + uniuri.NewLen(4))
|
||||
depl.Spec.Mode = api.NewMode(api.DeploymentModeCluster)
|
||||
depl.Spec.Image = util.NewString("arangodb/arangodb:3.3.13") // Note: 3.3.13 is the first version supporting the cursor forwarding feature.
|
||||
|
||||
// Create deployment
|
||||
_, err := c.DatabaseV1alpha().ArangoDeployments(ns).Create(depl)
|
||||
if err != nil {
|
||||
t.Fatalf("Create deployment failed: %v", err)
|
||||
}
|
||||
// Prepare cleanup
|
||||
defer removeDeployment(c, depl.GetName(), ns)
|
||||
|
||||
// Wait for deployment to be ready
|
||||
apiObject, err := waitUntilDeployment(c, depl.GetName(), ns, deploymentIsReady())
|
||||
if err != nil {
|
||||
t.Fatalf("Deployment not running in time: %v", err)
|
||||
}
|
||||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
clOpts := &DatabaseClientOptions{
|
||||
UseVST: useVst,
|
||||
ShortTimeout: true,
|
||||
}
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, clOpts)
|
||||
|
||||
// Wait for cluster to be available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
t.Fatalf("Cluster not running returning version in time: %v", err)
|
||||
}
|
||||
|
||||
// Create data set
|
||||
collectionData := map[string][]interface{}{
|
||||
"books": []interface{}{
|
||||
Book{Title: "Book 01"},
|
||||
Book{Title: "Book 02"},
|
||||
Book{Title: "Book 03"},
|
||||
Book{Title: "Book 04"},
|
||||
Book{Title: "Book 05"},
|
||||
Book{Title: "Book 06"},
|
||||
Book{Title: "Book 07"},
|
||||
Book{Title: "Book 08"},
|
||||
Book{Title: "Book 09"},
|
||||
Book{Title: "Book 10"},
|
||||
Book{Title: "Book 11"},
|
||||
Book{Title: "Book 12"},
|
||||
Book{Title: "Book 13"},
|
||||
Book{Title: "Book 14"},
|
||||
Book{Title: "Book 15"},
|
||||
Book{Title: "Book 16"},
|
||||
Book{Title: "Book 17"},
|
||||
Book{Title: "Book 18"},
|
||||
Book{Title: "Book 19"},
|
||||
Book{Title: "Book 20"},
|
||||
},
|
||||
"users": []interface{}{
|
||||
UserDoc{Name: "John", Age: 13},
|
||||
UserDoc{Name: "Jake", Age: 25},
|
||||
UserDoc{Name: "Clair", Age: 12},
|
||||
UserDoc{Name: "Johnny", Age: 42},
|
||||
UserDoc{Name: "Blair", Age: 67},
|
||||
UserDoc{Name: "Zz", Age: 12},
|
||||
},
|
||||
}
|
||||
|
||||
db := ensureDatabase(ctx, client, "lb_cursor_test", nil, t)
|
||||
for colName, colDocs := range collectionData {
|
||||
col := ensureCollection(ctx, db, colName, nil, t)
|
||||
if _, _, err := col.CreateDocuments(ctx, colDocs); err != nil {
|
||||
t.Fatalf("Expected success, got %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Setup tests
|
||||
tests := []queryTest{
|
||||
queryTest{
|
||||
Query: "FOR d IN books SORT d.Title RETURN d",
|
||||
ExpectSuccess: true,
|
||||
ExpectedDocuments: collectionData["books"],
|
||||
DocumentType: reflect.TypeOf(Book{}),
|
||||
},
|
||||
}
|
||||
|
||||
var r driver.Response
|
||||
// Setup context
|
||||
ctx = driver.WithResponse(driver.WithQueryBatchSize(nil, 1), &r)
|
||||
|
||||
// keep track of whether at least one request was forwarded internally to the
|
||||
// correct coordinator behind the load balancer
|
||||
someRequestsForwarded := false
|
||||
someRequestsNotForwarded := false
|
||||
|
||||
// Run tests for every context alternative
|
||||
for i, test := range tests {
|
||||
cursor, err := db.Query(ctx, test.Query, test.BindVars)
|
||||
if err == nil {
|
||||
// Close upon exit of the function
|
||||
defer cursor.Close()
|
||||
}
|
||||
if test.ExpectSuccess {
|
||||
if err != nil {
|
||||
t.Errorf("Expected success in query %d (%s), got '%s'", i, test.Query, err)
|
||||
continue
|
||||
}
|
||||
if count := cursor.Count(); count != 0 {
|
||||
t.Errorf("Expected count of 0, got %d in query %d (%s)", count, i, test.Query)
|
||||
}
|
||||
var result []interface{}
|
||||
for {
|
||||
hasMore := cursor.HasMore()
|
||||
doc := reflect.New(test.DocumentType)
|
||||
if _, err := cursor.ReadDocument(ctx, doc.Interface()); driver.IsNoMoreDocuments(err) {
|
||||
if hasMore {
|
||||
t.Error("HasMore returned true, but ReadDocument returns a IsNoMoreDocuments error")
|
||||
}
|
||||
break
|
||||
} else if err != nil {
|
||||
t.Errorf("Failed to result document %d: %s", len(result), err)
|
||||
}
|
||||
if !hasMore {
|
||||
t.Error("HasMore returned false, but ReadDocument returns a document")
|
||||
}
|
||||
result = append(result, doc.Elem().Interface())
|
||||
if wasForwarded(r) {
|
||||
someRequestsForwarded = true
|
||||
} else {
|
||||
someRequestsNotForwarded = true
|
||||
}
|
||||
time.Sleep(200 * time.Millisecond)
|
||||
}
|
||||
if len(result) != len(test.ExpectedDocuments) {
|
||||
t.Errorf("Expected %d documents, got %d in query %d (%s)", len(test.ExpectedDocuments), len(result), i, test.Query)
|
||||
} else {
|
||||
for resultIdx, resultDoc := range result {
|
||||
if !reflect.DeepEqual(resultDoc, test.ExpectedDocuments[resultIdx]) {
|
||||
t.Errorf("Unexpected document in query %d (%s) at index %d: got %+v, expected %+v", i, test.Query, resultIdx, resultDoc, test.ExpectedDocuments[resultIdx])
|
||||
}
|
||||
}
|
||||
}
|
||||
// Close anyway (this tests calling Close more than once)
|
||||
if err := cursor.Close(); err != nil {
|
||||
t.Errorf("Expected success in Close of cursor from query %d (%s), got '%s'", i, test.Query, err)
|
||||
}
|
||||
} else {
|
||||
if err == nil {
|
||||
t.Errorf("Expected error in query %d (%s), got '%s'", i, test.Query, err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if !someRequestsForwarded {
|
||||
t.Error("Did not detect any request being forwarded behind load balancer!")
|
||||
}
|
||||
if !someRequestsNotForwarded {
|
||||
t.Error("Did not detect any request NOT being forwarded behind load balancer!")
|
||||
}
|
||||
}
|
|
@ -65,7 +65,7 @@ func TestMemberResilienceAgents(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
@ -166,7 +166,7 @@ func TestMemberResilienceCoordinators(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
@ -263,7 +263,7 @@ func TestMemberResilienceDBServers(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
|
|
@ -68,7 +68,7 @@ func TestResiliencePod(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
@ -149,7 +149,7 @@ func TestResiliencePVC(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
@ -239,7 +239,7 @@ func TestResiliencePVDBServer(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
@ -343,7 +343,7 @@ func TestResilienceService(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
|
|
@ -79,7 +79,7 @@ func TestRocksDBEncryptionSingle(t *testing.T) {
|
|||
|
||||
// Create database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
|
|
@ -62,7 +62,7 @@ func TestScaleClusterNonTLS(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
@ -133,7 +133,7 @@ func TestScaleCluster(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
@ -209,7 +209,7 @@ func TestScaleClusterWithSync(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Create a syncmaster client
|
||||
syncClient := mustNewArangoSyncClient(ctx, kubecli, apiObject, t)
|
||||
|
|
|
@ -73,7 +73,7 @@ func TestServiceAccountSingle(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -122,7 +122,7 @@ func TestServiceAccountActiveFailover(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -173,7 +173,7 @@ func TestServiceAccountCluster(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -230,7 +230,7 @@ func TestServiceAccountClusterWithSync(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
|
|
@ -62,7 +62,7 @@ func TestSimpleSingle(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -100,7 +100,7 @@ func TestSimpleActiveFailover(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for single server available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -138,7 +138,7 @@ func TestSimpleCluster(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
@ -179,7 +179,7 @@ func TestSimpleClusterWithSync(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be available
|
||||
if err := waitUntilVersionUp(client, nil); err != nil {
|
||||
|
|
|
@ -65,7 +65,7 @@ func TestSyncToggleEnabled(t *testing.T) {
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t)
|
||||
client := mustNewArangodDatabaseClient(ctx, kubecli, apiObject, t, nil)
|
||||
|
||||
// Wait for cluster to be completely ready
|
||||
if err := waitUntilClusterHealth(client, func(h driver.ClusterHealth) error {
|
||||
|
|
|
@ -24,6 +24,7 @@ package tests
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
|
@ -39,9 +40,12 @@ import (
|
|||
"github.com/arangodb/arangosync/client"
|
||||
"github.com/arangodb/arangosync/tasks"
|
||||
driver "github.com/arangodb/go-driver"
|
||||
vst "github.com/arangodb/go-driver/vst"
|
||||
vstProtocol "github.com/arangodb/go-driver/vst/protocol"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/rs/zerolog"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
|
||||
"github.com/arangodb/kube-arangodb/pkg/generated/clientset/versioned"
|
||||
|
@ -60,6 +64,73 @@ var (
|
|||
showEnterpriseImageOnce sync.Once
|
||||
)
|
||||
|
||||
// CreateArangodClientForDNSName creates a go-driver client for a given DNS name.
|
||||
func createArangodVSTClientForDNSName(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsName string, shortTimeout bool) (driver.Client, error) {
|
||||
config := driver.ClientConfig{}
|
||||
connConfig, err := createArangodVSTConfigForDNSNames(ctx, cli, apiObject, []string{dnsName}, shortTimeout)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
// TODO deal with TLS with proper CA checking
|
||||
conn, err := vst.NewConnection(connConfig)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
|
||||
// Create client
|
||||
config = driver.ClientConfig{
|
||||
Connection: conn,
|
||||
}
|
||||
|
||||
auth := driver.BasicAuthentication("root", "")
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
config.Authentication = auth
|
||||
c, err := driver.NewClient(config)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// createArangodVSTConfigForDNSNames creates a go-driver VST connection config for a given DNS names.
|
||||
func createArangodVSTConfigForDNSNames(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, dnsNames []string, shortTimeout bool) (vst.ConnectionConfig, error) {
|
||||
scheme := "http"
|
||||
tlsConfig := &tls.Config{}
|
||||
timeout := 90 * time.Second
|
||||
if shortTimeout {
|
||||
timeout = 100 * time.Millisecond
|
||||
}
|
||||
if apiObject != nil && apiObject.Spec.IsSecure() {
|
||||
scheme = "https"
|
||||
tlsConfig = &tls.Config{InsecureSkipVerify: true}
|
||||
}
|
||||
transport := vstProtocol.TransportConfig{
|
||||
IdleConnTimeout: timeout,
|
||||
Version: vstProtocol.Version1_1,
|
||||
}
|
||||
connConfig := vst.ConnectionConfig{
|
||||
TLSConfig: tlsConfig,
|
||||
Transport: transport,
|
||||
}
|
||||
for _, dnsName := range dnsNames {
|
||||
connConfig.Endpoints = append(connConfig.Endpoints, scheme+"://"+net.JoinHostPort(dnsName, strconv.Itoa(k8sutil.ArangoPort)))
|
||||
}
|
||||
return connConfig, nil
|
||||
}
|
||||
|
||||
// CreateArangodDatabaseVSTClient creates a go-driver client for accessing the entire cluster (or single server) via VST
|
||||
func createArangodDatabaseVSTClient(ctx context.Context, cli corev1.CoreV1Interface, apiObject *api.ArangoDeployment, shortTimeout bool) (driver.Client, error) {
|
||||
// Create connection
|
||||
dnsName := k8sutil.CreateDatabaseClientServiceDNSName(apiObject)
|
||||
c, err := createArangodVSTClientForDNSName(ctx, cli, apiObject, dnsName, shortTimeout)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// longOrSkip checks the short test flag.
|
||||
// If short is set, the current test is skipped.
|
||||
// If not, this function returns as normal.
|
||||
|
@ -99,10 +170,24 @@ func mustNewKubeClient(t *testing.T) kubernetes.Interface {
|
|||
return c
|
||||
}
|
||||
|
||||
// DatabaseClientOptions contains options for creating an ArangoDB database client.
|
||||
type DatabaseClientOptions struct {
|
||||
ShortTimeout bool // If set, the connection timeout is set very short
|
||||
UseVST bool // If set, a VST connection is created instead of an HTTP connection
|
||||
}
|
||||
|
||||
// mustNewArangodDatabaseClient creates a new database client,
|
||||
// failing the test on errors.
|
||||
func mustNewArangodDatabaseClient(ctx context.Context, kubecli kubernetes.Interface, apiObject *api.ArangoDeployment, t *testing.T) driver.Client {
|
||||
c, err := arangod.CreateArangodDatabaseClient(ctx, kubecli.CoreV1(), apiObject)
|
||||
func mustNewArangodDatabaseClient(ctx context.Context, kubecli kubernetes.Interface, apiObject *api.ArangoDeployment, t *testing.T, options *DatabaseClientOptions) driver.Client {
|
||||
var c driver.Client
|
||||
var err error
|
||||
shortTimeout := options != nil && options.ShortTimeout
|
||||
useVST := options != nil && options.UseVST
|
||||
if useVST {
|
||||
c, err = createArangodDatabaseVSTClient(ctx, kubecli.CoreV1(), apiObject, shortTimeout)
|
||||
} else {
|
||||
c, err = arangod.CreateArangodDatabaseClient(ctx, kubecli.CoreV1(), apiObject, shortTimeout)
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create arango database client: %v", err)
|
||||
}
|
||||
|
|
|
@ -109,7 +109,7 @@ func upgradeSubTest(t *testing.T, mode api.DeploymentMode, engine api.StorageEng
|
|||
|
||||
// Create a database client
|
||||
ctx := context.Background()
|
||||
DBClient := mustNewArangodDatabaseClient(ctx, kubecli, deployment, t)
|
||||
DBClient := mustNewArangodDatabaseClient(ctx, kubecli, deployment, t, nil)
|
||||
|
||||
if err := waitUntilArangoDeploymentHealthy(deployment, DBClient, kubecli, ""); err != nil {
|
||||
t.Fatalf("Deployment not healthy in time: %v", err)
|
||||
|
|
Loading…
Reference in a new issue