mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
Fetch cluster health in go-routine
This commit is contained in:
parent
38b91d3b03
commit
56466c4631
5 changed files with 323 additions and 16 deletions
|
@ -141,6 +141,7 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoDeployment) (*De
|
|||
ci := newClusterScalingIntegration(d)
|
||||
d.clusterScalingIntegration = ci
|
||||
go ci.ListenForClusterEvents(d.stopCh)
|
||||
go d.resources.RunDeploymentHealthLoop(d.stopCh)
|
||||
}
|
||||
if config.AllowChaos {
|
||||
d.chaosMonkey = chaos.NewMonkey(deps.Log, d)
|
||||
|
|
89
pkg/deployment/resources/deployment_health.go
Normal file
89
pkg/deployment/resources/deployment_health.go
Normal file
|
@ -0,0 +1,89 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Author Ewout Prangsma
|
||||
//
|
||||
|
||||
package resources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
|
||||
"github.com/arangodb/kube-arangodb/pkg/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
fetchDeploymentHealthCounters = metrics.MustRegisterCounterVec("deployment_resources", "fetchDeploymentHealth", "Number of times the health of the deployment was fetched", "deployment", "result")
|
||||
)
|
||||
|
||||
// RunDeploymentHealthLoop creates a loop to fetch the health of the deployment.
|
||||
// The loop ends when the given channel is closed.
|
||||
func (r *Resources) RunDeploymentHealthLoop(stopCh <-chan struct{}) {
|
||||
log := r.log
|
||||
deploymentName := r.context.GetAPIObject().GetName()
|
||||
|
||||
if r.context.GetSpec().GetMode() != api.DeploymentModeCluster {
|
||||
// Deployment health is currently only applicable for clusters
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
if err := r.fetchDeploymentHealth(); err != nil {
|
||||
log.Debug().Err(err).Msg("Failed to fetch deployment health")
|
||||
fetchDeploymentHealthCounters.WithLabelValues(deploymentName, "failed").Inc()
|
||||
} else {
|
||||
fetchDeploymentHealthCounters.WithLabelValues(deploymentName, "success").Inc()
|
||||
}
|
||||
select {
|
||||
case <-time.After(time.Second * 5):
|
||||
// Continue
|
||||
case <-stopCh:
|
||||
// We're done
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// cleanupRemovedClusterMembers removes all arangod members that are no longer part of the cluster.
|
||||
func (r *Resources) fetchDeploymentHealth() error {
|
||||
// Ask cluster for its health
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
|
||||
defer cancel()
|
||||
client, err := r.context.GetDatabaseClient(ctx)
|
||||
if err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
c, err := client.Cluster(ctx)
|
||||
if err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
h, err := c.Health(ctx)
|
||||
if err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
|
||||
// Save cluster health
|
||||
r.health.mutex.Lock()
|
||||
defer r.health.mutex.Unlock()
|
||||
r.health.clusterHealth = h
|
||||
r.health.timestamp = time.Now()
|
||||
return nil
|
||||
}
|
|
@ -23,11 +23,11 @@
|
|||
package resources
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
driver "github.com/arangodb/go-driver"
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1alpha"
|
||||
"github.com/arangodb/kube-arangodb/pkg/metrics"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
|
||||
)
|
||||
|
||||
|
@ -35,6 +35,11 @@ const (
|
|||
// minMemberAge is the minimum duration we expect a member to be created before we remove it because
|
||||
// it is not part of a deployment.
|
||||
minMemberAge = time.Minute * 10
|
||||
maxClusterHealthAge = time.Second * 20
|
||||
)
|
||||
|
||||
var (
|
||||
cleanupRemovedMembersCounters = metrics.MustRegisterCounterVec("deployment_resources", "cleanupRemovedMembers", "Number of cleanup-removed-members actions", "deployment", "result")
|
||||
)
|
||||
|
||||
// CleanupRemovedMembers removes all arangod members that are no longer part of ArangoDB deployment.
|
||||
|
@ -43,8 +48,10 @@ func (r *Resources) CleanupRemovedMembers() error {
|
|||
switch r.context.GetSpec().GetMode() {
|
||||
case api.DeploymentModeCluster:
|
||||
if err := r.cleanupRemovedClusterMembers(); err != nil {
|
||||
cleanupRemovedMembersCounters.WithLabelValues(r.context.GetAPIObject().GetName(), "failed").Inc()
|
||||
return maskAny(err)
|
||||
}
|
||||
cleanupRemovedMembersCounters.WithLabelValues(r.context.GetAPIObject().GetName(), "success").Inc()
|
||||
return nil
|
||||
default:
|
||||
// Other mode have no concept of cluster in which members can be removed
|
||||
|
@ -55,20 +62,16 @@ func (r *Resources) CleanupRemovedMembers() error {
|
|||
// cleanupRemovedClusterMembers removes all arangod members that are no longer part of the cluster.
|
||||
func (r *Resources) cleanupRemovedClusterMembers() error {
|
||||
log := r.log
|
||||
ctx := context.Background()
|
||||
|
||||
// Ask cluster for its health
|
||||
client, err := r.context.GetDatabaseClient(ctx)
|
||||
if err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
c, err := client.Cluster(ctx)
|
||||
if err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
h, err := c.Health(ctx)
|
||||
if err != nil {
|
||||
return maskAny(err)
|
||||
// Fetch recent cluster health
|
||||
r.health.mutex.Lock()
|
||||
h := r.health.clusterHealth
|
||||
ts := r.health.timestamp
|
||||
r.health.mutex.Unlock()
|
||||
|
||||
// Only accept recent cluster health values
|
||||
if time.Since(ts) > maxClusterHealthAge {
|
||||
return nil
|
||||
}
|
||||
|
||||
serverFound := func(id string) bool {
|
||||
|
|
|
@ -22,13 +22,24 @@
|
|||
|
||||
package resources
|
||||
|
||||
import "github.com/rs/zerolog"
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
driver "github.com/arangodb/go-driver"
|
||||
"github.com/rs/zerolog"
|
||||
)
|
||||
|
||||
// Resources is a service that creates low level resources for members
|
||||
// and inspects low level resources, put the inspection result in members.
|
||||
type Resources struct {
|
||||
log zerolog.Logger
|
||||
context Context
|
||||
health struct {
|
||||
clusterHealth driver.ClusterHealth // Last fetched cluster health
|
||||
timestamp time.Time // Timestamp of last fetch of cluster health
|
||||
mutex sync.Mutex // Mutex guarding fields in this struct
|
||||
}
|
||||
}
|
||||
|
||||
// NewResources creates a new Resources service, used to
|
||||
|
|
203
pkg/util/errors/errors.go
Normal file
203
pkg/util/errors/errors.go
Normal file
|
@ -0,0 +1,203 @@
|
|||
//
|
||||
// DISCLAIMER
|
||||
//
|
||||
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Copyright holder is ArangoDB GmbH, Cologne, Germany
|
||||
//
|
||||
// Author Ewout Prangsma
|
||||
//
|
||||
|
||||
package errors
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"os"
|
||||
"syscall"
|
||||
|
||||
driver "github.com/arangodb/go-driver"
|
||||
errs "github.com/pkg/errors"
|
||||
)
|
||||
|
||||
var (
|
||||
Cause = errs.Cause
|
||||
New = errs.New
|
||||
WithStack = errs.WithStack
|
||||
Wrap = errs.Wrap
|
||||
Wrapf = errs.Wrapf
|
||||
)
|
||||
|
||||
// WithMessage annotates err with a new message.
|
||||
// The messages of given error is hidden.
|
||||
// If err is nil, WithMessage returns nil.
|
||||
func WithMessage(err error, message string) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
return &withMessage{
|
||||
cause: err,
|
||||
msg: message,
|
||||
}
|
||||
}
|
||||
|
||||
// withMessage is an error that reports its own message and keeps the
// error it was created from as its cause.
type withMessage struct {
	cause error
	msg   string
}

// Error returns the replacement message; the message of the cause is hidden.
func (w *withMessage) Error() string { return w.msg }

// Cause returns the error this message was attached to.
func (w *withMessage) Cause() error { return w.cause }

// Format implements fmt.Formatter. Supported verbs:
//
//	%s, %v  the message
//	%q      the message as a double-quoted, escaped Go string
//	%+v     the cause formatted with %+v, a newline, then the message
//
// Any other verb produces no output.
func (w *withMessage) Format(s fmt.State, verb rune) {
	switch verb {
	case 'v':
		if s.Flag('+') {
			fmt.Fprintf(s, "%+v\n", w.Cause())
			io.WriteString(s, w.msg)
			return
		}
		fallthrough
	case 's':
		io.WriteString(s, w.Error())
	case 'q':
		// %q must quote the message; writing it raw made %q identical to %s.
		fmt.Fprintf(s, "%q", w.Error())
	}
}
|
||||
|
||||
type timeout interface {
|
||||
Timeout() bool
|
||||
}
|
||||
|
||||
// IsTimeout returns true if the given error is caused by a timeout error.
|
||||
func IsTimeout(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if t, ok := errs.Cause(err).(timeout); ok {
|
||||
return t.Timeout()
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
type temporary interface {
|
||||
Temporary() bool
|
||||
}
|
||||
|
||||
// IsTemporary returns true if the given error is caused by a temporary error.
|
||||
func IsTemporary(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
if t, ok := errs.Cause(err).(temporary); ok {
|
||||
return t.Temporary()
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsEOF returns true if the given error is caused by an EOF error.
|
||||
func IsEOF(err error) bool {
|
||||
err = errs.Cause(err)
|
||||
if err == io.EOF {
|
||||
return true
|
||||
}
|
||||
if ok, err := libCause(err); ok {
|
||||
return IsEOF(err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsConnectionRefused returns true if the given error is caused by an "connection refused" error.
|
||||
func IsConnectionRefused(err error) bool {
|
||||
err = errs.Cause(err)
|
||||
if err, ok := err.(syscall.Errno); ok {
|
||||
return err == syscall.ECONNREFUSED
|
||||
}
|
||||
if ok, err := libCause(err); ok {
|
||||
return IsConnectionRefused(err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsConnectionReset returns true if the given error is caused by an "connection reset by peer" error.
|
||||
func IsConnectionReset(err error) bool {
|
||||
err = errs.Cause(err)
|
||||
if err, ok := err.(syscall.Errno); ok {
|
||||
return err == syscall.ECONNRESET
|
||||
}
|
||||
if ok, err := libCause(err); ok {
|
||||
return IsConnectionReset(err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsContextCanceled returns true if the given error is caused by a context cancelation.
|
||||
func IsContextCanceled(err error) bool {
|
||||
err = errs.Cause(err)
|
||||
if err == context.Canceled {
|
||||
return true
|
||||
}
|
||||
if ok, err := libCause(err); ok {
|
||||
return IsContextCanceled(err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsContextDeadlineExpired returns true if the given error is caused by a context deadline expiration.
|
||||
func IsContextDeadlineExpired(err error) bool {
|
||||
err = errs.Cause(err)
|
||||
if err == context.DeadlineExceeded {
|
||||
return true
|
||||
}
|
||||
if ok, err := libCause(err); ok {
|
||||
return IsContextDeadlineExpired(err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// IsContextCanceledOrExpired returns true if the given error is caused by a context cancelation
|
||||
// or deadline expiration.
|
||||
func IsContextCanceledOrExpired(err error) bool {
|
||||
err = errs.Cause(err)
|
||||
if err == context.Canceled || err == context.DeadlineExceeded {
|
||||
return true
|
||||
}
|
||||
if ok, err := libCause(err); ok {
|
||||
return IsContextCanceledOrExpired(err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// libCause returns the Cause of well known go library errors.
|
||||
func libCause(err error) (bool, error) {
|
||||
original := err
|
||||
for {
|
||||
switch e := err.(type) {
|
||||
case *driver.ResponseError:
|
||||
err = e.Err
|
||||
case *net.DNSConfigError:
|
||||
err = e.Err
|
||||
case *net.OpError:
|
||||
err = e.Err
|
||||
case *os.SyscallError:
|
||||
err = e.Err
|
||||
case *url.Error:
|
||||
err = e.Err
|
||||
default:
|
||||
return err != original, err
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in a new issue