mirror of
https://github.com/kyverno/kyverno.git
synced 2025-04-08 18:15:48 +00:00
Removed excess channel awaiting
This commit is contained in:
parent
6dc253eca1
commit
b3452d048f
2 changed files with 17 additions and 17 deletions
|
@ -3,6 +3,7 @@ package event
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
kubeClient "github.com/nirmata/kube-policy/kubeclient"
|
||||
|
@ -35,13 +36,18 @@ type Generator interface {
|
|||
//Controller api
|
||||
type Controller interface {
|
||||
Generator
|
||||
Run(stopCh <-chan struct{}) error
|
||||
Run(stopCh <-chan struct{})
|
||||
}
|
||||
|
||||
//NewEventController to generate a new event controller
|
||||
func NewEventController(kubeClient *kubeClient.KubeClient,
|
||||
policyLister policylister.PolicyLister,
|
||||
logger *log.Logger) Controller {
|
||||
|
||||
if logger == nil {
|
||||
logger = log.New(os.Stdout, "Event Controller: ", log.LstdFlags)
|
||||
}
|
||||
|
||||
controller := &controller{
|
||||
kubeClient: kubeClient,
|
||||
policyLister: policyLister,
|
||||
|
@ -49,6 +55,7 @@ func NewEventController(kubeClient *kubeClient.KubeClient,
|
|||
recorder: initRecorder(kubeClient),
|
||||
logger: logger,
|
||||
}
|
||||
|
||||
return controller
|
||||
}
|
||||
|
||||
|
@ -70,20 +77,14 @@ func (c *controller) Add(info Info) {
|
|||
c.queue.Add(info)
|
||||
}
|
||||
|
||||
func (c *controller) Run(stopCh <-chan struct{}) error {
|
||||
func (c *controller) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
log.Println("starting eventbuilder controller")
|
||||
|
||||
log.Println("Starting eventbuilder controller workers")
|
||||
for i := 0; i < eventWorkerThreadCount; i++ {
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
}
|
||||
log.Println("Started eventbuilder controller workers")
|
||||
<-stopCh
|
||||
log.Println("Shutting down eventbuilder controller workers")
|
||||
return nil
|
||||
c.logger.Println("Started eventbuilder controller")
|
||||
}
|
||||
|
||||
func (c *controller) runWorker() {
|
||||
|
@ -102,7 +103,7 @@ func (c *controller) processNextWorkItem() bool {
|
|||
var ok bool
|
||||
if key, ok = obj.(Info); !ok {
|
||||
c.queue.Forget(obj)
|
||||
log.Printf("Expecting type info by got %v", obj)
|
||||
c.logger.Printf("Expecting type info by got %v\n", obj)
|
||||
return nil
|
||||
}
|
||||
// Run the syncHandler, passing the resource and the policy
|
||||
|
|
|
@ -3,6 +3,7 @@ package policycontroller
|
|||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
kubeClient "github.com/nirmata/kube-policy/kubeclient"
|
||||
|
@ -43,6 +44,10 @@ func NewPolicyController(policyInterface policyclientset.Interface,
|
|||
logger *log.Logger,
|
||||
kubeClient *kubeClient.KubeClient) *PolicyController {
|
||||
|
||||
if logger == nil {
|
||||
logger = log.New(os.Stdout, "Policy Controller: ", log.LstdFlags)
|
||||
}
|
||||
|
||||
controller := &PolicyController{
|
||||
kubeClient: kubeClient,
|
||||
policyLister: policyInformer.Lister(),
|
||||
|
@ -101,21 +106,15 @@ func (pc *PolicyController) Run(stopCh <-chan struct{}) error {
|
|||
defer utilruntime.HandleCrash()
|
||||
defer pc.queue.ShutDown()
|
||||
|
||||
pc.logger.Printf("starting policy controller")
|
||||
|
||||
pc.logger.Printf("waiting for infomer caches to sync")
|
||||
if ok := cache.WaitForCacheSync(stopCh, pc.policySynced); !ok {
|
||||
return fmt.Errorf("failed to wait for caches to sync")
|
||||
}
|
||||
|
||||
pc.logger.Println("starting policy controller workers")
|
||||
for i := 0; i < policyControllerWorkerCount; i++ {
|
||||
go wait.Until(pc.runWorker, time.Second, stopCh)
|
||||
}
|
||||
|
||||
pc.logger.Println("started policy controller workers")
|
||||
<-stopCh
|
||||
pc.logger.Println("shutting down policy controller workers")
|
||||
pc.logger.Println("Started policy controller")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue