1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Merge pull request #115 from arangodb/improved-ready-detection

Improved liveness detection
This commit is contained in:
Ewout Prangsma 2018-04-05 16:54:54 +02:00 committed by GitHub
commit 7cfeb005b3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 217 additions and 42 deletions

View file

@ -226,7 +226,9 @@ run-unit-tests: $(GOBUILDDIR) $(SOURCES)
$(REPOPATH)/pkg/deployment/reconcile \ $(REPOPATH)/pkg/deployment/reconcile \
$(REPOPATH)/pkg/deployment/resources \ $(REPOPATH)/pkg/deployment/resources \
$(REPOPATH)/pkg/util/k8sutil \ $(REPOPATH)/pkg/util/k8sutil \
$(REPOPATH)/pkg/util/k8sutil/test $(REPOPATH)/pkg/util/k8sutil/test \
$(REPOPATH)/pkg/util/probe \
$(REPOPATH)/pkg/util/validation
$(TESTBIN): $(GOBUILDDIR) $(SOURCES) $(TESTBIN): $(GOBUILDDIR) $(SOURCES)
@mkdir -p $(BINDIR) @mkdir -p $(BINDIR)

View file

@ -85,8 +85,9 @@ var (
chaosOptions struct { chaosOptions struct {
allowed bool allowed bool
} }
deploymentProbe probe.Probe livenessProbe probe.LivenessProbe
storageProbe probe.Probe deploymentProbe probe.ReadyProbe
storageProbe probe.ReadyProbe
) )
func init() { func init() {
@ -154,7 +155,7 @@ func cmdMainRun(cmd *cobra.Command, args []string) {
} }
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/health", probe.LivenessHandler) mux.HandleFunc("/health", livenessProbe.LivenessHandler)
mux.HandleFunc("/ready/deployment", deploymentProbe.ReadyHandler) mux.HandleFunc("/ready/deployment", deploymentProbe.ReadyHandler)
mux.HandleFunc("/ready/storage", storageProbe.ReadyHandler) mux.HandleFunc("/ready/storage", storageProbe.ReadyHandler)
mux.Handle("/metrics", prometheus.Handler()) mux.Handle("/metrics", prometheus.Handler())
@ -222,6 +223,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper
KubeExtCli: kubeExtCli, KubeExtCli: kubeExtCli,
CRCli: crCli, CRCli: crCli,
EventRecorder: eventRecorder, EventRecorder: eventRecorder,
LivenessProbe: &livenessProbe,
DeploymentProbe: &deploymentProbe, DeploymentProbe: &deploymentProbe,
StorageProbe: &storageProbe, StorageProbe: &storageProbe,
} }

View file

@ -77,8 +77,9 @@ type Dependencies struct {
KubeExtCli apiextensionsclient.Interface KubeExtCli apiextensionsclient.Interface
CRCli versioned.Interface CRCli versioned.Interface
EventRecorder record.EventRecorder EventRecorder record.EventRecorder
DeploymentProbe *probe.Probe LivenessProbe *probe.LivenessProbe
StorageProbe *probe.Probe DeploymentProbe *probe.ReadyProbe
StorageProbe *probe.ReadyProbe
} }
// NewOperator instantiates a new operator from given config & dependencies. // NewOperator instantiates a new operator from given config & dependencies.

View file

@ -64,6 +64,9 @@ func (o *Operator) runDeployments(stop <-chan struct{}) {
// onAddArangoDeployment deployment addition callback // onAddArangoDeployment deployment addition callback
func (o *Operator) onAddArangoDeployment(obj interface{}) { func (o *Operator) onAddArangoDeployment(obj interface{}) {
o.Dependencies.LivenessProbe.Lock()
defer o.Dependencies.LivenessProbe.Unlock()
apiObject := obj.(*api.ArangoDeployment) apiObject := obj.(*api.ArangoDeployment)
o.log.Debug(). o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()). Str("name", apiObject.GetObjectMeta().GetName()).
@ -73,6 +76,9 @@ func (o *Operator) onAddArangoDeployment(obj interface{}) {
// onUpdateArangoDeployment deployment update callback // onUpdateArangoDeployment deployment update callback
func (o *Operator) onUpdateArangoDeployment(oldObj, newObj interface{}) { func (o *Operator) onUpdateArangoDeployment(oldObj, newObj interface{}) {
o.Dependencies.LivenessProbe.Lock()
defer o.Dependencies.LivenessProbe.Unlock()
apiObject := newObj.(*api.ArangoDeployment) apiObject := newObj.(*api.ArangoDeployment)
o.log.Debug(). o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()). Str("name", apiObject.GetObjectMeta().GetName()).
@ -82,6 +88,9 @@ func (o *Operator) onUpdateArangoDeployment(oldObj, newObj interface{}) {
// onDeleteArangoDeployment deployment delete callback // onDeleteArangoDeployment deployment delete callback
func (o *Operator) onDeleteArangoDeployment(obj interface{}) { func (o *Operator) onDeleteArangoDeployment(obj interface{}) {
o.Dependencies.LivenessProbe.Lock()
defer o.Dependencies.LivenessProbe.Unlock()
log := o.log log := o.log
apiObject, ok := obj.(*api.ArangoDeployment) apiObject, ok := obj.(*api.ArangoDeployment)
if !ok { if !ok {

View file

@ -64,6 +64,9 @@ func (o *Operator) runLocalStorages(stop <-chan struct{}) {
// onAddArangoLocalStorage local storage addition callback // onAddArangoLocalStorage local storage addition callback
func (o *Operator) onAddArangoLocalStorage(obj interface{}) { func (o *Operator) onAddArangoLocalStorage(obj interface{}) {
o.Dependencies.LivenessProbe.Lock()
defer o.Dependencies.LivenessProbe.Unlock()
apiObject := obj.(*api.ArangoLocalStorage) apiObject := obj.(*api.ArangoLocalStorage)
o.log.Debug(). o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()). Str("name", apiObject.GetObjectMeta().GetName()).
@ -73,6 +76,9 @@ func (o *Operator) onAddArangoLocalStorage(obj interface{}) {
// onUpdateArangoLocalStorage local storage update callback // onUpdateArangoLocalStorage local storage update callback
func (o *Operator) onUpdateArangoLocalStorage(oldObj, newObj interface{}) { func (o *Operator) onUpdateArangoLocalStorage(oldObj, newObj interface{}) {
o.Dependencies.LivenessProbe.Lock()
defer o.Dependencies.LivenessProbe.Unlock()
apiObject := newObj.(*api.ArangoLocalStorage) apiObject := newObj.(*api.ArangoLocalStorage)
o.log.Debug(). o.log.Debug().
Str("name", apiObject.GetObjectMeta().GetName()). Str("name", apiObject.GetObjectMeta().GetName()).
@ -82,6 +88,9 @@ func (o *Operator) onUpdateArangoLocalStorage(oldObj, newObj interface{}) {
// onDeleteArangoLocalStorage local storage delete callback // onDeleteArangoLocalStorage local storage delete callback
func (o *Operator) onDeleteArangoLocalStorage(obj interface{}) { func (o *Operator) onDeleteArangoLocalStorage(obj interface{}) {
o.Dependencies.LivenessProbe.Lock()
defer o.Dependencies.LivenessProbe.Unlock()
log := o.log log := o.log
apiObject, ok := obj.(*api.ArangoLocalStorage) apiObject, ok := obj.(*api.ArangoLocalStorage)
if !ok { if !ok {

View file

@ -1,32 +0,0 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package probe
import (
"net/http"
)
// LivenessHandler writes back the HTTP status code 200 to indicate a healthy operator.
func LivenessHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

102
pkg/util/probe/liveness.go Normal file
View file

@ -0,0 +1,102 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package probe
import (
"net/http"
"sync"
"time"
)
const (
livenessHandlerTimeout = time.Second * 5
)
// LivenessProbe wraps a liveness probe handler.
type LivenessProbe struct {
lock int32
mutex sync.Mutex
waitChan chan struct{}
}
// Lock the probe, preventing the LivenessHandler from responding to requests.
func (p *LivenessProbe) Lock() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.lock++
}
// Unlock the probe, allowing the LivenessHandler to respond to requests.
func (p *LivenessProbe) Unlock() {
p.mutex.Lock()
defer p.mutex.Unlock()
p.lock--
if p.lock == 0 && p.waitChan != nil {
w := p.waitChan
p.waitChan = nil
close(w)
}
}
// waitUntilNotLocked blocks until the probe is no longer locked
// or a timeout occurs.
// Returns true if the probe is unlocked, false on timeout.
func (p *LivenessProbe) waitUntilNotLocked(timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
for {
var w chan struct{}
p.mutex.Lock()
locked := p.lock != 0
if locked {
if p.waitChan == nil {
p.waitChan = make(chan struct{})
}
w = p.waitChan
}
p.mutex.Unlock()
if !locked {
// All good
return true
}
// We're locked, wait until w is closed
select {
case <-w:
// continue
case <-time.After(time.Until(deadline)):
// Timeout
return false
}
}
}
// LivenessHandler writes back the HTTP status code 200 if the operator is ready, and 500 otherwise.
func (p *LivenessProbe) LivenessHandler(w http.ResponseWriter, r *http.Request) {
if p.waitUntilNotLocked(livenessHandlerTimeout) {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
}

View file

@ -0,0 +1,82 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package probe
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestLivenessLock(t *testing.T) {
p := &LivenessProbe{}
assert.True(t, p.waitUntilNotLocked(time.Millisecond))
// Test single lock
p.Lock()
assert.False(t, p.waitUntilNotLocked(time.Millisecond))
p.Unlock()
assert.True(t, p.waitUntilNotLocked(time.Millisecond))
// Test multiple locks
p.Lock()
assert.False(t, p.waitUntilNotLocked(time.Millisecond))
p.Lock()
assert.False(t, p.waitUntilNotLocked(time.Millisecond))
p.Unlock()
assert.False(t, p.waitUntilNotLocked(time.Millisecond))
p.Unlock()
assert.True(t, p.waitUntilNotLocked(time.Millisecond))
// Test concurrent waits
wg := sync.WaitGroup{}
p.Lock()
wg.Add(1)
go func() {
// Waiter 1
defer wg.Done()
assert.True(t, p.waitUntilNotLocked(time.Millisecond*200))
}()
wg.Add(1)
go func() {
// Waiter 2
defer wg.Done()
assert.True(t, p.waitUntilNotLocked(time.Millisecond*200))
}()
wg.Add(1)
go func() {
// Waiter 3
defer wg.Done()
assert.False(t, p.waitUntilNotLocked(time.Millisecond*5))
}()
wg.Add(1)
go func() {
// Unlocker
defer wg.Done()
time.Sleep(time.Millisecond * 50)
p.Unlock()
}()
wg.Wait()
}

View file

@ -27,18 +27,18 @@ import (
"sync/atomic" "sync/atomic"
) )
// Probe wraps a readiness probe handler. // ReadyProbe wraps a readiness probe handler.
type Probe struct { type ReadyProbe struct {
ready int32 ready int32
} }
// SetReady marks the probe as ready. // SetReady marks the probe as ready.
func (p *Probe) SetReady() { func (p *ReadyProbe) SetReady() {
atomic.StoreInt32(&p.ready, 1) atomic.StoreInt32(&p.ready, 1)
} }
// ReadyHandler writes back the HTTP status code 200 if the operator is ready, and 500 otherwise. // ReadyHandler writes back the HTTP status code 200 if the operator is ready, and 500 otherwise.
func (p *Probe) ReadyHandler(w http.ResponseWriter, r *http.Request) { func (p *ReadyProbe) ReadyHandler(w http.ResponseWriter, r *http.Request) {
isReady := atomic.LoadInt32(&p.ready) != 0 isReady := atomic.LoadInt32(&p.ready) != 0
if isReady { if isReady {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)