mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-31 03:45:17 +00:00
handle retrys events
This commit is contained in:
parent
9f157544c9
commit
91030987ea
4 changed files with 33 additions and 14 deletions
8
main.go
8
main.go
|
@ -68,6 +68,10 @@ func main() {
|
|||
|
||||
stopCh := signals.SetupSignalHandler()
|
||||
|
||||
if err = webhookRegistrationClient.Register(); err != nil {
|
||||
glog.Fatalf("Failed registering Admission Webhooks: %v\n", err)
|
||||
}
|
||||
|
||||
policyInformerFactory.Run(stopCh)
|
||||
kubeInformer.Start(stopCh)
|
||||
eventController.Run(stopCh)
|
||||
|
@ -77,10 +81,6 @@ func main() {
|
|||
glog.Fatalf("Error running PolicyController: %v\n", err)
|
||||
}
|
||||
|
||||
if err = webhookRegistrationClient.Register(); err != nil {
|
||||
glog.Fatalf("Failed registering Admission Webhooks: %v\n", err)
|
||||
}
|
||||
|
||||
server.RunAsync()
|
||||
<-stopCh
|
||||
server.Stop()
|
||||
|
|
|
@ -8,7 +8,7 @@ import (
|
|||
|
||||
const policyWorkQueueName = "policyworkqueue"
|
||||
|
||||
const policyWorkQueueRetryLimit = 5
|
||||
const policyWorkQueueRetryLimit = 3
|
||||
|
||||
const policyControllerWorkerCount = 2
|
||||
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package event
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@ -92,40 +91,58 @@ func (c *controller) Stop() {
|
|||
defer c.queue.ShutDown()
|
||||
glog.Info("Shutting down eventbuilder controller workers")
|
||||
}
|
||||
|
||||
func (c *controller) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *controller) handleErr(err error, key interface{}) {
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
return
|
||||
}
|
||||
// This controller retries if something goes wrong. After that, it stops trying.
|
||||
if c.queue.NumRequeues(key) < WorkQueueRetryLimit {
|
||||
glog.Warningf("Error syncing events %v: %v", key, err)
|
||||
// Re-enqueue the key rate limited. Based on the rate limiter on the
|
||||
// queue and the re-enqueue history, the key will be processed later again.
|
||||
c.queue.AddRateLimited(key)
|
||||
return
|
||||
}
|
||||
c.queue.Forget(key)
|
||||
glog.Error(err)
|
||||
glog.Warningf("Dropping the key out of the queue: %v", err)
|
||||
}
|
||||
|
||||
func (c *controller) processNextWorkItem() bool {
|
||||
obj, shutdown := c.queue.Get()
|
||||
if shutdown {
|
||||
return false
|
||||
}
|
||||
|
||||
err := func(obj interface{}) error {
|
||||
defer c.queue.Done(obj)
|
||||
var key Info
|
||||
var ok bool
|
||||
|
||||
if key, ok = obj.(Info); !ok {
|
||||
c.queue.Forget(obj)
|
||||
glog.Warningf("Expecting type info by got %v\n", obj)
|
||||
return nil
|
||||
}
|
||||
// Run the syncHandler, passing the resource and the policy
|
||||
if err := c.SyncHandler(key); err != nil {
|
||||
c.queue.AddRateLimited(key)
|
||||
return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Namespace+"/"+key.Name, err.Error())
|
||||
}
|
||||
err := c.syncHandler(key)
|
||||
c.handleErr(err, obj)
|
||||
return nil
|
||||
}(obj)
|
||||
|
||||
if err != nil {
|
||||
glog.Warning(err)
|
||||
glog.Error(err)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *controller) SyncHandler(key Info) error {
|
||||
func (c *controller) syncHandler(key Info) error {
|
||||
var robj runtime.Object
|
||||
var err error
|
||||
|
||||
|
|
|
@ -6,6 +6,8 @@ const eventWorkQueueName = "policy-controller-events"
|
|||
|
||||
const eventWorkerThreadCount = 1
|
||||
|
||||
const WorkQueueRetryLimit = 1
|
||||
|
||||
//Info defines the event details
|
||||
type Info struct {
|
||||
Kind string
|
||||
|
|
Loading…
Add table
Reference in a new issue