diff --git a/examples/simple-cluster.yaml b/examples/simple-cluster.yaml index 56ae336a6..52fb63621 100644 --- a/examples/simple-cluster.yaml +++ b/examples/simple-cluster.yaml @@ -4,7 +4,7 @@ metadata: name: "example-simple-cluster" spec: mode: cluster - image: arangodb/arangodb:3.2.9 + image: arangodb/arangodb:3.3.4 tls: altNames: ["kube-01", "kube-02", "kube-03"] coordinators: diff --git a/pkg/deployment/action_context.go b/pkg/deployment/action_context.go index 3b075d905..1ce049edd 100644 --- a/pkg/deployment/action_context.go +++ b/pkg/deployment/action_context.go @@ -27,11 +27,13 @@ import ( "fmt" driver "github.com/arangodb/go-driver" - api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" - "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" "github.com/rs/zerolog" "github.com/rs/zerolog/log" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" + "github.com/arangodb/kube-arangodb/pkg/util/k8sutil" ) // ActionContext provides methods to the Action implementations @@ -44,6 +46,8 @@ type ActionContext interface { GetDatabaseClient(ctx context.Context) (driver.Client, error) // GetServerClient returns a cached client for a specific server. GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error) + // GetAgencyClients returns a client connection for every agency member. + GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) // GetMemberStatusByID returns the current member status // for the member with given id. // Returns member status, true when found, or false @@ -101,6 +105,24 @@ func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGr return c, nil } +// GetAgencyClients returns a client connection for every agency member. +func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) { + agencyMembers := ac.deployment.status.Members.Agents + result := make([]arangod.Agency, 0, len(agencyMembers)) + for _, m := range agencyMembers { + client, err := ac.GetServerClient(ctx, api.ServerGroupAgents, m.ID) + if err != nil { + return nil, maskAny(err) + } + aClient, err := arangod.NewAgencyClient(client) + if err != nil { + return nil, maskAny(err) + } + result = append(result, aClient) + } + return result, nil +} + // GetMemberStatusByID returns the current member status // for the member with given id. // Returns member status, true when found, or false diff --git a/pkg/deployment/action_wait_for_member_up.go b/pkg/deployment/action_wait_for_member_up.go index 1b2ed9a40..be90fedae 100644 --- a/pkg/deployment/action_wait_for_member_up.go +++ b/pkg/deployment/action_wait_for_member_up.go @@ -24,10 +24,14 @@ package deployment import ( "context" + "sync" + "time" driver "github.com/arangodb/go-driver" - api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" "github.com/rs/zerolog" + + api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha" + "github.com/arangodb/kube-arangodb/pkg/util/arangod" ) // NewWaitForMemberUpAction creates a new Action that implements the given @@ -40,6 +44,10 @@ func NewWaitForMemberUpAction(log zerolog.Logger, action api.Action, actionCtx A } } +const ( + maxAgentResponseTime = time.Second * 10 +) + // actionWaitForMemberUp implements an WaitForMemberUp. type actionWaitForMemberUp struct { log zerolog.Logger @@ -91,19 +99,76 @@ func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool, return true, nil } +type agentStatus struct { + IsLeader bool + LeaderEndpoint string + IsResponding bool +} + // checkProgressAgent checks the progress of the action in the case // of an agent. func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, error) { log := a.log - c, err := a.actionCtx.GetDatabaseClient(ctx) + clients, err := a.actionCtx.GetAgencyClients(ctx) if err != nil { - log.Debug().Err(err).Msg("Failed to create database client") + log.Debug().Err(err).Msg("Failed to create agency clients") return false, maskAny(err) } - if _, err := c.Version(ctx); err != nil { - log.Debug().Err(err).Msg("Failed to get version") - return false, maskAny(err) + + wg := sync.WaitGroup{} + invalidKey := []string{"does-not-exists-149e97e8-4b81-5664-a8a8-9ba93881d64c"} + statuses := make([]agentStatus, len(clients)) + for i, c := range clients { + wg.Add(1) + go func(i int, c arangod.Agency) { + defer wg.Done() + var trash interface{} + lctx, cancel := context.WithTimeout(ctx, maxAgentResponseTime) + defer cancel() + if err := c.ReadKey(lctx, invalidKey, &trash); err == nil || arangod.IsKeyNotFound(err) { + // We got a valid read from the leader + statuses[i].IsLeader = true + statuses[i].LeaderEndpoint = c.Endpoint() + statuses[i].IsResponding = true + } else { + if location, ok := arangod.IsNotLeader(err); ok { + // Valid response from a follower + statuses[i].IsLeader = false + statuses[i].LeaderEndpoint = location + statuses[i].IsResponding = true + } else { + // Unexpected / invalid response + statuses[i].IsResponding = false + } + } + }(i, c) } + wg.Wait() + + // Check the results + noLeaders := 0 + for i, status := range statuses { + if !status.IsResponding { + log.Debug().Msg("Not all agents are responding") + return false, nil + } + if status.IsLeader { + noLeaders++ + } + if i > 0 { + // Compare leader endpoint with previous + prev := statuses[i-1].LeaderEndpoint + if !arangod.IsSameEndpoint(prev, status.LeaderEndpoint) { + log.Debug().Msg("Not all agents report the same leader endpoint") + return false, nil + } + } + } + if noLeaders != 1 { + log.Debug().Int("leaders", noLeaders).Msg("Unexpected number of agency leaders") + return false, nil + } + return true, nil } diff --git a/pkg/util/arangod/agency.go b/pkg/util/arangod/agency.go new file mode 100644 index 000000000..db956417f --- /dev/null +++ b/pkg/util/arangod/agency.go @@ -0,0 +1,146 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package arangod + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + driver "github.com/arangodb/go-driver" + "github.com/pkg/errors" +) + +// Agency provides API implemented by the ArangoDB agency. +type Agency interface { + // ReadKey reads the value of a given key in the agency. + ReadKey(ctx context.Context, key []string, value interface{}) error + // Endpoint returns the endpoint of this agent connection + Endpoint() string +} + +// NewAgencyClient creates a new Agency connection from the given client +// connection. +// The number of endpoints of the client must be exactly 1. +func NewAgencyClient(c driver.Client) (Agency, error) { + if len(c.Connection().Endpoints()) > 1 { + return nil, maskAny(fmt.Errorf("Got multiple endpoints")) + } + return &agency{ + conn: c.Connection(), + }, nil +} + +type agency struct { + conn driver.Connection +} + +// ReadKey reads the value of a given key in the agency. +func (a *agency) ReadKey(ctx context.Context, key []string, value interface{}) error { + conn := a.conn + req, err := conn.NewRequest("POST", "_api/agency/read") + if err != nil { + return maskAny(err) + } + fullKey := createFullKey(key) + input := [][]string{{fullKey}} + req, err = req.SetBody(input) + if err != nil { + return maskAny(err) + } + //var raw []byte + //ctx = driver.WithRawResponse(ctx, &raw) + resp, err := conn.Do(ctx, req) + if err != nil { + return maskAny(err) + } + if resp.StatusCode() == 307 { + // Not leader + location := resp.Header("Location") + return NotLeaderError{Leader: location} + } + if err := resp.CheckStatus(200, 201, 202); err != nil { + return maskAny(err) + } + //fmt.Printf("Agent response: %s\n", string(raw)) + elems, err := resp.ParseArrayBody() + if err != nil { + return maskAny(err) + } + if len(elems) != 1 { + return maskAny(fmt.Errorf("Expected 1 element, got %d", len(elems))) + } + // If empty key parse directly + if len(key) == 0 { + if err := elems[0].ParseBody("", &value); err != nil { + return maskAny(err) + } + } else { + // Now remove all wrapping objects for each key element + var rawObject map[string]interface{} + if err := elems[0].ParseBody("", &rawObject); err != nil { + return maskAny(err) + } + var rawMsg interface{} + for keyIndex := 0; keyIndex < len(key); keyIndex++ { + if keyIndex > 0 { + var ok bool + rawObject, ok = rawMsg.(map[string]interface{}) + if !ok { + return maskAny(fmt.Errorf("Data is not an object at key %s", key[:keyIndex+1])) + } + } + var found bool + rawMsg, found = rawObject[key[keyIndex]] + if !found { + return errors.Wrapf(KeyNotFoundError, "Missing data at key %s", key[:keyIndex+1]) + } + } + // Encode to json ... + encoded, err := json.Marshal(rawMsg) + if err != nil { + return maskAny(err) + } + // and decode back into result + if err := json.Unmarshal(encoded, &value); err != nil { + return maskAny(err) + } + } + + // fmt.Printf("result as JSON: %s\n", rawResult) + return nil +} + +// Endpoint returns the endpoint of this agent connection +func (a *agency) Endpoint() string { + ep := a.conn.Endpoints() + if len(ep) == 0 { + return "" + } + return ep[0] +} + +func createFullKey(key []string) string { + return "/" + strings.Join(key, "/") +} diff --git a/pkg/util/arangod/endpoint.go b/pkg/util/arangod/endpoint.go new file mode 100644 index 000000000..d5f2d0641 --- /dev/null +++ b/pkg/util/arangod/endpoint.go @@ -0,0 +1,42 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package arangod + +import "net/url" + +// IsSameEndpoint returns true when the 2 given endpoints +// refer to the same server. +func IsSameEndpoint(a, b string) bool { + if a == b { + return true + } + ua, err := url.Parse(a) + if err != nil { + return false + } + ub, err := url.Parse(b) + if err != nil { + return false + } + return ua.Hostname() == ub.Hostname() +} diff --git a/pkg/util/arangod/error.go b/pkg/util/arangod/error.go index 7ef7f5e00..b87e57ac4 100644 --- a/pkg/util/arangod/error.go +++ b/pkg/util/arangod/error.go @@ -25,5 +25,32 @@ package arangod import "github.com/pkg/errors" var ( + KeyNotFoundError = errors.New("Key not found") + maskAny = errors.WithStack ) + +// IsKeyNotFound returns true if the given error is (or is caused by) a KeyNotFoundError. +func IsKeyNotFound(err error) bool { + return errors.Cause(err) == KeyNotFoundError +} + +// NotLeaderError indicates the response of an agent when it is +// not the leader of the agency. +type NotLeaderError struct { + Leader string // Endpoint of the current leader +} + +// Error implements error. +func (e NotLeaderError) Error() string { + return "not the leader" +} + +// IsNotLeader returns true if the given error is (or is caused by) a NotLeaderError. +func IsNotLeader(err error) (string, bool) { + nlErr, ok := err.(NotLeaderError) + if ok { + return nlErr.Leader, true + } + return "", false +}