1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Working on agency is happy check

This commit is contained in:
Ewout Prangsma 2018-03-20 17:14:28 +01:00
parent affa7235e5
commit 0635fd41b1
No known key found for this signature in database
GPG key ID: 4DBAD380D93D0698
6 changed files with 311 additions and 9 deletions

View file

@ -4,7 +4,7 @@ metadata:
name: "example-simple-cluster"
spec:
mode: cluster
image: arangodb/arangodb:3.2.9
image: arangodb/arangodb:3.3.4
tls:
altNames: ["kube-01", "kube-02", "kube-03"]
coordinators:

View file

@ -27,11 +27,13 @@ import (
"fmt"
driver "github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
// ActionContext provides methods to the Action implementations
@ -44,6 +46,8 @@ type ActionContext interface {
GetDatabaseClient(ctx context.Context) (driver.Client, error)
// GetServerClient returns a cached client for a specific server.
GetServerClient(ctx context.Context, group api.ServerGroup, id string) (driver.Client, error)
// GetAgencyClients returns a client connection for every agency member.
GetAgencyClients(ctx context.Context) ([]arangod.Agency, error)
// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false
@ -101,6 +105,24 @@ func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGr
return c, nil
}
// GetAgencyClients returns a client connection for every agency member.
func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]arangod.Agency, error) {
agencyMembers := ac.deployment.status.Members.Agents
result := make([]arangod.Agency, 0, len(agencyMembers))
for _, m := range agencyMembers {
client, err := ac.GetServerClient(ctx, api.ServerGroupAgents, m.ID)
if err != nil {
return nil, maskAny(err)
}
aClient, err := arangod.NewAgencyClient(client)
if err != nil {
return nil, maskAny(err)
}
result = append(result, aClient)
}
return result, nil
}
// GetMemberStatusByID returns the current member status
// for the member with given id.
// Returns member status, true when found, or false

View file

@ -24,10 +24,14 @@ package deployment
import (
"context"
"sync"
"time"
driver "github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/rs/zerolog"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
)
// NewWaitForMemberUpAction creates a new Action that implements the given
@ -40,6 +44,10 @@ func NewWaitForMemberUpAction(log zerolog.Logger, action api.Action, actionCtx A
}
}
const (
maxAgentResponseTime = time.Second * 10
)
// actionWaitForMemberUp implements an WaitForMemberUp.
type actionWaitForMemberUp struct {
log zerolog.Logger
@ -91,19 +99,76 @@ func (a *actionWaitForMemberUp) checkProgressSingle(ctx context.Context) (bool,
return true, nil
}
type agentStatus struct {
IsLeader bool
LeaderEndpoint string
IsResponding bool
}
// checkProgressAgent checks the progress of the action in the case
// of an agent.
func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, error) {
log := a.log
c, err := a.actionCtx.GetDatabaseClient(ctx)
clients, err := a.actionCtx.GetAgencyClients(ctx)
if err != nil {
log.Debug().Err(err).Msg("Failed to create database client")
log.Debug().Err(err).Msg("Failed to create agency clients")
return false, maskAny(err)
}
if _, err := c.Version(ctx); err != nil {
log.Debug().Err(err).Msg("Failed to get version")
return false, maskAny(err)
wg := sync.WaitGroup{}
invalidKey := []string{"does-not-exists-149e97e8-4b81-5664-a8a8-9ba93881d64c"}
statuses := make([]agentStatus, len(clients))
for i, c := range clients {
wg.Add(1)
go func(i int, c arangod.Agency) {
defer wg.Done()
var trash interface{}
lctx, cancel := context.WithTimeout(ctx, maxAgentResponseTime)
defer cancel()
if err := c.ReadKey(lctx, invalidKey, &trash); err == nil || arangod.IsKeyNotFound(err) {
// We got a valid read from the leader
statuses[i].IsLeader = true
statuses[i].LeaderEndpoint = c.Endpoint()
statuses[i].IsResponding = true
} else {
if location, ok := arangod.IsNotLeader(err); ok {
// Valid response from a follower
statuses[i].IsLeader = false
statuses[i].LeaderEndpoint = location
statuses[i].IsResponding = true
} else {
// Unexpected / invalid response
statuses[i].IsResponding = false
}
}
}(i, c)
}
wg.Wait()
// Check the results
noLeaders := 0
for i, status := range statuses {
if !status.IsResponding {
log.Debug().Msg("Not all agents are responding")
return false, nil
}
if status.IsLeader {
noLeaders++
}
if i > 0 {
// Compare leader endpoint with previous
prev := statuses[i-1].LeaderEndpoint
if !arangod.IsSameEndpoint(prev, status.LeaderEndpoint) {
log.Debug().Msg("Not all agents report the same leader endpoint")
return false, nil
}
}
}
if noLeaders != 1 {
log.Debug().Int("leaders", noLeaders).Msg("Unexpected number of agency leaders")
return false, nil
}
return true, nil
}

146
pkg/util/arangod/agency.go Normal file
View file

@ -0,0 +1,146 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package arangod
import (
"context"
"encoding/json"
"fmt"
"strings"
driver "github.com/arangodb/go-driver"
"github.com/pkg/errors"
)
// Agency provides API implemented by the ArangoDB agency.
type Agency interface {
// ReadKey reads the value of a given key in the agency.
ReadKey(ctx context.Context, key []string, value interface{}) error
// Endpoint returns the endpoint of this agent connection
Endpoint() string
}
// NewAgencyClient creates a new Agency connection from the given client
// connection.
// The number of endpoints of the client must be exactly 1.
func NewAgencyClient(c driver.Client) (Agency, error) {
if len(c.Connection().Endpoints()) > 1 {
return nil, maskAny(fmt.Errorf("Got multiple endpoints"))
}
return &agency{
conn: c.Connection(),
}, nil
}
type agency struct {
conn driver.Connection
}
// ReadKey reads the value of a given key in the agency.
func (a *agency) ReadKey(ctx context.Context, key []string, value interface{}) error {
conn := a.conn
req, err := conn.NewRequest("POST", "_api/agency/read")
if err != nil {
return maskAny(err)
}
fullKey := createFullKey(key)
input := [][]string{{fullKey}}
req, err = req.SetBody(input)
if err != nil {
return maskAny(err)
}
//var raw []byte
//ctx = driver.WithRawResponse(ctx, &raw)
resp, err := conn.Do(ctx, req)
if err != nil {
return maskAny(err)
}
if resp.StatusCode() == 307 {
// Not leader
location := resp.Header("Location")
return NotLeaderError{Leader: location}
}
if err := resp.CheckStatus(200, 201, 202); err != nil {
return maskAny(err)
}
//fmt.Printf("Agent response: %s\n", string(raw))
elems, err := resp.ParseArrayBody()
if err != nil {
return maskAny(err)
}
if len(elems) != 1 {
return maskAny(fmt.Errorf("Expected 1 element, got %d", len(elems)))
}
// If empty key parse directly
if len(key) == 0 {
if err := elems[0].ParseBody("", &value); err != nil {
return maskAny(err)
}
} else {
// Now remove all wrapping objects for each key element
var rawObject map[string]interface{}
if err := elems[0].ParseBody("", &rawObject); err != nil {
return maskAny(err)
}
var rawMsg interface{}
for keyIndex := 0; keyIndex < len(key); keyIndex++ {
if keyIndex > 0 {
var ok bool
rawObject, ok = rawMsg.(map[string]interface{})
if !ok {
return maskAny(fmt.Errorf("Data is not an object at key %s", key[:keyIndex+1]))
}
}
var found bool
rawMsg, found = rawObject[key[keyIndex]]
if !found {
return errors.Wrapf(KeyNotFoundError, "Missing data at key %s", key[:keyIndex+1])
}
}
// Encode to json ...
encoded, err := json.Marshal(rawMsg)
if err != nil {
return maskAny(err)
}
// and decode back into result
if err := json.Unmarshal(encoded, &value); err != nil {
return maskAny(err)
}
}
// fmt.Printf("result as JSON: %s\n", rawResult)
return nil
}
// Endpoint returns the endpoint of this agent connection
func (a *agency) Endpoint() string {
ep := a.conn.Endpoints()
if len(ep) == 0 {
return ""
}
return ep[0]
}
func createFullKey(key []string) string {
return "/" + strings.Join(key, "/")
}

View file

@ -0,0 +1,42 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package arangod
import "net/url"
// IsSameEndpoint returns true when the 2 given endpoints
// refer to the same server.
func IsSameEndpoint(a, b string) bool {
if a == b {
return true
}
ua, err := url.Parse(a)
if err != nil {
return false
}
ub, err := url.Parse(b)
if err != nil {
return false
}
return ua.Hostname() == ub.Hostname()
}

View file

@ -25,5 +25,32 @@ package arangod
import "github.com/pkg/errors"
var (
KeyNotFoundError = errors.New("Key not found")
maskAny = errors.WithStack
)
// IsKeyNotFound returns true if the given error is (or is caused by) a KeyNotFoundError.
func IsKeyNotFound(err error) bool {
return errors.Cause(err) == KeyNotFoundError
}
// NotLeaderError indicates the response of an agent when it is
// not the leader of the agency.
type NotLeaderError struct {
Leader string // Endpoint of the current leader
}
// Error implements error.
func (e NotLeaderError) Error() string {
return "not the leader"
}
// IsNotLeader returns true if the given error is (or is caused by) a NotLeaderError.
func IsNotLeader(err error) (string, bool) {
nlErr, ok := err.(NotLeaderError)
if ok {
return nlErr.Leader, true
}
return "", false
}