1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-05 07:26:55 +00:00
Signed-off-by: ShutingZhao <shuting@nirmata.com>
This commit is contained in:
shuting 2022-07-21 00:52:15 +08:00 committed by GitHub
parent 7a2045bc11
commit 23a1df0d7b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 9 deletions

View file

@ -1,3 +1,8 @@
## v1.7.2-rc2
### Note
- A new flag `maxQueuedEvents` is added to the Kyverno main container, this flag sets the up-limit of the events that are queued internally.
## v1.7.2-rc1
### Note

View file

@ -60,6 +60,7 @@ var (
metricsPort string
webhookTimeout int
genWorkers int
maxQueuedEvents int
profile bool
disableMetricsExport bool
enableTracing bool
@ -83,7 +84,8 @@ func main() {
klog.InitFlags(nil)
log.SetLogger(klogr.New().WithCallDepth(1))
flag.IntVar(&webhookTimeout, "webhookTimeout", int(webhookconfig.DefaultWebhookTimeout), "Timeout for webhook configurations.")
flag.IntVar(&genWorkers, "genWorkers", 10, "Workers for generate controller")
flag.IntVar(&genWorkers, "genWorkers", 10, "Workers for generate controller.")
flag.IntVar(&maxQueuedEvents, "maxQueuedEvents", 1000, "Maximum events to be queued.")
flag.StringVar(&serverIP, "serverIP", "", "IP address where Kyverno controller runs. Only required if out-of-cluster.")
flag.BoolVar(&profile, "profile", false, "Set this flag to 'true', to enable profiling.")
flag.StringVar(&profilePort, "profilePort", "6060", "Enable profiling at given port, defaults to 6060.")
@ -102,7 +104,7 @@ func main() {
flag.IntVar(&clientRateLimitBurst, "clientRateLimitBurst", 0, "Configure the maximum burst for throttle. Uses the client default if zero.")
flag.Func(toggle.AutogenInternalsFlagName, toggle.AutogenInternalsDescription, toggle.AutogenInternalsFlag)
flag.DurationVar(&webhookRegistrationTimeout, "webhookRegistrationTimeout", 120*time.Second, "Timeout for webhook registration, e.g., 30s, 1m, 5m.")
flag.IntVar(&changeRequestLimit, "maxReportChangeRequests", 1000, "maximum pending report change requests per namespace or for the cluster-wide policy report")
flag.IntVar(&changeRequestLimit, "maxReportChangeRequests", 1000, "Maximum pending report change requests per namespace or for the cluster-wide policy report.")
flag.BoolVar(&splitPolicyReport, "splitPolicyReport", false, "Set the flag to 'true', to enable the split-up PolicyReports per policy.")
if err := flag.Set("v", "2"); err != nil {
setupLog.Error(err, "failed to set log level")
@ -204,7 +206,7 @@ func main() {
// EVENT GENERATOR
// - generate event with retry mechanism
eventGenerator := event.NewEventGenerator(dynamicClient, kyvernoV1.ClusterPolicies(), kyvernoV1.Policies(), log.Log.WithName("EventGenerator"))
eventGenerator := event.NewEventGenerator(dynamicClient, kyvernoV1.ClusterPolicies(), kyvernoV1.Policies(), maxQueuedEvents, log.Log.WithName("EventGenerator"))
// POLICY Report GENERATOR
reportReqGen := policyreport.NewReportChangeRequestGenerator(kyvernoClient,

View file

@ -16,7 +16,6 @@ import (
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
)
// Generator generate events
@ -37,6 +36,8 @@ type Generator struct {
// events generated at mutateExisting controller
mutateExistingRecorder record.EventRecorder
maxQueuedEvents int
log logr.Logger
}
@ -45,8 +46,8 @@ type Interface interface {
Add(infoList ...Info)
}
// NewEventGenerator to generate a new event controller
func NewEventGenerator(client dclient.Interface, cpInformer kyvernov1informers.ClusterPolicyInformer, pInformer kyvernov1informers.PolicyInformer, log logr.Logger) *Generator {
//NewEventGenerator to generate a new event controller
func NewEventGenerator(client dclient.Interface, cpInformer kyvernov1informers.ClusterPolicyInformer, pInformer kyvernov1informers.PolicyInformer, maxQueuedEvents int, log logr.Logger) *Generator {
gen := Generator{
client: client,
cpLister: cpInformer.Lister(),
@ -56,6 +57,7 @@ func NewEventGenerator(client dclient.Interface, cpInformer kyvernov1informers.C
admissionCtrRecorder: initRecorder(client, AdmissionController, log),
genPolicyRecorder: initRecorder(client, GeneratePolicyController, log),
mutateExistingRecorder: initRecorder(client, MutateExistingController, log),
maxQueuedEvents: maxQueuedEvents,
log: log,
}
return &gen
@ -73,7 +75,6 @@ func initRecorder(client dclient.Interface, eventSource Source, log logr.Logger)
return nil
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(klog.V(5).Infof)
eventInterface, err := client.GetEventsInterface()
if err != nil {
log.Error(err, "failed to get event interface for logging")
@ -96,6 +97,12 @@ func initRecorder(client dclient.Interface, eventSource Source, log logr.Logger)
// Add queues an event for generation
func (gen *Generator) Add(infos ...Info) {
logger := gen.log
if gen.queue.Len() > gen.maxQueuedEvents {
logger.V(5).Info("exceeds the event queue limit, dropping the event", "maxQueuedEvents", gen.maxQueuedEvents, "current size", gen.queue.Len())
return
}
for _, info := range infos {
if info.Name == "" {
// dont create event for resources with generateName
@ -154,7 +161,6 @@ func (gen *Generator) processNextWorkItem() bool {
}
defer gen.queue.Done(obj)
var key Info
var ok bool
if key, ok = obj.(Info); !ok {
@ -162,7 +168,6 @@ func (gen *Generator) processNextWorkItem() bool {
gen.log.Info("Incorrect type; expected type 'info'", "obj", obj)
return true
}
err := gen.syncHandler(key)
gen.handleErr(err, obj)
@ -191,6 +196,7 @@ func (gen *Generator) syncHandler(key Info) error {
if err != nil {
if !errors.IsNotFound(err) {
logger.Error(err, "failed to get resource", "kind", key.Kind, "name", key.Name, "namespace", key.Namespace)
return nil
}
return err
}