1
0
Fork 0
mirror of https://github.com/kubernetes-sigs/node-feature-discovery.git synced 2024-12-14 11:57:51 +00:00

Tidy up usage of channels for signaling

This started as a small effort to simplify the usage of "ready" channel
in nfd-master. It extended into a wider simplification/unification of
the channel usage.
This commit is contained in:
Markus Lehtonen 2024-04-05 09:18:00 +03:00
parent 275e625c2a
commit 26a80cf142
6 changed files with 18 additions and 25 deletions

View file

@ -40,7 +40,7 @@ func TestNRTGC(t *testing.T) {
Convey("When theres is old NRT ", t, func() { Convey("When theres is old NRT ", t, func() {
gc := newMockGC(nil, []string{"node1"}) gc := newMockGC(nil, []string{"node1"})
errChan := make(chan error, 1) errChan := make(chan error)
go func() { errChan <- gc.Run() }() go func() { errChan <- gc.Run() }()
So(waitForNRT(gc.topoClient), ShouldBeTrue) So(waitForNRT(gc.topoClient), ShouldBeTrue)
@ -51,7 +51,7 @@ func TestNRTGC(t *testing.T) {
Convey("When theres is one old NRT and one up to date", t, func() { Convey("When theres is one old NRT and one up to date", t, func() {
gc := newMockGC([]string{"node1"}, []string{"node1", "node2"}) gc := newMockGC([]string{"node1"}, []string{"node1", "node2"})
errChan := make(chan error, 1) errChan := make(chan error)
go func() { errChan <- gc.Run() }() go func() { errChan <- gc.Run() }()
So(waitForNRT(gc.topoClient, "node1"), ShouldBeTrue) So(waitForNRT(gc.topoClient, "node1"), ShouldBeTrue)
@ -62,7 +62,7 @@ func TestNRTGC(t *testing.T) {
Convey("Should react to delete event", t, func() { Convey("Should react to delete event", t, func() {
gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"}) gc := newMockGC([]string{"node1", "node2"}, []string{"node1", "node2"})
errChan := make(chan error, 1) errChan := make(chan error)
go func() { errChan <- gc.Run() }() go func() { errChan <- gc.Run() }()
err := gc.k8sClient.CoreV1().Nodes().Delete(context.TODO(), "node1", metav1.DeleteOptions{}) err := gc.k8sClient.CoreV1().Nodes().Delete(context.TODO(), "node1", metav1.DeleteOptions{})
@ -81,7 +81,7 @@ func TestNRTGC(t *testing.T) {
}, },
} }
errChan := make(chan error, 1) errChan := make(chan error)
go func() { errChan <- gc.Run() }() go func() { errChan <- gc.Run() }()
_, err := gc.topoClient.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrt, metav1.CreateOptions{}) _, err := gc.topoClient.TopologyV1alpha2().NodeResourceTopologies().Create(context.TODO(), &nrt, metav1.CreateOptions{})
@ -98,7 +98,7 @@ func newMockGC(nodes, nrts []string) *mockGC {
factory: informers.NewSharedInformerFactory(k8sClient, 5*time.Minute), factory: informers.NewSharedInformerFactory(k8sClient, 5*time.Minute),
nfdClient: fakenfdclientset.NewSimpleClientset(), nfdClient: fakenfdclientset.NewSimpleClientset(),
topoClient: faketopologyv1alpha2.NewSimpleClientset(createFakeNRTs(nrts...)...), topoClient: faketopologyv1alpha2.NewSimpleClientset(createFakeNRTs(nrts...)...),
stopChan: make(chan struct{}, 1), stopChan: make(chan struct{}),
args: &Args{ args: &Args{
GCPeriod: 10 * time.Minute, GCPeriod: 10 * time.Minute,
}, },

View file

@ -55,7 +55,7 @@ func init() {
func newNfdController(config *restclient.Config, nfdApiControllerOptions nfdApiControllerOptions) (*nfdController, error) { func newNfdController(config *restclient.Config, nfdApiControllerOptions nfdApiControllerOptions) (*nfdController, error) {
c := &nfdController{ c := &nfdController{
stopChan: make(chan struct{}, 1), stopChan: make(chan struct{}),
updateAllNodesChan: make(chan struct{}, 1), updateAllNodesChan: make(chan struct{}, 1),
updateOneNodeChan: make(chan string), updateOneNodeChan: make(chan string),
} }

View file

@ -68,7 +68,7 @@ func newTestNode() *corev1.Node {
func newFakeNfdAPIController(client *fakenfdclient.Clientset) *nfdController { func newFakeNfdAPIController(client *fakenfdclient.Clientset) *nfdController {
c := &nfdController{ c := &nfdController{
stopChan: make(chan struct{}, 1), stopChan: make(chan struct{}),
updateAllNodesChan: make(chan struct{}, 1), updateAllNodesChan: make(chan struct{}, 1),
updateOneNodeChan: make(chan string), updateOneNodeChan: make(chan string),
} }

View file

@ -149,7 +149,7 @@ type nfdMaster struct {
server *grpc.Server server *grpc.Server
healthServer *grpc.Server healthServer *grpc.Server
stop chan struct{} stop chan struct{}
ready chan bool ready chan struct{}
k8sClient k8sclient.Interface k8sClient k8sclient.Interface
nodeUpdaterPool *nodeUpdaterPool nodeUpdaterPool *nodeUpdaterPool
deniedNs deniedNs
@ -161,8 +161,8 @@ func NewNfdMaster(args *Args) (NfdMaster, error) {
nfd := &nfdMaster{args: *args, nfd := &nfdMaster{args: *args,
nodeName: utils.NodeName(), nodeName: utils.NodeName(),
namespace: utils.GetKubernetesNamespace(), namespace: utils.GetKubernetesNamespace(),
ready: make(chan bool, 1), ready: make(chan struct{}),
stop: make(chan struct{}, 1), stop: make(chan struct{}),
} }
if args.Instance != "" { if args.Instance != "" {
@ -272,7 +272,7 @@ func (m *nfdMaster) Run() error {
} }
// Run gRPC server // Run gRPC server
grpcErr := make(chan error, 1) grpcErr := make(chan error)
// If the NodeFeature API is enabled, don'tregister the labeler API // If the NodeFeature API is enabled, don'tregister the labeler API
// server. Otherwise, register the labeler server. // server. Otherwise, register the labeler server.
if !features.NFDFeatureGate.Enabled(features.NodeFeatureAPI) { if !features.NFDFeatureGate.Enabled(features.NodeFeatureAPI) {
@ -296,7 +296,6 @@ func (m *nfdMaster) Run() error {
} }
// Notify that we're ready to accept connections // Notify that we're ready to accept connections
m.ready <- true
close(m.ready) close(m.ready)
// NFD-Master main event loop // NFD-Master main event loop
@ -397,7 +396,7 @@ func (m *nfdMaster) runGrpcServer(errChan chan<- error) {
klog.InfoS("gRPC server serving", "port", m.args.Port) klog.InfoS("gRPC server serving", "port", m.args.Port)
// Run gRPC server // Run gRPC server
grpcErr := make(chan error, 1) grpcErr := make(chan error)
go func() { go func() {
defer lis.Close() defer lis.Close()
grpcErr <- m.server.Serve(lis) grpcErr <- m.server.Serve(lis)
@ -475,15 +474,10 @@ func (m *nfdMaster) Stop() {
// Wait until NfdMaster is able able to accept connections. // Wait until NfdMaster is able able to accept connections.
func (m *nfdMaster) WaitForReady(timeout time.Duration) bool { func (m *nfdMaster) WaitForReady(timeout time.Duration) bool {
select { select {
case ready, ok := <-m.ready: case <-m.ready:
// Ready if the flag is true or the channel has been closed return true
if ready || !ok {
return true
}
case <-time.After(timeout): case <-time.After(timeout):
return false
} }
// We should never end-up here
return false return false
} }

View file

@ -111,7 +111,7 @@ func NewTopologyUpdater(args Args, resourcemonitorArgs resourcemonitor.Args) (Nf
nfd := &nfdTopologyUpdater{ nfd := &nfdTopologyUpdater{
args: args, args: args,
resourcemonitorArgs: resourcemonitorArgs, resourcemonitorArgs: resourcemonitorArgs,
stop: make(chan struct{}, 1), stop: make(chan struct{}),
nodeName: utils.NodeName(), nodeName: utils.NodeName(),
eventSource: eventSource, eventSource: eventSource,
config: &NFDConfig{}, config: &NFDConfig{},
@ -207,7 +207,6 @@ func (w *nfdTopologyUpdater) Run() error {
// CAUTION: these resources are expected to change rarely - if ever. // CAUTION: these resources are expected to change rarely - if ever.
// So we are intentionally do this once during the process lifecycle. // So we are intentionally do this once during the process lifecycle.
// TODO: Obtain node resources dynamically from the podresource API // TODO: Obtain node resources dynamically from the podresource API
// zonesChannel := make(chan v1alpha1.ZoneList)
var zones v1alpha2.ZoneList var zones v1alpha2.ZoneList
excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName) excludeList := resourcemonitor.NewExcludeResourceList(w.config.ExcludeList, w.nodeName)
@ -216,7 +215,7 @@ func (w *nfdTopologyUpdater) Run() error {
return fmt.Errorf("failed to obtain node resource information: %w", err) return fmt.Errorf("failed to obtain node resource information: %w", err)
} }
grpcErr := make(chan error, 1) grpcErr := make(chan error)
// Start gRPC server for liveness probe (at this point we're "live") // Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 { if w.args.GrpcHealthPort != 0 {

View file

@ -146,7 +146,7 @@ func NewNfdWorker(args *Args) (NfdWorker, error) {
args: *args, args: *args,
config: &NFDConfig{}, config: &NFDConfig{},
kubernetesNamespace: utils.GetKubernetesNamespace(), kubernetesNamespace: utils.GetKubernetesNamespace(),
stop: make(chan struct{}, 1), stop: make(chan struct{}),
} }
// Check TLS related args // Check TLS related args
@ -290,7 +290,7 @@ func (w *nfdWorker) Run() error {
return nil return nil
} }
grpcErr := make(chan error, 1) grpcErr := make(chan error)
// Start gRPC server for liveness probe (at this point we're "live") // Start gRPC server for liveness probe (at this point we're "live")
if w.args.GrpcHealthPort != 0 { if w.args.GrpcHealthPort != 0 {