diff --git a/Makefile b/Makefile index 5a343e28e..4706a0574 100644 --- a/Makefile +++ b/Makefile @@ -226,7 +226,9 @@ run-unit-tests: $(GOBUILDDIR) $(SOURCES) $(REPOPATH)/pkg/deployment/reconcile \ $(REPOPATH)/pkg/deployment/resources \ $(REPOPATH)/pkg/util/k8sutil \ - $(REPOPATH)/pkg/util/k8sutil/test + $(REPOPATH)/pkg/util/k8sutil/test \ + $(REPOPATH)/pkg/util/probe \ + $(REPOPATH)/pkg/util/validation $(TESTBIN): $(GOBUILDDIR) $(SOURCES) @mkdir -p $(BINDIR) diff --git a/main.go b/main.go index c70136686..7a80c339f 100644 --- a/main.go +++ b/main.go @@ -85,8 +85,9 @@ var ( chaosOptions struct { allowed bool } - deploymentProbe probe.Probe - storageProbe probe.Probe + livenessProbe probe.LivenessProbe + deploymentProbe probe.ReadyProbe + storageProbe probe.ReadyProbe ) func init() { @@ -154,7 +155,7 @@ func cmdMainRun(cmd *cobra.Command, args []string) { } mux := http.NewServeMux() - mux.HandleFunc("/health", probe.LivenessHandler) + mux.HandleFunc("/health", livenessProbe.LivenessHandler) mux.HandleFunc("/ready/deployment", deploymentProbe.ReadyHandler) mux.HandleFunc("/ready/storage", storageProbe.ReadyHandler) mux.Handle("/metrics", prometheus.Handler()) @@ -222,6 +223,7 @@ func newOperatorConfigAndDeps(id, namespace, name string) (operator.Config, oper KubeExtCli: kubeExtCli, CRCli: crCli, EventRecorder: eventRecorder, + LivenessProbe: &livenessProbe, DeploymentProbe: &deploymentProbe, StorageProbe: &storageProbe, } diff --git a/pkg/operator/operator.go b/pkg/operator/operator.go index 9ba2c03e9..5fdeb85a9 100644 --- a/pkg/operator/operator.go +++ b/pkg/operator/operator.go @@ -77,8 +77,9 @@ type Dependencies struct { KubeExtCli apiextensionsclient.Interface CRCli versioned.Interface EventRecorder record.EventRecorder - DeploymentProbe *probe.Probe - StorageProbe *probe.Probe + LivenessProbe *probe.LivenessProbe + DeploymentProbe *probe.ReadyProbe + StorageProbe *probe.ReadyProbe } // NewOperator instantiates a new operator from given config & dependencies. diff --git a/pkg/operator/operator_deployment.go b/pkg/operator/operator_deployment.go index 2c9105d81..d62d32090 100644 --- a/pkg/operator/operator_deployment.go +++ b/pkg/operator/operator_deployment.go @@ -64,6 +64,9 @@ func (o *Operator) runDeployments(stop <-chan struct{}) { // onAddArangoDeployment deployment addition callback func (o *Operator) onAddArangoDeployment(obj interface{}) { + o.Dependencies.LivenessProbe.Lock() + defer o.Dependencies.LivenessProbe.Unlock() + apiObject := obj.(*api.ArangoDeployment) o.log.Debug(). Str("name", apiObject.GetObjectMeta().GetName()). @@ -73,6 +76,9 @@ func (o *Operator) onAddArangoDeployment(obj interface{}) { // onUpdateArangoDeployment deployment update callback func (o *Operator) onUpdateArangoDeployment(oldObj, newObj interface{}) { + o.Dependencies.LivenessProbe.Lock() + defer o.Dependencies.LivenessProbe.Unlock() + apiObject := newObj.(*api.ArangoDeployment) o.log.Debug(). Str("name", apiObject.GetObjectMeta().GetName()). @@ -82,6 +88,9 @@ func (o *Operator) onUpdateArangoDeployment(oldObj, newObj interface{}) { // onDeleteArangoDeployment deployment delete callback func (o *Operator) onDeleteArangoDeployment(obj interface{}) { + o.Dependencies.LivenessProbe.Lock() + defer o.Dependencies.LivenessProbe.Unlock() + log := o.log apiObject, ok := obj.(*api.ArangoDeployment) if !ok { diff --git a/pkg/operator/operator_local_storage.go b/pkg/operator/operator_local_storage.go index e71a10369..008df5098 100644 --- a/pkg/operator/operator_local_storage.go +++ b/pkg/operator/operator_local_storage.go @@ -64,6 +64,9 @@ func (o *Operator) runLocalStorages(stop <-chan struct{}) { // onAddArangoLocalStorage local storage addition callback func (o *Operator) onAddArangoLocalStorage(obj interface{}) { + o.Dependencies.LivenessProbe.Lock() + defer o.Dependencies.LivenessProbe.Unlock() + apiObject := obj.(*api.ArangoLocalStorage) o.log.Debug(). Str("name", apiObject.GetObjectMeta().GetName()). @@ -73,6 +76,9 @@ func (o *Operator) onAddArangoLocalStorage(obj interface{}) { // onUpdateArangoLocalStorage local storage update callback func (o *Operator) onUpdateArangoLocalStorage(oldObj, newObj interface{}) { + o.Dependencies.LivenessProbe.Lock() + defer o.Dependencies.LivenessProbe.Unlock() + apiObject := newObj.(*api.ArangoLocalStorage) o.log.Debug(). Str("name", apiObject.GetObjectMeta().GetName()). @@ -82,6 +88,9 @@ func (o *Operator) onUpdateArangoLocalStorage(oldObj, newObj interface{}) { // onDeleteArangoLocalStorage local storage delete callback func (o *Operator) onDeleteArangoLocalStorage(obj interface{}) { + o.Dependencies.LivenessProbe.Lock() + defer o.Dependencies.LivenessProbe.Unlock() + log := o.log apiObject, ok := obj.(*api.ArangoLocalStorage) if !ok { diff --git a/pkg/util/probe/health.go b/pkg/util/probe/health.go deleted file mode 100644 index 2ad9758de..000000000 --- a/pkg/util/probe/health.go +++ /dev/null @@ -1,32 +0,0 @@ -// -// DISCLAIMER -// -// Copyright 2018 ArangoDB GmbH, Cologne, Germany -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// -// Copyright holder is ArangoDB GmbH, Cologne, Germany -// -// Author Ewout Prangsma -// - -package probe - -import ( - "net/http" -) - -// LivenessHandler writes back the HTTP status code 200 to indicate a healthy operator. -func LivenessHandler(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) -} diff --git a/pkg/util/probe/liveness.go b/pkg/util/probe/liveness.go new file mode 100644 index 000000000..57a747088 --- /dev/null +++ b/pkg/util/probe/liveness.go @@ -0,0 +1,102 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package probe + +import ( + "net/http" + "sync" + "time" +) + +const ( + livenessHandlerTimeout = time.Second * 5 +) + +// LivenessProbe wraps a liveness probe handler. +type LivenessProbe struct { + lock int32 + mutex sync.Mutex + waitChan chan struct{} +} + +// Lock the probe, preventing the LivenessHandler from responding to requests. +func (p *LivenessProbe) Lock() { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.lock++ +} + +// Unlock the probe, allowing the LivenessHandler to respond to requests. +func (p *LivenessProbe) Unlock() { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.lock-- + + if p.lock == 0 && p.waitChan != nil { + w := p.waitChan + p.waitChan = nil + close(w) + } +} + +// waitUntilNotLocked blocks until the probe is no longer locked +// or a timeout occurs. +// Returns true if the probe is unlocked, false on timeout. +func (p *LivenessProbe) waitUntilNotLocked(timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + for { + var w chan struct{} + p.mutex.Lock() + locked := p.lock != 0 + if locked { + if p.waitChan == nil { + p.waitChan = make(chan struct{}) + } + w = p.waitChan + } + p.mutex.Unlock() + if !locked { + // All good + return true + } + // We're locked, wait until w is closed + select { + case <-w: + // continue + case <-time.After(time.Until(deadline)): + // Timeout + return false + } + } +} + +// LivenessHandler writes back the HTTP status code 200 if the operator is ready, and 500 otherwise. +func (p *LivenessProbe) LivenessHandler(w http.ResponseWriter, r *http.Request) { + if p.waitUntilNotLocked(livenessHandlerTimeout) { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusInternalServerError) + } +} diff --git a/pkg/util/probe/liveness_test.go b/pkg/util/probe/liveness_test.go new file mode 100644 index 000000000..1dcd3dee4 --- /dev/null +++ b/pkg/util/probe/liveness_test.go @@ -0,0 +1,82 @@ +// +// DISCLAIMER +// +// Copyright 2018 ArangoDB GmbH, Cologne, Germany +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Copyright holder is ArangoDB GmbH, Cologne, Germany +// +// Author Ewout Prangsma +// + +package probe + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestLivenessLock(t *testing.T) { + p := &LivenessProbe{} + assert.True(t, p.waitUntilNotLocked(time.Millisecond)) + + // Test single lock + p.Lock() + assert.False(t, p.waitUntilNotLocked(time.Millisecond)) + p.Unlock() + assert.True(t, p.waitUntilNotLocked(time.Millisecond)) + + // Test multiple locks + p.Lock() + assert.False(t, p.waitUntilNotLocked(time.Millisecond)) + p.Lock() + assert.False(t, p.waitUntilNotLocked(time.Millisecond)) + p.Unlock() + assert.False(t, p.waitUntilNotLocked(time.Millisecond)) + p.Unlock() + assert.True(t, p.waitUntilNotLocked(time.Millisecond)) + + // Test concurrent waits + wg := sync.WaitGroup{} + p.Lock() + wg.Add(1) + go func() { + // Waiter 1 + defer wg.Done() + assert.True(t, p.waitUntilNotLocked(time.Millisecond*200)) + }() + wg.Add(1) + go func() { + // Waiter 2 + defer wg.Done() + assert.True(t, p.waitUntilNotLocked(time.Millisecond*200)) + }() + wg.Add(1) + go func() { + // Waiter 3 + defer wg.Done() + assert.False(t, p.waitUntilNotLocked(time.Millisecond*5)) + }() + wg.Add(1) + go func() { + // Unlocker + defer wg.Done() + time.Sleep(time.Millisecond * 50) + p.Unlock() + }() + wg.Wait() +} diff --git a/pkg/util/probe/ready.go b/pkg/util/probe/ready.go index 636cc5a67..706ebdb35 100644 --- a/pkg/util/probe/ready.go +++ b/pkg/util/probe/ready.go @@ -27,18 +27,18 @@ import ( "sync/atomic" ) -// Probe wraps a readiness probe handler. -type Probe struct { +// ReadyProbe wraps a readiness probe handler. +type ReadyProbe struct { ready int32 } // SetReady marks the probe as ready. -func (p *Probe) SetReady() { +func (p *ReadyProbe) SetReady() { atomic.StoreInt32(&p.ready, 1) } // ReadyHandler writes back the HTTP status code 200 if the operator is ready, and 500 otherwise. -func (p *Probe) ReadyHandler(w http.ResponseWriter, r *http.Request) { +func (p *ReadyProbe) ReadyHandler(w http.ResponseWriter, r *http.Request) { isReady := atomic.LoadInt32(&p.ready) != 0 if isReady { w.WriteHeader(http.StatusOK)