
[Feature] Unify agency access (#1024)

Adam Janikowski 2022-06-20 16:43:23 +02:00 committed by GitHub
parent 8891432b72
commit a9d7849169
31 changed files with 7497 additions and 455 deletions


@ -20,6 +20,7 @@
- (Feature) Add `ArangoLocalStorage` CRD auto-installer
- (Feature) Add `ArangoDeploymentReplication` CRD auto-installer
- (Bugfix) Allow missing `token` key in License secret
- (Feature) Unify agency access
## [1.2.13](https://github.com/arangodb/kube-arangodb/tree/1.2.13) (2022-06-07)
- (Bugfix) Fix arangosync members state inspection


@ -22,57 +22,85 @@ package agency
import (
"context"
"fmt"
"sync"
"time"
"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
)
type health struct {
leaderID string
agencySize int
names []string
commitIndexes map[string]uint64
leaders map[string]string
election map[string]int
}
func (h health) LeaderID() string {
return h.leaderID
}
// IsHealthy returns true if all agencies have the same commit index.
// Returns false when:
// - agencies' list is empty.
// - agencies have different commit indices.
// - agencies have commit indices == 0.
func (h health) IsHealthy() bool {
var globalCommitIndex uint64
first := true
// Healthy returns nil if the agency is serving, all agents voted for the leader, and all agents report the same, non-zero commit index.
func (h health) Healthy() error {
if err := h.Serving(); err != nil {
return err
}
for _, commitIndex := range h.commitIndexes {
if first {
globalCommitIndex = commitIndex
first = false
} else if commitIndex != globalCommitIndex {
return false
if h.election[h.leaderID] != h.agencySize {
return errors.Newf("Not all agents are in quorum")
}
index := h.commitIndexes[h.leaderID]
if index == 0 {
return errors.Newf("Agency CommitIndex is zero")
}
for k, v := range h.commitIndexes {
if v != index {
return errors.Newf("Agent %s is behind in CommitIndex", k)
}
}
return globalCommitIndex != 0
return nil
}
func (h health) Serving() error {
if h.agencySize == 0 {
return errors.Newf("Empty agents list")
}
if len(h.election) == 0 {
return errors.Newf("No Leader")
} else if len(h.election) > 1 {
return errors.Newf("Multiple leaders")
}
if len(h.leaders) <= h.agencySize/2 {
return errors.Newf("Quorum is not present")
}
return nil
}
// Health describes an interface for checking the health of the environment.
type Health interface {
// IsHealthy return true when environment is considered as healthy.
IsHealthy() bool
// Healthy returns nil when the environment is considered healthy.
Healthy() error
// Serving returns nil when the environment is considered responsive, even if not fully healthy.
Serving() error
// LeaderID returns a leader ID or empty string if a leader is not known.
LeaderID() string
}
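
For orientation, a minimal sketch (illustrative only, assuming a caller that already obtained a Health value, e.g. via GetAgencyHealth) of how the split between Serving and Healthy can be consumed: Serving only requires a responsive agency with an elected leader, while Healthy additionally requires every agent to report the same leader and commit index.

// Illustrative sketch only: consuming the Health interface defined above.
func describeAgency(h Health) string {
    if err := h.Serving(); err != nil {
        return fmt.Sprintf("agency not serving: %v", err)
    }
    if err := h.Healthy(); err != nil {
        return fmt.Sprintf("agency serving (leader %s) but not healthy: %v", h.LeaderID(), err)
    }
    return fmt.Sprintf("agency healthy, leader %s", h.LeaderID())
}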
type Cache interface {
Reload(ctx context.Context, clients []agency.Agency) (uint64, error)
Reload(ctx context.Context, size int, clients []agency.Agency) (uint64, error)
Data() (State, bool)
CommitIndex() uint64
// Health returns the Health object and true when it is available.
@ -107,7 +135,7 @@ func (c cacheSingle) Health() (Health, bool) {
return nil, false
}
func (c cacheSingle) Reload(_ context.Context, _ []agency.Agency) (uint64, error) {
func (c cacheSingle) Reload(_ context.Context, _ int, _ []agency.Agency) (uint64, error) {
return 0, nil
}
@ -153,15 +181,16 @@ func (c *cache) Health() (Health, bool) {
return nil, false
}
func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, error) {
func (c *cache) Reload(ctx context.Context, size int, clients []agency.Agency) (uint64, error) {
c.lock.Lock()
defer c.lock.Unlock()
leaderCli, leaderConfig, health, err := getLeader(ctx, clients)
leaderCli, leaderConfig, health, err := getLeader(ctx, size, clients)
if err != nil {
// Invalidate a leader ID and agency state.
// In the next iteration leaderID will be set because `valid` will be false.
c.valid = false
c.health = nil
return 0, err
}
@ -186,91 +215,62 @@ func (c *cache) Reload(ctx context.Context, clients []agency.Agency) (uint64, er
// getLeader returns the config and client of the leader agency, together with health information used to check whether all agents are in sync.
// If there is no quorum for the leader, an error is returned.
func getLeader(ctx context.Context, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
var mutex sync.Mutex
var anyError error
func getLeader(ctx context.Context, size int, clients []agency.Agency) (agency.Agency, *Config, Health, error) {
configs := make([]*Config, len(clients))
errs := make([]error, len(clients))
var wg sync.WaitGroup
cliLen := len(clients)
if cliLen == 0 {
return nil, nil, nil, errors.New("empty list of agencies' clients")
}
configs := make([]*Config, cliLen)
leaders := make(map[string]int, cliLen)
var h health
h.commitIndexes = make(map[string]uint64, cliLen)
// Fetch all configs from agencies.
wg.Add(cliLen)
for i, cli := range clients {
go func(iLocal int, cliLocal agency.Agency) {
// Fetch Agency config
for i := range clients {
wg.Add(1)
go func(id int) {
defer wg.Done()
ctxLocal, cancel := context.WithTimeout(ctx, time.Second)
ctxLocal, cancel := globals.GetGlobals().Timeouts().Agency().WithTimeout(ctx)
defer cancel()
config, err := GetAgencyConfig(ctxLocal, cliLocal)
mutex.Lock()
defer mutex.Unlock()
config, err := GetAgencyConfig(ctxLocal, clients[id])
if err != nil {
anyError = err
return
} else if config == nil || config.LeaderId == "" {
anyError = fmt.Errorf("leader unknown for the agent %v", cliLocal.Connection().Endpoints())
errs[id] = err
return
}
// Write config on the same index where client is (It will be helpful later).
configs[iLocal] = config
// Count leaders.
leaders[config.LeaderId]++
h.commitIndexes[config.Configuration.ID] = config.CommitIndex
}(i, cli)
configs[id] = config
}(i)
}
wg.Wait()
if anyError != nil {
return nil, nil, nil, wrapError(anyError, "not all agencies are responsive")
}
var h health
h.agencySize = size
h.names = make([]string, len(clients))
h.commitIndexes = make(map[string]uint64, len(clients))
h.leaders = make(map[string]string, len(clients))
h.election = make(map[string]int, len(clients))
if len(leaders) == 0 {
return nil, nil, nil, wrapError(anyError, "failed to get config from agencies")
}
// Find the leader ID which has the most votes from all agencies.
maxVotes := 0
var leaderID string
for id, votes := range leaders {
if votes > maxVotes {
maxVotes = votes
leaderID = id
for id := range configs {
if config := configs[id]; config != nil {
name := config.Configuration.ID
h.names[id] = name
h.commitIndexes[name] = config.CommitIndex
if config.LeaderId != "" {
h.leaders[name] = config.LeaderId
h.election[config.LeaderId]++
h.leaderID = config.LeaderId
}
}
}
h.leaderID = leaderID
// Check if a leader has quorum from all possible agencies.
if maxVotes <= cliLen/2 {
message := fmt.Sprintf("no quorum for leader %s, votes %d of %d", leaderID, maxVotes, cliLen)
return nil, nil, nil, wrapError(anyError, message)
if err := h.Serving(); err != nil {
return nil, nil, nil, err
}
// From here on, a leader with quorum is known.
for i, config := range configs {
if config != nil && config.Configuration.ID == leaderID {
return clients[i], config, h, nil
for id := range clients {
if h.leaderID == h.names[id] {
return clients[id], configs[id], h, nil
}
}
return nil, nil, nil, wrapError(anyError, "the leader is not responsive")
}
func wrapError(err error, message string) error {
if err != nil {
return errors.WithMessage(err, message)
}
return errors.New(message)
return nil, nil, nil, errors.Newf("Unable to find agent")
}
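
As a worked example of the quorum rules above, a hypothetical test in the same package (not part of the commit; it uses testing and stretchr/testify/require, as the package's other tests do): with an agency size of 3 and only two agents responding, both naming the same leader, Serving passes because 2 > 3/2, but Healthy fails because the leader does not have votes from all three agents.

func TestHealthQuorumSketch(t *testing.T) {
    h := health{
        leaderID:      "A",
        agencySize:    3,
        commitIndexes: map[string]uint64{"A": 10, "B": 10},
        leaders:       map[string]string{"A": "A", "B": "A"},
        election:      map[string]int{"A": 2},
    }
    require.NoError(t, h.Serving()) // two of three agents agree on a leader: quorum present
    require.Error(t, h.Healthy())   // but election["A"] != agencySize, so not fully healthy
}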


@ -27,5 +27,5 @@ type StateCurrentDBCollections map[string]StateCurrentDBCollection
type StateCurrentDBCollection map[string]StateCurrentDBShard
type StateCurrentDBShard struct {
Servers ShardServers `json:"servers,omitempty"`
Servers Servers `json:"servers,omitempty"`
}


@ -38,6 +38,13 @@ const (
SupervisionKey = "Supervision"
SupervisionMaintenanceKey = "Maintenance"
TargetJobToDoKey = "ToDo"
TargetJobPendingKey = "Pending"
TargetJobFailedKey = "Failed"
TargetJobFinishedKey = "Finished"
TargetCleanedServersKey = "CleanedServers"
)
func GetAgencyKey(parts ...string) string {


@ -0,0 +1,108 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
import (
"fmt"
"math/rand"
"testing"
)
func NewJobsGenerator() JobsGeneratorInterface {
return &jobsGenerator{
jobs: map[JobPhase]map[JobID]Job{},
}
}
type jobsGenerator struct {
id int
jobs map[JobPhase]map[JobID]Job
}
func (j *jobsGenerator) Jobs(phase JobPhase, jobs int, jobTypes ...string) JobsGeneratorInterface {
if len(jobTypes) == 0 {
jobTypes = []string{"moveShard"}
}
z := j.jobs[phase]
if z == nil {
z = map[JobID]Job{}
}
for i := 0; i < jobs; i++ {
q := j.id
j.id++
id := fmt.Sprintf("s%07d", q)
z[JobID(id)] = Job{
Type: jobTypes[rand.Intn(len(jobTypes))],
}
}
j.jobs[phase] = z
return j
}
func (j *jobsGenerator) Add() StateGenerator {
return func(t *testing.T, s *State) {
if m := j.jobs[JobPhaseToDo]; len(m) > 0 {
if s.Target.JobToDo == nil {
s.Target.JobToDo = map[JobID]Job{}
}
for k, v := range m {
s.Target.JobToDo[k] = v
}
}
if m := j.jobs[JobPhasePending]; len(m) > 0 {
if s.Target.JobPending == nil {
s.Target.JobPending = map[JobID]Job{}
}
for k, v := range m {
s.Target.JobPending[k] = v
}
}
if m := j.jobs[JobPhaseFailed]; len(m) > 0 {
if s.Target.JobFailed == nil {
s.Target.JobFailed = map[JobID]Job{}
}
for k, v := range m {
s.Target.JobFailed[k] = v
}
}
if m := j.jobs[JobPhaseFinished]; len(m) > 0 {
if s.Target.JobFinished == nil {
s.Target.JobFinished = map[JobID]Job{}
}
for k, v := range m {
s.Target.JobFinished[k] = v
}
}
}
}
type JobsGeneratorInterface interface {
Jobs(phase JobPhase, jobs int, jobTypes ...string) JobsGeneratorInterface
Add() StateGenerator
}
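
A minimal usage sketch (same package, inside a test, not part of the commit; GenerateState appears in the generator tests below): build a State with jobs spread over phases and then query it through the Target accessors.

// Illustrative sketch only: generating a test State with jobs in two phases.
func buildJobState(t *testing.T) State {
    gen := NewJobsGenerator().
        Jobs(JobPhaseToDo, 4). // job type defaults to "moveShard"
        Jobs(JobPhaseFailed, 2).
        Add()
    return GenerateState(t, gen)
}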


@ -21,24 +21,24 @@
package agency
type ShardGeneratorInterface interface {
WithPlan(servers ...string) ShardGeneratorInterface
WithCurrent(servers ...string) ShardGeneratorInterface
WithPlan(servers ...Server) ShardGeneratorInterface
WithCurrent(servers ...Server) ShardGeneratorInterface
Add() CollectionGeneratorInterface
}
type shardGenerator struct {
col collectionGenerator
plan []string
current []string
plan Servers
current Servers
}
func (s shardGenerator) WithPlan(servers ...string) ShardGeneratorInterface {
func (s shardGenerator) WithPlan(servers ...Server) ShardGeneratorInterface {
s.plan = servers
return s
}
func (s shardGenerator) WithCurrent(servers ...string) ShardGeneratorInterface {
func (s shardGenerator) WithCurrent(servers ...Server) ShardGeneratorInterface {
s.current = servers
return s
}


@ -0,0 +1,40 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
type JobPhase string
var (
JobPhaseUnknown JobPhase = ""
JobPhaseToDo JobPhase = "ToDo"
JobPhasePending JobPhase = "Pending"
JobPhaseFailed JobPhase = "Failed"
JobPhaseFinished JobPhase = "Finished"
)
type JobID string
type Jobs map[JobID]Job
type Job struct {
Type string `json:"type,omitempty"`
Reason string `json:"reason,omitempty"`
}
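
The phase names deliberately mirror the Target keys read by loadState further down (ToDo, Pending, Failed, Finished). A hypothetical helper (same package, not part of the commit) that makes the correspondence explicit:

// Hypothetical helper: map a JobPhase to the agency Target key it is stored under,
// matching the TargetJob*Key constants added in this commit.
func (p JobPhase) targetKey() string {
    switch p {
    case JobPhaseToDo:
        return TargetJobToDoKey
    case JobPhasePending:
        return TargetJobPendingKey
    case JobPhaseFailed:
        return TargetJobFailedKey
    case JobPhaseFinished:
        return TargetJobFinishedKey
    default:
        return ""
    }
}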


@ -0,0 +1,80 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
import (
"fmt"
"math/rand"
"testing"
"github.com/stretchr/testify/require"
)
func caseJobPerformance(t *testing.T, jobs int) {
j := NewJobsGenerator()
currentJobs := jobs
for _, p := range []JobPhase{
JobPhaseToDo,
JobPhasePending,
JobPhaseFinished,
} {
z := rand.Intn(currentJobs + 1)
j = j.Jobs(p, z)
currentJobs -= z
}
j = j.Jobs(JobPhaseFailed, currentJobs)
gen := j.Add()
t.Run(fmt.Sprintf("Jobs %d", jobs), func(t *testing.T) {
var s State
var jids []JobID
runWithMeasure(t, "Generate", func(t *testing.T) {
s = GenerateState(t, gen)
})
runWithMeasure(t, "Count", func(t *testing.T) {
jids = s.Target.GetJobIDs()
i := len(jids)
t.Logf("Count %d", i)
require.Equal(t, jobs, i)
})
runCountWithMeasure(t, 16, "Lookup", func(t *testing.T) {
id := jids[rand.Intn(len(jids))]
_, z := s.Target.GetJob(id)
require.NotEqual(t, JobPhaseUnknown, z)
})
})
}
func TestJobPerformance(t *testing.T) {
caseJobPerformance(t, 16)
caseJobPerformance(t, 256)
caseJobPerformance(t, 1024)
caseJobPerformance(t, 2048)
caseJobPerformance(t, 2048*16)
}


@ -22,7 +22,7 @@ package agency
type StatePlanCollections map[string]StatePlanDBCollections
func (a StatePlanCollections) IsDBServerInDatabases(name string) bool {
func (a StatePlanCollections) IsDBServerInDatabases(name Server) bool {
for _, collections := range a {
if collections.IsDBServerInCollections(name) {
return true
@ -33,7 +33,7 @@ func (a StatePlanCollections) IsDBServerInDatabases(name string) bool {
type StatePlanDBCollections map[string]StatePlanCollection
func (a StatePlanDBCollections) IsDBServerInCollections(name string) bool {
func (a StatePlanDBCollections) IsDBServerInCollections(name Server) bool {
for _, collection := range a {
if collection.IsDBServerInShards(name) {
return true
@ -108,7 +108,7 @@ func (a StatePlanCollection) GetName(d string) string {
return *a.Name
}
func (a *StatePlanCollection) IsDBServerInShards(name string) bool {
func (a *StatePlanCollection) IsDBServerInShards(name Server) bool {
if a == nil {
return false
}


@ -0,0 +1,47 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package agency
type Server string
type Servers []Server
func (s Servers) Contains(id Server) bool {
for _, q := range s {
if q == id {
return true
}
}
return false
}
func (s Servers) Join(ids Servers) Servers {
r := make(Servers, 0, len(s))
for _, id := range ids {
if s.Contains(id) {
r = append(r, id)
}
}
return r
}
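
A minimal sketch (same package, hypothetical helper) of the intent behind Contains and Join: Join intersects the current and planned server lists, and Contains then tells whether a specific DB server is in sync, which is how the shard filters further down use these methods.

// Illustrative sketch only: the plan/current intersection used by
// FilterDBServerShardRestart and FilterDBServerShardsNotInSync.
func serverInSync(planShard, currentShard Servers, id Server) bool {
    return currentShard.Join(planShard).Contains(id)
}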


@ -20,28 +20,4 @@
package agency
type Shards map[string]ShardServers
type ShardServers []string
func (s ShardServers) Contains(server string) bool {
for _, q := range s {
if server == q {
return true
}
}
return false
}
func (s ShardServers) FilterBy(b ShardServers) ShardServers {
q := make(ShardServers, 0, len(s))
for _, i := range s {
if b.Contains(i) {
q = append(q, i)
}
}
return q
}
type Shards map[string]Servers


@ -45,6 +45,11 @@ func loadState(ctx context.Context, client agency.Agency) (State, error) {
GetAgencyKey(ArangoKey, PlanKey, PlanCollectionsKey),
GetAgencyKey(ArangoKey, CurrentKey, PlanCollectionsKey),
GetAgencyKey(ArangoKey, TargetKey, TargetHotBackupKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobToDoKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobPendingKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobFailedKey),
GetAgencyKey(ArangoKey, TargetKey, TargetJobFinishedKey),
GetAgencyKey(ArangoKey, TargetKey, TargetCleanedServersKey),
}
req, err = req.SetBody(GetAgencyReadRequest(GetAgencyReadKey(readKeys...)))
@ -123,8 +128,8 @@ func (s State) CountShards() int {
return count
}
func (s State) PlanServers() []string {
q := map[string]bool{}
func (s State) PlanServers() Servers {
q := map[Server]bool{}
for _, db := range s.Plan.Collections {
for _, col := range db {
@ -136,7 +141,7 @@ func (s State) PlanServers() []string {
}
}
r := make([]string, 0, len(q))
r := make([]Server, 0, len(q))
for k := range q {
r = append(r, k)
@ -187,11 +192,11 @@ func (s State) Filter(f StateShardFilter) CollectionShardDetails {
return shards[0:size]
}
func GetDBServerBlockingRestartShards(s State, serverID string) CollectionShardDetails {
func GetDBServerBlockingRestartShards(s State, serverID Server) CollectionShardDetails {
return s.Filter(FilterDBServerShardRestart(serverID))
}
func FilterDBServerShardRestart(serverID string) StateShardFilter {
func FilterDBServerShardRestart(serverID Server) StateShardFilter {
return NegateFilter(func(s State, db, col, shard string) bool {
// Filter all shards which are not blocking restart of server
plan := s.Plan.Collections[db][col]
@ -203,7 +208,7 @@ func FilterDBServerShardRestart(serverID string) StateShardFilter {
}
current := s.Current.Collections[db][col][shard]
currentShard := current.Servers.FilterBy(planShard)
currentShard := current.Servers.Join(planShard)
serverInSync := currentShard.Contains(serverID)
@ -246,11 +251,11 @@ func FilterDBServerShardRestart(serverID string) StateShardFilter {
})
}
func GetDBServerShardsNotInSync(s State, serverID string) CollectionShardDetails {
func GetDBServerShardsNotInSync(s State, serverID Server) CollectionShardDetails {
return s.Filter(FilterDBServerShardsNotInSync(serverID))
}
func FilterDBServerShardsNotInSync(serverID string) StateShardFilter {
func FilterDBServerShardsNotInSync(serverID Server) StateShardFilter {
return NegateFilter(func(s State, db, col, shard string) bool {
planShard := s.Plan.Collections[db][col].Shards[shard]


@ -29,6 +29,20 @@ import (
"github.com/stretchr/testify/require"
)
func runCountWithMeasure(t *testing.T, c int, name string, f func(t *testing.T)) {
t.Run(name, func(t *testing.T) {
n := time.Now()
defer func() {
s := time.Since(n)
t.Logf("Elapsed: %s - %s per item", s.String(), s/time.Duration(c))
}()
for i := 0; i < c; i++ {
runWithMeasure(t, fmt.Sprintf("R%03d", i), f)
}
})
}
func runWithMeasure(t *testing.T, name string, f func(t *testing.T)) {
t.Run(name, func(t *testing.T) {
n := time.Now()
@ -63,7 +77,7 @@ func perfWithSize(t *testing.T, dbs, collections, shards, rf, servers int) {
for id := 0; id < servers; id++ {
name := fmt.Sprintf("server-%d", id)
runWithMeasure(t, name, func(t *testing.T) {
require.Len(t, GetDBServerBlockingRestartShards(s, name), 0)
require.Len(t, GetDBServerBlockingRestartShards(s, Server(name)), 0)
})
}
})
@ -72,7 +86,7 @@ func perfWithSize(t *testing.T, dbs, collections, shards, rf, servers int) {
for id := 0; id < servers; id++ {
name := fmt.Sprintf("server-%d", id)
runWithMeasure(t, name, func(t *testing.T) {
require.Len(t, GetDBServerShardsNotInSync(s, name), 0)
require.Len(t, GetDBServerShardsNotInSync(s, Server(name)), 0)
})
}
})
@ -127,7 +141,7 @@ func generateShards(t *testing.T, col CollectionGeneratorInterface, shards, rf,
return c
}
func getServersSublist(t *testing.T, rf, servers int) ShardServers {
func getServersSublist(t *testing.T, rf, servers int) Servers {
require.NotEqual(t, 0, rf)
if rf > servers {
require.Fail(t, "Server count is smaller than rf")
@ -136,11 +150,11 @@ func getServersSublist(t *testing.T, rf, servers int) ShardServers {
return generateServersSublist(servers)[0:rf]
}
func generateServersSublist(servers int) ShardServers {
s := make(ShardServers, servers)
func generateServersSublist(servers int) Servers {
s := make(Servers, servers)
for id := range s {
s[id] = fmt.Sprintf("server-%d", id)
s[id] = Server(fmt.Sprintf("server-%d", id))
}
rand.Shuffle(len(s), func(i, j int) {

File diff suppressed because one or more lines are too long


@ -21,9 +21,61 @@
package agency
type StateTarget struct {
// Jobs Section
JobToDo Jobs `json:"ToDo,omitempty"`
JobPending Jobs `json:"Pending,omitempty"`
JobFailed Jobs `json:"Failed,omitempty"`
JobFinished Jobs `json:"Finished,omitempty"`
// Servers Section
CleanedServers Servers `json:"CleanedServers,omitempty"`
// HotBackup section
HotBackup StateTargetHotBackup `json:"HotBackup,omitempty"`
}
func (s StateTarget) GetJob(id JobID) (Job, JobPhase) {
if v, ok := s.JobToDo[id]; ok {
return v, JobPhaseToDo
}
if v, ok := s.JobPending[id]; ok {
return v, JobPhasePending
}
if v, ok := s.JobFailed[id]; ok {
return v, JobPhaseFailed
}
if v, ok := s.JobFinished[id]; ok {
return v, JobPhaseFinished
}
return Job{}, JobPhaseUnknown
}
func (s StateTarget) GetJobIDs() []JobID {
r := make([]JobID, 0, len(s.JobToDo)+len(s.JobPending)+len(s.JobFinished)+len(s.JobFailed))
for k := range s.JobToDo {
r = append(r, k)
}
for k := range s.JobPending {
r = append(r, k)
}
for k := range s.JobFinished {
r = append(r, k)
}
for k := range s.JobFailed {
r = append(r, k)
}
return r
}
type StateTargetHotBackup struct {
Create StateTimestamp `json:"Create,omitempty"`
}
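
For illustration, a hypothetical helper (same package, not part of the commit) showing the lookup pattern the reconciliation actions below use for a member's CleanoutJobID:

// Hypothetical helper: resolve a job by ID and report whether it is done,
// mirroring how the actions below handle CleanoutJobID.
func isJobDone(s State, id string) (done bool, failReason string) {
    details, phase := s.Target.GetJob(JobID(id))
    switch phase {
    case JobPhaseFinished:
        return true, ""
    case JobPhaseFailed:
        return true, details.Reason
    default: // JobPhaseToDo, JobPhasePending or JobPhaseUnknown
        return false, ""
    }
}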

File diff suppressed because it is too large


@ -163,20 +163,26 @@ func (d *Deployment) RefreshAgencyCache(ctx context.Context) (uint64, error) {
return 0, nil
}
lCtx, c := globals.GetGlobalTimeouts().Agency().WithTimeout(ctx)
defer c()
if info := d.apiObject.Status.Agency; info != nil {
if size := info.Size; size != nil {
lCtx, c := globals.GetGlobalTimeouts().Agency().WithTimeout(ctx)
defer c()
var clients []agencydriver.Agency
for _, m := range d.GetStatusSnapshot().Members.Agents {
a, err := d.GetAgency(lCtx, m.ID)
if err != nil {
return 0, err
var clients []agencydriver.Agency
for _, m := range d.GetStatusSnapshot().Members.Agents {
a, err := d.GetAgency(lCtx, m.ID)
if err != nil {
return 0, err
}
clients = append(clients, a)
}
return d.agencyCache.Reload(lCtx, int(*size), clients)
}
clients = append(clients, a)
}
return d.agencyCache.Reload(lCtx, clients)
return 0, errors.Newf("Agency not yet established")
}
func (d *Deployment) SetAgencyMaintenanceMode(ctx context.Context, enabled bool) error {


@ -120,7 +120,7 @@ func (i *inventory) Collect(m chan<- prometheus.Metric) {
db,
name,
shard,
server,
string(server),
}
if id == 0 {


@ -29,7 +29,7 @@ import (
driver "github.com/arangodb/go-driver"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
)
func init() {
@ -118,58 +118,19 @@ func (a *actionCleanoutMember) CheckProgress(ctx context.Context) (bool, bool, e
return true, false, nil
}
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
c, err := a.actionCtx.GetDatabaseClient(ctxChild)
if err != nil {
a.log.Err(err).Debug("Failed to create database client")
cache, ok := a.actionCtx.GetAgencyCache()
if !ok {
a.log.Debug("AgencyCache is not ready")
return false, false, nil
}
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
cluster, err := c.Cluster(ctxChild)
if err != nil {
a.log.Err(err).Debug("Failed to access cluster")
return false, false, nil
}
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
cleanedOut, err := cluster.IsCleanedOut(ctxChild, a.action.MemberID)
if err != nil {
a.log.Err(err).Debug("IsCleanedOut failed")
return false, false, nil
}
if !cleanedOut {
if !cache.Target.CleanedServers.Contains(agency.Server(a.action.MemberID)) {
// We're not done yet, check job status
a.log.Debug("IsCleanedOut returned false")
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
c, err := a.actionCtx.GetDatabaseClient(ctxChild)
if err != nil {
a.log.Err(err).Debug("Failed to create database client")
return false, false, nil
}
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
agency, err := a.actionCtx.GetAgency(ctxChild)
if err != nil {
a.log.Err(err).Debug("Failed to create agency client")
return false, false, nil
}
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
jobStatus, err := arangod.CleanoutServerJobStatus(ctxChild, m.CleanoutJobID, c, agency)
if err != nil {
a.log.Err(err).Debug("Failed to fetch cleanout job status")
return false, false, nil
}
if jobStatus.IsFailed() {
a.log.Str("reason", jobStatus.Reason()).Warn("Cleanout Job failed. Aborting plan")
details, jobStatus := cache.Target.GetJob(agency.JobID(m.CleanoutJobID))
if jobStatus == agency.JobPhaseFailed {
a.log.Str("reason", details.Reason).Warn("Cleanout Job failed. Aborting plan")
// Revert cleanout state
m.Phase = api.MemberPhaseCreated
m.CleanoutJobID = ""


@ -25,7 +25,6 @@ import (
"time"
"github.com/arangodb/arangosync-client/client"
"github.com/arangodb/go-driver/agency"
core "k8s.io/api/core/v1"
"github.com/arangodb/go-driver"
@ -162,10 +161,6 @@ func (ac *actionContext) GetNamespace() string {
return ac.context.GetNamespace()
}
func (ac *actionContext) GetAgencyClientsWithPredicate(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error) {
return ac.context.GetAgencyClientsWithPredicate(ctx, predicate)
}
func (ac *actionContext) GetStatus() (api.DeploymentStatus, int32) {
return ac.context.GetStatus()
}
@ -272,24 +267,6 @@ func (ac *actionContext) GetServerClient(ctx context.Context, group api.ServerGr
return c, nil
}
// GetAgencyClients returns a client connection for every agency member.
func (ac *actionContext) GetAgencyClients(ctx context.Context) ([]driver.Connection, error) {
c, err := ac.context.GetAgencyClients(ctx)
if err != nil {
return nil, errors.WithStack(err)
}
return c, nil
}
// GetAgency returns a connection to the agency.
func (ac *actionContext) GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error) {
a, err := ac.context.GetAgency(ctx, agencyIDs...)
if err != nil {
return nil, errors.WithStack(err)
}
return a, nil
}
// GetSyncServerClient returns a cached client for a specific arangosync server.
func (ac *actionContext) GetSyncServerClient(ctx context.Context, group api.ServerGroup, id string) (client.API, error) {
c, err := ac.context.GetSyncServerClient(ctx, group, id)


@ -26,10 +26,10 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/go-driver"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
)
func init() {
@ -122,7 +122,8 @@ func (a *actionResignLeadership) CheckProgress(ctx context.Context) (bool, bool,
return true, false, nil
}
if agencyState, agencyOK := a.actionCtx.GetAgencyCache(); !agencyOK {
agencyState, agencyOK := a.actionCtx.GetAgencyCache()
if !agencyOK {
a.log.Error("Unable to get maintenance mode")
return false, false, nil
} else if agencyState.Supervision.Maintenance.Exists() {
@ -135,50 +136,24 @@ func (a *actionResignLeadership) CheckProgress(ctx context.Context) (bool, bool,
return true, false, nil
}
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
agency, err := a.actionCtx.GetAgency(ctxChild)
if err != nil {
a.log.Err(err).Debug("Failed to create agency client")
return false, false, nil
}
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
c, err := a.actionCtx.GetDatabaseClient(ctxChild)
if err != nil {
a.log.Err(err).Debug("Failed to create member client")
return false, false, nil
}
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
jobStatus, err := arangod.CleanoutServerJobStatus(ctxChild, m.CleanoutJobID, c, agency)
if err != nil {
if driver.IsNotFound(err) {
a.log.Err(err).Debug("Job not found, but proceeding")
return true, false, nil
}
a.log.Err(err).Debug("Failed to fetch job status")
return false, false, errors.WithStack(err)
}
if jobStatus.IsFailed() {
_, jobStatus := agencyState.Target.GetJob(agency.JobID(m.CleanoutJobID))
switch jobStatus {
case agency.JobPhaseFailed:
m.CleanoutJobID = ""
if err := a.actionCtx.UpdateMember(ctx, m); err != nil {
return false, false, errors.WithStack(err)
}
a.log.Error("Resign server job failed")
return true, false, nil
}
if jobStatus.IsFinished() {
case agency.JobPhaseFinished:
m.CleanoutJobID = ""
if err := a.actionCtx.UpdateMember(ctx, m); err != nil {
return false, false, errors.WithStack(err)
}
return true, false, nil
case agency.JobPhaseUnknown:
a.log.Debug("Job not found, but proceeding")
return true, false, nil
}
return false, false, nil
}


@ -100,7 +100,7 @@ func (a *actionWaitForMemberInSync) checkCluster() (bool, error) {
return false, nil
}
notInSyncShards := agency.GetDBServerShardsNotInSync(agencyState, a.MemberID())
notInSyncShards := agency.GetDBServerShardsNotInSync(agencyState, agency.Server(a.MemberID()))
if len(notInSyncShards) > 0 {
a.log.Str("mode", "cluster").Str("member", a.MemberID()).Int("shard", len(notInSyncShards)).Info("DBServer contains not in sync shards")


@ -22,15 +22,12 @@ package reconcile
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
)
@ -85,12 +82,12 @@ func (a *actionWaitForMemberUp) CheckProgress(ctx context.Context) (bool, bool,
return a.checkProgressSingle(ctxChild)
case api.DeploymentModeActiveFailover:
if a.action.Group == api.ServerGroupAgents {
return a.checkProgressAgent(ctxChild)
return a.checkProgressAgent()
}
return a.checkProgressSingleInActiveFailover(ctxChild)
default:
if a.action.Group == api.ServerGroupAgents {
return a.checkProgressAgent(ctxChild)
return a.checkProgressAgent()
}
return a.checkProgressCluster()
}
@ -128,23 +125,13 @@ func (a *actionWaitForMemberUp) checkProgressSingleInActiveFailover(ctx context.
// checkProgressAgent checks the progress of the action in the case
// of an agent.
func (a *actionWaitForMemberUp) checkProgressAgent(ctx context.Context) (bool, bool, error) {
clients, err := a.actionCtx.GetAgencyClients(ctx)
if err != nil {
a.log.Err(err).Debug("Failed to create agency clients")
func (a *actionWaitForMemberUp) checkProgressAgent() (bool, bool, error) {
agencyHealth, ok := a.actionCtx.GetAgencyHealth()
if !ok {
a.log.Debug("Agency health fetch failed")
return false, false, nil
}
for _, a := range clients {
a.Endpoints()
}
shortCtx, c := context.WithTimeout(ctx, 3*time.Second)
defer c()
shortCtx = agency.WithAllowDifferentLeaderEndpoints(shortCtx)
if err := agency.AreAgentsHealthy(shortCtx, clients); err != nil {
if err := agencyHealth.Healthy(); err != nil {
a.log.Err(err).Debug("Not all agents are ready")
return false, false, nil
}


@ -25,6 +25,7 @@ import (
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/actions"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
)
@ -120,7 +121,7 @@ func (r *Reconciler) createMemberFailedRestorePlan(ctx context.Context, apiObjec
continue
}
if agencyState.Plan.Collections.IsDBServerInDatabases(m.ID) {
if agencyState.Plan.Collections.IsDBServerInDatabases(agency.Server(m.ID)) {
// DBServer still exists in agency plan! Will not be removed, but needs to be recreated
memberLog.Info("Recreating DBServer - it cannot be removed gracefully")
plan = append(plan,


@ -472,7 +472,7 @@ func groupReadyForRestart(context PlanBuilderContext, status api.DeploymentStatu
return false, "Unable to get agency cache"
}
blockingRestartShards := agency.GetDBServerBlockingRestartShards(agencyState, member.ID)
blockingRestartShards := agency.GetDBServerBlockingRestartShards(agencyState, agency.Server(member.ID))
if s := len(blockingRestartShards); s > 0 {
return false, fmt.Sprintf("There are %d shards which are blocking restart", s)


@ -27,7 +27,6 @@ import (
"github.com/arangodb/arangosync-client/client"
"github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/deployment/acs/sutil"
agencyCache "github.com/arangodb/kube-arangodb/pkg/deployment/agency"
@ -124,15 +123,6 @@ type ArangoApplier interface {
ApplyPatch(ctx context.Context, p ...patch.Item) error
}
type DeploymentAgencyClient interface {
// GetAgencyClients returns a client connection for every agency member.
GetAgencyClients(ctx context.Context) ([]driver.Connection, error)
// GetAgencyClientsWithPredicate returns a client connection for every agency member which match condition.
GetAgencyClientsWithPredicate(ctx context.Context, predicate func(id string) bool) ([]driver.Connection, error)
// GetAgency returns a connection to the entire agency.
GetAgency(ctx context.Context, agencyIDs ...string) (agency.Agency, error)
}
type DeploymentDatabaseClient interface {
// GetDatabaseClient returns a cached client for the entire database (cluster coordinators or single server),
// creating one if needed.
@ -160,7 +150,6 @@ type KubernetesEventGenerator interface {
}
type DeploymentClient interface {
DeploymentAgencyClient
DeploymentDatabaseClient
DeploymentMemberClient
DeploymentSyncClient


@ -30,7 +30,8 @@ import (
// Context provides methods to the resilience package.
type Context interface {
reconciler.DeploymentDatabaseClient
reconciler.DeploymentAgencyClient
reconciler.ArangoAgency
// GetSpec returns the current specification of the deployment
GetSpec() api.DeploymentSpec
// GetStatus returns the current status of the deployment


@ -28,7 +28,6 @@ import (
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
)
@ -129,16 +128,15 @@ func (r *Resilience) isMemberFailureAcceptable(ctx context.Context, group api.Se
switch group {
case api.ServerGroupAgents:
// All good when the remaining agents are healthy
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
clients, err := r.context.GetAgencyClientsWithPredicate(ctxChild, func(id string) bool { return id != m.ID })
if err != nil {
return false, "", errors.WithStack(err)
agencyHealth, ok := r.context.GetAgencyHealth()
if !ok {
return false, "AgencyHealth is not present", nil
}
if err := agency.AreAgentsHealthy(ctx, clients); err != nil {
if err := agencyHealth.Healthy(); err != nil {
return false, err.Error(), nil
}
return true, "", nil
case api.ServerGroupDBServers:
ctxChild, cancel := globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)


@ -132,7 +132,7 @@ func (r *Resources) runPodFinalizers(ctx context.Context, p *v1.Pod, memberStatu
// It returns nil if the finalizer can be removed.
func (r *Resources) inspectFinalizerPodAgencyServing(ctx context.Context, p *v1.Pod, memberStatus api.MemberStatus, updateMember func(api.MemberStatus) error) error {
log := r.log.Str("section", "agency")
if err := r.prepareAgencyPodTermination(ctx, p, memberStatus, func(update api.MemberStatus) error {
if err := r.prepareAgencyPodTermination(p, memberStatus, func(update api.MemberStatus) error {
if err := updateMember(update); err != nil {
return errors.WithStack(err)
}


@ -22,7 +22,6 @@ package resources
import (
"context"
"time"
"github.com/arangodb/kube-arangodb/pkg/util/globals"
@ -31,10 +30,9 @@ import (
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
driver "github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
"github.com/arangodb/kube-arangodb/pkg/deployment/agency"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
v1 "k8s.io/api/core/v1"
)
@ -42,7 +40,7 @@ import (
// prepareAgencyPodTermination checks if the given agency pod is allowed to terminate
// and if so, prepares it for termination.
// It returns nil if the pod is allowed to terminate, an error otherwise.
func (r *Resources) prepareAgencyPodTermination(ctx context.Context, p *v1.Pod, memberStatus api.MemberStatus, updateMember func(api.MemberStatus) error) error {
func (r *Resources) prepareAgencyPodTermination(p *v1.Pod, memberStatus api.MemberStatus, updateMember func(api.MemberStatus) error) error {
log := r.log.Str("section", "pod")
// Inspect member phase
@ -71,12 +69,10 @@ func (r *Resources) prepareAgencyPodTermination(ctx context.Context, p *v1.Pod,
}
// Check PVC
ctxChild, cancel := globals.GetGlobalTimeouts().Kubernetes().WithTimeout(ctx)
defer cancel()
pvc, err := r.context.ACS().CurrentClusterCache().PersistentVolumeClaim().V1().Read().Get(ctxChild, memberStatus.PersistentVolumeClaimName, meta.GetOptions{})
if err != nil {
log.Err(err).Warn("Failed to get PVC for member")
return errors.WithStack(err)
pvc, ok := r.context.ACS().CurrentClusterCache().PersistentVolumeClaim().V1().GetSimple(memberStatus.PersistentVolumeClaimName)
if !ok {
log.Warn("Failed to get PVC for member")
return errors.Newf("Failed to get PVC for member")
}
if k8sutil.IsPersistentVolumeClaimMarkedForDeletion(pvc) {
agentDataWillBeGone = true
@ -90,24 +86,16 @@ func (r *Resources) prepareAgencyPodTermination(ctx context.Context, p *v1.Pod,
// Inspect agency state
log.Debug("Agent data will be gone, so we will check agency serving status first")
ctxChild, cancel = context.WithTimeout(ctx, time.Second*15)
defer cancel()
ctxLeader := agency.WithAllowNoLeader(ctxChild) // The ID we're checking may be the leader, so ignore situations where all other agents are followers
agencyConns, err := r.context.GetAgencyClientsWithPredicate(ctxLeader, func(id string) bool { return id != memberStatus.ID })
if err != nil {
log.Err(err).Debug("Failed to create member client")
return errors.WithStack(err)
agencyHealth, ok := r.context.GetAgencyHealth()
if !ok {
log.Debug("Agency health fetch failed")
return errors.Newf("Agency health fetch failed")
}
if len(agencyConns) == 0 {
log.Err(err).Debug("No more remaining agents, we cannot delete this one")
return errors.WithStack(errors.Newf("No more remaining agents"))
if err := agencyHealth.Healthy(); err != nil {
log.Err(err).Debug("Agency is not healthy. Cannot delete this one")
return errors.WithStack(errors.Newf("Agency is not healthy"))
}
if err := agency.AreAgentsHealthy(ctxLeader, agencyConns); err != nil {
log.Err(err).Debug("Remaining agents are not healthy")
return errors.WithStack(err)
}
// Complete agent recovery is needed, since data is already gone or not accessible
if memberStatus.Conditions.Update(api.ConditionTypeAgentRecoveryNeeded, true, "Data Gone", "") {
if err := updateMember(memberStatus); err != nil {
@ -265,22 +253,15 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, p *v1.Pod
}
} else if memberStatus.Phase == api.MemberPhaseDrain {
// Check the job progress
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
agency, err := r.context.GetAgency(ctxChild)
if err != nil {
log.Err(err).Debug("Failed to create agency client")
return errors.WithStack(err)
cache, ok := r.context.GetAgencyCache()
if !ok {
return errors.Newf("AgencyCache is not ready")
}
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
jobStatus, err := arangod.CleanoutServerJobStatus(ctxChild, memberStatus.CleanoutJobID, c, agency)
if err != nil {
log.Err(err).Debug("Failed to fetch job status")
return errors.WithStack(err)
}
if jobStatus.IsFailed() {
log.Str("reason", jobStatus.Reason()).Warn("Job failed")
details, jobStatus := cache.Target.GetJob(agency.JobID(memberStatus.CleanoutJobID))
switch jobStatus {
case agency.JobPhaseFailed:
log.Str("reason", details.Reason).Warn("Job failed")
// Revert cleanout state
memberStatus.Phase = api.MemberPhaseCreated
memberStatus.CleanoutJobID = ""
@ -289,30 +270,21 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, p *v1.Pod
}
log.Error("Cleanout/Resign server job failed, continue anyway")
return nil
}
if jobStatus.IsFinished() {
case agency.JobPhaseFinished:
memberStatus.CleanoutJobID = ""
memberStatus.Phase = api.MemberPhaseCreated
}
} else if memberStatus.Phase == api.MemberPhaseResign {
// Check the job progress
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
agency, err := r.context.GetAgency(ctxChild)
if err != nil {
log.Err(err).Debug("Failed to create agency client")
return errors.WithStack(err)
cache, ok := r.context.GetAgencyCache()
if !ok {
return errors.Newf("AgencyCache is not ready")
}
ctxChild, cancel = globals.GetGlobalTimeouts().ArangoD().WithTimeout(ctx)
defer cancel()
jobStatus, err := arangod.CleanoutServerJobStatus(ctxChild, memberStatus.CleanoutJobID, c, agency)
if err != nil {
log.Err(err).Debug("Failed to fetch job status")
return errors.WithStack(err)
}
if jobStatus.IsFailed() {
log.Str("reason", jobStatus.Reason()).Warn("Resign Job failed")
details, jobStatus := cache.Target.GetJob(agency.JobID(memberStatus.CleanoutJobID))
switch jobStatus {
case agency.JobPhaseFailed:
log.Str("reason", details.Reason).Warn("Resign Job failed")
// Revert cleanout state
memberStatus.Phase = api.MemberPhaseCreated
memberStatus.CleanoutJobID = ""
@ -321,9 +293,8 @@ func (r *Resources) prepareDBServerPodTermination(ctx context.Context, p *v1.Pod
}
log.Error("Cleanout/Resign server job failed, continue anyway")
return nil
}
if jobStatus.IsFinished() {
log.Str("reason", jobStatus.Reason()).Debug("Resign Job finished")
case agency.JobPhaseFinished:
log.Str("reason", details.Reason).Debug("Resign Job finished")
memberStatus.CleanoutJobID = ""
memberStatus.Phase = api.MemberPhaseCreated
if err := updateMember(memberStatus); err != nil {


@ -1,95 +0,0 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package arangod
import (
"context"
"fmt"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/go-driver"
"github.com/arangodb/go-driver/agency"
)
// CleanoutJobStatus is a strongly typed status of an agency cleanout-server-job.
type CleanoutJobStatus struct {
state string
reason string
}
// IsFailed returns true when the job is failed
func (s CleanoutJobStatus) IsFailed() bool {
return s.state == "Failed"
}
// IsFinished returns true when the job is finished
func (s CleanoutJobStatus) IsFinished() bool {
return s.state == "Finished"
}
// Reason returns the reason for the current state.
func (s CleanoutJobStatus) Reason() string {
return s.reason
}
// String returns a string representation of the given state.
func (s CleanoutJobStatus) String() string {
return fmt.Sprintf("state: '%s', reason: '%s'", s.state, s.reason)
}
var (
agencyJobStateKeyPrefixes = [][]string{
{"arango", "Target", "ToDo"},
{"arango", "Target", "Pending"},
{"arango", "Target", "Finished"},
{"arango", "Target", "Failed"},
}
)
type agencyJob struct {
Reason string `json:"reason,omitempty"`
Server string `json:"server,omitempty"`
JobID string `json:"jobId,omitempty"`
Type string `json:"type,omitempty"`
}
// CleanoutServerJobStatus checks the status of a cleanout-server job with given ID.
func CleanoutServerJobStatus(ctx context.Context, jobID string, client driver.Client, agencyClient agency.Agency) (CleanoutJobStatus, error) {
for _, keyPrefix := range agencyJobStateKeyPrefixes {
key := append(keyPrefix, jobID)
var job agencyJob
if err := agencyClient.ReadKey(ctx, key, &job); err == nil {
return CleanoutJobStatus{
state: keyPrefix[len(keyPrefix)-1],
reason: job.Reason,
}, nil
} else if agency.IsKeyNotFound(err) {
continue
} else {
return CleanoutJobStatus{}, errors.WithStack(err)
}
}
// Job not found in any states
return CleanoutJobStatus{
reason: "job not found",
}, nil
}