1
0
Fork 0
mirror of https://github.com/prometheus-operator/prometheus-operator.git synced 2025-04-21 11:48:53 +00:00

pkg/listwatch: remove multilistwatcher

This commit is contained in:
Sergiusz Urbaniak 2020-08-31 13:12:31 +02:00
parent 2379f59f6f
commit 5e94344182
3 changed files with 22 additions and 319 deletions

View file

@ -16,17 +16,13 @@ package listwatch
import (
"context"
"fmt"
"strings"
"sync"
"github.com/go-kit/kit/log"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
@ -109,167 +105,6 @@ func NewFilteredUnprivilegedNamespaceListWatchFromClient(l log.Logger, c cache.G
return &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
// MultiNamespaceListerWatcher takes allowed and denied namespaces and a
// cache.ListerWatcher generator func and returns a single cache.ListerWatcher
// capable of operating on multiple namespaces.
//
// Allowed namespaces and denied namespaces are mutually exclusive.
// If allowed namespaces contain multiple items, the given denied namespaces have no effect.
// If the allowed namespaces includes exactly one entry with the value v1.NamespaceAll (empty string),
// the given denied namespaces are applied.
func MultiNamespaceListerWatcher(l log.Logger, allowedNamespaces, deniedNamespaces map[string]struct{}, f func(string) cache.ListerWatcher) cache.ListerWatcher {
// If there is only one namespace then there is no need to create a
// multi lister watcher proxy.
if IsAllNamespaces(allowedNamespaces) {
return newDenylistListerWatcher(l, deniedNamespaces, f(v1.NamespaceAll))
}
if len(allowedNamespaces) == 1 {
for n := range allowedNamespaces {
return f(n)
}
}
var lws []cache.ListerWatcher
for n := range allowedNamespaces {
lws = append(lws, f(n))
}
return multiListerWatcher(lws)
}
// multiListerWatcher abstracts several cache.ListerWatchers, allowing them
// to be treated as a single cache.ListerWatcher.
type multiListerWatcher []cache.ListerWatcher
// List implements the ListerWatcher interface.
// It combines the output of the List method of every ListerWatcher into
// a single result.
func (mlw multiListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
l := metav1.List{}
resourceVersions := sets.NewString()
for _, lw := range mlw {
list, err := lw.List(options)
if err != nil {
return nil, err
}
items, err := meta.ExtractList(list)
if err != nil {
return nil, err
}
metaObj, err := meta.ListAccessor(list)
if err != nil {
return nil, err
}
for _, item := range items {
l.Items = append(l.Items, runtime.RawExtension{Object: item.DeepCopyObject()})
}
if !resourceVersions.Has(metaObj.GetResourceVersion()) {
resourceVersions.Insert(metaObj.GetResourceVersion())
}
}
// Combine the resource versions so that the composite Watch method can
// distribute appropriate versions to each underlying Watch func.
l.ListMeta.ResourceVersion = strings.Join(resourceVersions.List(), "/")
return &l, nil
}
// Watch implements the ListerWatcher interface.
// It returns a watch.Interface that combines the output from the
// watch.Interface of every cache.ListerWatcher into a single result chan.
func (mlw multiListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
var resourceVersions string
// Allow resource versions to be "".
if options.ResourceVersion != "" {
rvs := strings.Split(options.ResourceVersion, "/")
if len(rvs) > 1 {
return nil, fmt.Errorf("expected resource version to have 1 part, got %d", len(rvs))
}
resourceVersions = options.ResourceVersion
}
return newMultiWatch(mlw, resourceVersions, options)
}
// multiWatch abstracts multiple watch.Interface's, allowing them
// to be treated as a single watch.Interface.
type multiWatch struct {
result chan watch.Event
stopped chan struct{}
stoppers []func()
}
// newMultiWatch returns a new multiWatch or an error if one of the underlying
// Watch funcs errored.
func newMultiWatch(lws []cache.ListerWatcher, resourceVersions string, options metav1.ListOptions) (*multiWatch, error) {
var (
result = make(chan watch.Event)
stopped = make(chan struct{})
stoppers []func()
wg sync.WaitGroup
)
wg.Add(len(lws))
for _, lw := range lws {
o := options.DeepCopy()
o.ResourceVersion = resourceVersions
w, err := lw.Watch(*o)
if err != nil {
return nil, err
}
go func() {
defer wg.Done()
for {
event, ok := <-w.ResultChan()
if !ok {
return
}
select {
case result <- event:
case <-stopped:
return
}
}
}()
stoppers = append(stoppers, w.Stop)
}
// result chan must be closed,
// once all event sender goroutines exited.
go func() {
wg.Wait()
close(result)
}()
return &multiWatch{
result: result,
stoppers: stoppers,
stopped: stopped,
}, nil
}
// ResultChan implements the watch.Interface interface.
func (mw *multiWatch) ResultChan() <-chan watch.Event {
return mw.result
}
// Stop implements the watch.Interface interface.
// It stops all of the underlying watch.Interfaces and closes the backing chan.
// Can safely be called more than once.
func (mw *multiWatch) Stop() {
select {
case <-mw.stopped:
// nothing to do, we are already stopped
default:
for _, stop := range mw.stoppers {
stop()
}
close(mw.stopped)
}
return
}
// IsAllNamespaces checks if the given map of namespaces
// contains only v1.NamespaceAll.
func IsAllNamespaces(namespaces map[string]struct{}) bool {

View file

@ -15,163 +15,9 @@
package listwatch
import (
"sync"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
)
var _ watch.Interface = &multiWatch{}
func setupMultiWatch(n int, t *testing.T, rvs string) ([]*watch.FakeWatcher, *multiWatch) {
ws := make([]*watch.FakeWatcher, n)
lws := make([]cache.ListerWatcher, n)
for i := range ws {
w := watch.NewFake()
ws[i] = w
lws[i] = &cache.ListWatch{WatchFunc: func(_ metav1.ListOptions) (watch.Interface, error) {
return w, nil
}}
}
m, err := newMultiWatch(lws, rvs, metav1.ListOptions{})
if err != nil {
t.Fatalf("failed to create new multiWatch: %v", err)
}
return ws, m
}
func TestNewMultiWatch(t *testing.T) {
func() {
defer func() {
if r := recover(); r != nil {
t.Errorf("newMultiWatch should not panic when number of resource versions matches ListerWatchers; got: %v", r)
}
}()
// Create a multiWatch from 1 ListerWatchers and pass 1 resource versions.
_, _ = setupMultiWatch(1, t, "1")
}()
}
func TestMultiWatchResultChan(t *testing.T) {
ws, m := setupMultiWatch(10, t, "10")
defer m.Stop()
var events []watch.Event
var wg sync.WaitGroup
for _, w := range ws {
w := w
wg.Add(1)
go func() {
w.Add(&runtime.Unknown{})
}()
}
go func() {
for {
event, ok := <-m.ResultChan()
if !ok {
break
}
events = append(events, event)
wg.Done()
}
}()
wg.Wait()
if len(events) != len(ws) {
t.Errorf("expected %d events but got %d", len(ws), len(events))
}
}
func TestMultiWatchStop(t *testing.T) {
ws, m := setupMultiWatch(10, t, "10")
m.Stop()
var stopped int
for _, w := range ws {
_, running := <-w.ResultChan()
if !running && w.IsStopped() {
stopped++
}
}
if stopped != len(ws) {
t.Errorf("expected %d watchers to be stopped but got %d", len(ws), stopped)
}
select {
case <-m.stopped:
// all good, watcher is closed, proceed
default:
t.Error("expected multiWatch to be stopped")
}
_, running := <-m.ResultChan()
if running {
t.Errorf("expected multiWatch chan to be closed")
}
}
type mockListerWatcher struct {
listResult runtime.Object
evCh chan watch.Event
stopped bool
}
func (m *mockListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
return m.listResult, nil
}
func (m *mockListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return m, nil
}
func (m *mockListerWatcher) Stop() {
m.stopped = true
}
func (m *mockListerWatcher) ResultChan() <-chan watch.Event {
return m.evCh
}
func TestRacyMultiWatch(t *testing.T) {
evCh := make(chan watch.Event)
lw := &mockListerWatcher{evCh: evCh}
mw, err := newMultiWatch(
[]cache.ListerWatcher{lw},
"foo",
metav1.ListOptions{},
)
if err != nil {
t.Error(err)
return
}
// this will not block, as newMultiWatch started a goroutine,
// receiving that event and block on the dispatching it there.
evCh <- watch.Event{
Type: "foo",
}
if got := <-mw.ResultChan(); got.Type != "foo" {
t.Errorf("expected foo, got %s", got.Type)
return
}
// Enqueue event, do not dequeue it.
// In conjunction with go test -race this asserts
// if there is a race between stopping and dispatching an event
evCh <- watch.Event{
Type: "bar",
}
mw.Stop()
if got := lw.stopped; got != true {
t.Errorf("expected watcher to be closed true, got %t", got)
}
// some reentrant calls, should be non-blocking
mw.Stop()
mw.Stop()
}
func TestIdenticalNamespaces(t *testing.T) {
for _, tc := range []struct {
a, b map[string]struct{}

View file

@ -28,6 +28,28 @@ import (
"k8s.io/apimachinery/pkg/watch"
)
type mockListerWatcher struct {
listResult runtime.Object
evCh chan watch.Event
stopped bool
}
func (m *mockListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
return m.listResult, nil
}
func (m *mockListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
return m, nil
}
func (m *mockListerWatcher) Stop() {
m.stopped = true
}
func (m *mockListerWatcher) ResultChan() <-chan watch.Event {
return m.evCh
}
func newUnstructured(namespace string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{