1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-06 07:57:07 +00:00
kyverno/pkg/event/controller.go

183 lines
4.9 KiB
Go
Raw Normal View History

2019-05-10 00:05:21 -07:00
package event
import (
"fmt"
"log"
2019-05-14 18:02:11 +03:00
"os"
2019-05-10 00:05:21 -07:00
"time"
client "github.com/nirmata/kube-policy/client"
2019-05-10 00:05:21 -07:00
"github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme"
policyscheme "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme"
2019-05-15 12:29:09 -07:00
v1alpha1 "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
"github.com/nirmata/kube-policy/pkg/sharedinformer"
2019-05-10 00:05:21 -07:00
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
2019-05-10 10:38:38 -07:00
type controller struct {
client *client.Client
2019-05-15 12:29:09 -07:00
policyLister v1alpha1.PolicyLister
2019-05-10 00:05:21 -07:00
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
logger *log.Logger
}
2019-05-10 10:38:38 -07:00
//Generator to generate event
type Generator interface {
2019-05-10 12:36:55 -07:00
Add(info Info)
2019-05-10 00:05:21 -07:00
}
2019-05-10 10:38:38 -07:00
//Controller api
type Controller interface {
Generator
2019-05-14 18:02:11 +03:00
Run(stopCh <-chan struct{})
Stop()
2019-05-10 00:05:21 -07:00
}
2019-05-10 10:38:38 -07:00
//NewEventController to generate a new event controller
func NewEventController(client *client.Client,
2019-05-15 12:29:09 -07:00
shareInformer sharedinformer.PolicyInformer,
2019-05-10 10:38:38 -07:00
logger *log.Logger) Controller {
2019-05-14 18:02:11 +03:00
if logger == nil {
logger = log.New(os.Stdout, "Event Controller: ", log.LstdFlags)
2019-05-14 18:02:11 +03:00
}
2019-05-10 10:38:38 -07:00
controller := &controller{
client: client,
2019-05-15 12:29:09 -07:00
policyLister: shareInformer.GetLister(),
2019-05-10 00:05:21 -07:00
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName),
recorder: initRecorder(client),
2019-05-10 00:05:21 -07:00
logger: logger,
}
return controller
}
func initRecorder(client *client.Client) record.EventRecorder {
2019-05-10 00:05:21 -07:00
// Initliaze Event Broadcaster
policyscheme.AddToScheme(scheme.Scheme)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(log.Printf)
eventInterface, err := client.GetEventsInterface()
if err != nil {
utilruntime.HandleError(err) // TODO: add more specific error
return nil
}
2019-05-10 00:05:21 -07:00
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: eventInterface})
2019-05-10 00:05:21 -07:00
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
v1.EventSource{Component: eventSource})
return recorder
}
2019-05-10 12:36:55 -07:00
func (c *controller) Add(info Info) {
c.queue.Add(info)
2019-05-10 00:05:21 -07:00
}
2019-05-14 18:02:11 +03:00
func (c *controller) Run(stopCh <-chan struct{}) {
2019-05-10 00:05:21 -07:00
defer utilruntime.HandleCrash()
2019-05-10 10:38:38 -07:00
defer c.queue.ShutDown()
2019-05-10 00:05:21 -07:00
for i := 0; i < eventWorkerThreadCount; i++ {
2019-05-10 10:38:38 -07:00
go wait.Until(c.runWorker, time.Second, stopCh)
2019-05-10 00:05:21 -07:00
}
2019-05-15 15:08:06 -07:00
c.logger.Println("Started eventbuilder controller workers")
2019-05-10 00:05:21 -07:00
}
func (c *controller) Stop() {
2019-05-15 15:08:06 -07:00
c.logger.Println("Shutting down eventbuilder controller workers")
}
2019-05-10 10:38:38 -07:00
func (c *controller) runWorker() {
for c.processNextWorkItem() {
2019-05-10 00:05:21 -07:00
}
}
2019-05-10 10:38:38 -07:00
func (c *controller) processNextWorkItem() bool {
obj, shutdown := c.queue.Get()
2019-05-10 00:05:21 -07:00
if shutdown {
return false
}
err := func(obj interface{}) error {
2019-05-10 10:38:38 -07:00
defer c.queue.Done(obj)
2019-05-10 12:36:55 -07:00
var key Info
2019-05-10 00:05:21 -07:00
var ok bool
2019-05-10 12:36:55 -07:00
if key, ok = obj.(Info); !ok {
2019-05-10 10:38:38 -07:00
c.queue.Forget(obj)
2019-05-14 18:02:11 +03:00
c.logger.Printf("Expecting type info by got %v\n", obj)
2019-05-10 00:05:21 -07:00
return nil
}
// Run the syncHandler, passing the resource and the policy
2019-05-10 10:38:38 -07:00
if err := c.SyncHandler(key); err != nil {
c.queue.AddRateLimited(key)
2019-05-10 00:05:21 -07:00
return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Resource, err.Error())
}
return nil
}(obj)
if err != nil {
2019-05-15 15:08:06 -07:00
c.logger.Println((err))
2019-05-10 00:05:21 -07:00
}
return true
}
2019-05-10 12:36:55 -07:00
func (c *controller) SyncHandler(key Info) error {
2019-05-10 00:05:21 -07:00
var resource runtime.Object
var err error
2019-05-10 00:05:21 -07:00
switch key.Kind {
case "Policy":
//TODO: policy is clustered resource so wont need namespace
resource, err = c.policyLister.Get(key.Resource)
2019-05-10 00:05:21 -07:00
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to create event for policy %s, will retry ", key.Resource))
return err
}
default:
namespace, name, err := cache.SplitMetaNamespaceKey(key.Resource)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key.Resource))
2019-05-10 00:05:21 -07:00
return err
}
resource, err = c.client.GetResource(key.Kind, namespace, name)
if err != nil {
return err
}
//TODO: Test if conversion from unstructured to runtime.Object is implicit or explicit conversion is required
// resourceobj, err := client.ConvertToRuntimeObject(resource)
// if err != nil {
// return err
// }
// resource, err = c.kubeClient.GetResource(key.Kind, key.Resource)
2019-05-10 00:05:21 -07:00
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource))
return err
}
}
2019-05-10 10:38:38 -07:00
c.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message)
2019-05-10 00:05:21 -07:00
return nil
}
2019-05-10 12:36:55 -07:00
//NewEvent returns a new event
func NewEvent(kind string, resource string, reason Reason, message MsgKey, args ...interface{}) Info {
2019-05-10 00:05:21 -07:00
msgText, err := getEventMsg(message, args)
if err != nil {
utilruntime.HandleError(err)
}
2019-05-10 12:36:55 -07:00
return Info{
2019-05-10 00:05:21 -07:00
Kind: kind,
Resource: resource,
Reason: reason.String(),
Message: msgText,
}
}