1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 18:38:40 +00:00

event generator cleanup

This commit is contained in:
shivkumar dudhani 2019-08-09 13:41:56 -07:00
parent 373d9a45ad
commit 135f241a4a
5 changed files with 76 additions and 105 deletions

View file

@ -62,7 +62,7 @@ func main() {
glog.Fatalf("Error creating policy sharedinformer: %v\n", err)
}
kubeInformer := utils.NewKubeInformerFactory(clientConfig)
eventController := event.NewEventController(client, policyInformerFactory)
egen := event.NewEventGenerator(client, policyInformerFactory)
// violationBuilder := violation.NewPolicyViolationBuilder(client, policyInformerFactory, eventController)
annotationsController := annotations.NewAnnotationControler(client)
// policyController := controller.NewPolicyController(
@ -78,7 +78,7 @@ func main() {
if err != nil {
glog.Fatalf("Failed to initialize TLS key/certificate pair: %v\n", err)
}
server, err := webhooks.NewWebhookServer(client, tlsPair, policyInformerFactory, eventController, nil, annotationsController, filterK8Resources)
server, err := webhooks.NewWebhookServer(client, tlsPair, policyInformerFactory, egen, nil, annotationsController, filterK8Resources)
if err != nil {
glog.Fatalf("Unable to create webhook server: %v\n", err)
}
@ -98,11 +98,12 @@ func main() {
pInformer.Start(stopCh)
go pc.Run(1, stopCh)
go pvc.Run(1, stopCh)
go egen.Run(1, stopCh)
//TODO add WG for the go routine?
//--------
policyInformerFactory.Run(stopCh)
kubeInformer.Start(stopCh)
eventController.Run(stopCh)
// eventController.Run(stopCh)
// genControler.Run(stopCh)
annotationsController.Run(stopCh)
// if err = policyController.Run(stopCh); err != nil {
@ -113,7 +114,7 @@ func main() {
<-stopCh
server.Stop()
// genControler.Stop()
eventController.Stop()
// eventController.Stop()
annotationsController.Stop()
// policyController.Stop()
}

View file

@ -18,35 +18,31 @@ import (
"k8s.io/client-go/util/workqueue"
)
type controller struct {
//Generator generate events
type Generator struct {
client *client.Client
policyLister v1alpha1.PolicyLister
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
}
//Generator to generate event
type Generator interface {
//Interface to generate event
type Interface interface {
Add(infoList ...*Info)
}
//Controller api
type Controller interface {
Generator
Run(stopCh <-chan struct{})
Stop()
}
//NewEventGenerator to generate a new event controller
func NewEventGenerator(client *client.Client,
shareInformer sharedinformer.PolicyInformer) *Generator {
//NewEventController to generate a new event controller
func NewEventController(client *client.Client,
shareInformer sharedinformer.PolicyInformer) Controller {
return &controller{
gen := Generator{
client: client,
policyLister: shareInformer.GetLister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName),
recorder: initRecorder(client),
}
return &gen
}
func initRecorder(client *client.Client) record.EventRecorder {
@ -72,67 +68,67 @@ func initRecorder(client *client.Client) record.EventRecorder {
return recorder
}
func (c *controller) Add(infos ...*Info) {
//Add queues an event for generation
func (gen *Generator) Add(infos ...*Info) {
for _, info := range infos {
c.queue.Add(*info)
gen.queue.Add(*info)
}
}
func (c *controller) Run(stopCh <-chan struct{}) {
// Run begins generator
func (gen *Generator) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
glog.Info("Starting event generator")
defer glog.Info("Shutting down event generator")
for i := 0; i < eventWorkerThreadCount; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
for i := 0; i < workers; i++ {
go wait.Until(gen.runWorker, time.Second, stopCh)
}
glog.Info("Started eventbuilder controller workers")
<-stopCh
}
func (c *controller) Stop() {
defer c.queue.ShutDown()
glog.Info("Shutting down eventbuilder controller workers")
}
func (c *controller) runWorker() {
for c.processNextWorkItem() {
func (gen *Generator) runWorker() {
for gen.processNextWorkItem() {
}
}
func (c *controller) handleErr(err error, key interface{}) {
func (gen *Generator) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
gen.queue.Forget(key)
return
}
// This controller retries if something goes wrong. After that, it stops trying.
if c.queue.NumRequeues(key) < workQueueRetryLimit {
if gen.queue.NumRequeues(key) < workQueueRetryLimit {
glog.Warningf("Error syncing events %v: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
c.queue.AddRateLimited(key)
gen.queue.AddRateLimited(key)
return
}
c.queue.Forget(key)
gen.queue.Forget(key)
glog.Error(err)
glog.Warningf("Dropping the key out of the queue: %v", err)
}
func (c *controller) processNextWorkItem() bool {
obj, shutdown := c.queue.Get()
func (gen *Generator) processNextWorkItem() bool {
obj, shutdown := gen.queue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer c.queue.Done(obj)
defer gen.queue.Done(obj)
var key Info
var ok bool
if key, ok = obj.(Info); !ok {
c.queue.Forget(obj)
gen.queue.Forget(obj)
glog.Warningf("Expecting type info by got %v\n", obj)
return nil
}
err := c.syncHandler(key)
c.handleErr(err, obj)
err := gen.syncHandler(key)
gen.handleErr(err, obj)
return nil
}(obj)
if err != nil {
@ -142,20 +138,20 @@ func (c *controller) processNextWorkItem() bool {
return true
}
func (c *controller) syncHandler(key Info) error {
func (gen *Generator) syncHandler(key Info) error {
var robj runtime.Object
var err error
switch key.Kind {
case "Policy":
//TODO: policy is clustered resource so wont need namespace
robj, err = c.policyLister.Get(key.Name)
robj, err = gen.policyLister.Get(key.Name)
if err != nil {
glog.Errorf("Error creating event: unable to get policy %s, will retry ", key.Name)
return err
}
default:
robj, err = c.client.GetResource(key.Kind, key.Namespace, key.Name)
robj, err = gen.client.GetResource(key.Kind, key.Namespace, key.Name)
if err != nil {
glog.Errorf("Error creating event: unable to get resource %s, %s, will retry ", key.Kind, key.Namespace+"/"+key.Name)
return err
@ -163,13 +159,14 @@ func (c *controller) syncHandler(key Info) error {
}
if key.Reason == PolicyApplied.String() {
c.recorder.Event(robj, v1.EventTypeNormal, key.Reason, key.Message)
gen.recorder.Event(robj, v1.EventTypeNormal, key.Reason, key.Message)
} else {
c.recorder.Event(robj, v1.EventTypeWarning, key.Reason, key.Message)
gen.recorder.Event(robj, v1.EventTypeWarning, key.Reason, key.Message)
}
return nil
}
//TODO: check if we need this ?
//NewEvent returns a new event
func NewEvent(rkind string, rnamespace string, rname string, reason Reason, message MsgKey, args ...interface{}) *Info {
msgText, err := getEventMsg(message, args...)

View file

@ -43,11 +43,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) *v1be
if !StringInSlice(request.Kind.Kind, getApplicableKindsForPolicy(policy)) {
continue
}
policyInfo := info.NewPolicyInfo(policy.Name,
resource.GetKind(),
resource.GetName(),
resource.GetNamespace(),
policy.Spec.ValidationFailureAction)
policyInfo := info.NewPolicyInfo(policy.Name, resource.GetKind(), resource.GetName(), resource.GetNamespace(), policy.Spec.ValidationFailureAction)
glog.V(4).Infof("Handling mutation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
resource.GetKind(), resource.GetNamespace(), resource.GetName(), request.UID, request.Operation)
@ -72,6 +68,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest) *v1be
// ADD ANNOTATIONS
// ADD EVENTS
// ADD POLICY VIOLATIONS
ok, msg := isAdmSuccesful(policyInfos)
if ok {

View file

@ -29,7 +29,7 @@ type WebhookServer struct {
server http.Server
client *client.Client
policyLister v1alpha1.PolicyLister
eventController event.Generator
eventGen event.Interface
violationBuilder violation.Generator
annotationsController annotations.Controller
filterK8Resources []utils.K8Resource
@ -41,7 +41,7 @@ func NewWebhookServer(
client *client.Client,
tlsPair *tlsutils.TlsPemPair,
shareInformer sharedinformer.PolicyInformer,
eventController event.Generator,
eventGen event.Interface,
violationBuilder violation.Generator,
annotationsController annotations.Controller,
filterK8Resources string) (*WebhookServer, error) {
@ -60,7 +60,7 @@ func NewWebhookServer(
ws := &WebhookServer{
client: client,
policyLister: shareInformer.GetLister(),
eventController: eventController,
eventGen: eventGen,
violationBuilder: violationBuilder,
annotationsController: annotationsController,
filterK8Resources: utils.ParseKinds(filterK8Resources),

View file

@ -7,18 +7,21 @@ import (
v1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// HandleValidation handles validating webhook admission request
// If there are no errors in validating rule we apply generation rules
func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse {
// var patches [][]byte
var policyInfos []*info.PolicyInfo
glog.V(4).Infof("Receive request in validating webhook: Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation)
policyInfos := []*info.PolicyInfo{}
policies, err := ws.policyLister.List(labels.NewSelector())
if err != nil {
//TODO check if the CRD is created ?
// Unable to connect to policy Lister to access policies
glog.Error("Unable to connect to policy controller to access policies. Validation Rules are NOT being applied")
glog.Warning(err)
@ -27,36 +30,29 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest) *v1
}
}
rname := engine.ParseNameFromObject(request.Object.Raw)
rns := engine.ParseNamespaceFromObject(request.Object.Raw)
rkind := request.Kind.Kind
if rkind == "" {
glog.Errorf("failed to parse KIND from request: Namespace=%s Name=%s UID=%s patchOperation=%s\n", request.Namespace, request.Name, request.UID, request.Operation)
resource, err := convertToUnstructured(request.Object.Raw)
if err != nil {
glog.Errorf("unable to convert raw resource to unstructured: %v", err)
}
//TODO: check if resource gvk is available in raw resource,
// if not then set it from the api request
resource.SetGroupVersionKind(schema.GroupVersionKind{Group: request.Kind.Group, Version: request.Kind.Version, Kind: request.Kind.Kind})
//TODO: check if the name and namespace is also passed right in the resource?
// all the patches to be applied on the resource
for _, policy := range policies {
if !StringInSlice(request.Kind.Kind, getApplicableKindsForPolicy(policy)) {
continue
}
//TODO: HACK Check if an update of annotations
if checkIfOnlyAnnotationsUpdate(request) {
// allow the update of resource to add annotations
return &v1beta1.AdmissionResponse{
Allowed: true,
}
}
policyInfo := info.NewPolicyInfo(policy.Name,
rkind,
rname,
rns,
policy.Spec.ValidationFailureAction)
policyInfo := info.NewPolicyInfo(policy.Name, resource.GetKind(), resource.GetName(), resource.GetNamespace(), policy.Spec.ValidationFailureAction)
glog.V(3).Infof("Handling validation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
request.Kind.Kind, rns, rname, request.UID, request.Operation)
glog.V(4).Infof("Handling validation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
resource.GetKind(), resource.GetNamespace(), resource.GetName(), request.UID, request.Operation)
glog.V(4).Infof("Applying policy %s with %d rules\n", policy.ObjectMeta.Name, len(policy.Spec.Rules))
glog.Infof("Validating resource %s/%s/%s with policy %s with %d rules", rkind, rns, rname, policy.ObjectMeta.Name, len(policy.Spec.Rules))
ruleInfos, err := engine.Validate(*policy, request.Object.Raw, request.Kind)
if err != nil {
// This is not policy error
@ -68,42 +64,22 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest) *v1
policyInfo.AddRuleInfos(ruleInfos)
if !policyInfo.IsSuccessful() {
glog.Infof("Failed to apply policy %s on resource %s/%s", policy.Name, rname, rns)
glog.Infof("Failed to apply policy %s on resource %s/%s", policy.Name, resource.GetNamespace(), resource.GetName())
glog.V(4).Info("Failed rule details")
for _, r := range ruleInfos {
glog.Warningf("%s: %s\n", r.Name, r.Msgs)
}
} else {
// CleanUp Violations if exists
err := ws.violationBuilder.RemoveInactiveViolation(policy.Name, request.Kind.Kind, rns, rname, info.Validation)
if err != nil {
glog.Info(err)
}
if len(ruleInfos) > 0 {
glog.Infof("Validation from policy %s has applied succesfully to %s %s/%s", policy.Name, request.Kind.Kind, rname, rns)
glog.V(4).Infof("%s: %s\n", r.Name, r.Msgs)
}
continue
}
if len(ruleInfos) > 0 {
glog.V(4).Infof("Validation from policy %s has applied succesfully to %s %s/%s", policy.Name, request.Kind.Kind, resource.GetNamespace(), resource.GetName())
}
policyInfos = append(policyInfos, policyInfo)
// annotations
annPatch := addAnnotationsToResource(request.Object.Raw, policyInfo, info.Validation)
if annPatch != nil {
ws.annotationsController.Add(rkind, rns, rname, annPatch)
}
}
if len(policyInfos) > 0 && len(policyInfos[0].Rules) != 0 {
eventsInfo, violations := newEventInfoFromPolicyInfo(policyInfos, (request.Operation == v1beta1.Update), info.Validation)
// If the validationFailureAction flag is set "audit",
// then we dont block the request and report the violations
ws.violationBuilder.Add(violations...)
ws.eventController.Add(eventsInfo...)
}
// If Validation fails then reject the request
// ADD EVENTS
// ADD POLICY VIOLATIONS
ok, msg := isAdmSuccesful(policyInfos)
// violations are created if "audit" flag is set
// and if there are any then we dont bock the resource creation
// Even if one the policy being applied
if !ok && toBlock(policyInfos) {
return &v1beta1.AdmissionResponse{
Allowed: false,