mirror of https://github.com/kyverno/kyverno.git synced 2025-03-28 18:38:40 +00:00

rename internal components

shivdudhani 2019-05-10 10:38:38 -07:00
parent 9a7be94930
commit 04f5716f7b
8 changed files with 98 additions and 92 deletions

View file

@@ -11,7 +11,7 @@ import (
policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned"
informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions"
violation "github.com/nirmata/kube-policy/pkg/violation"
policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation"
event "github.com/nirmata/kube-policy/pkg/event"
"k8s.io/sample-controller/pkg/signals"
@@ -44,11 +44,12 @@ func main() {
policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies()
eventController := event.NewEventController(kubeclient, policyInformer.Lister(), nil)
violationBuilder := violation.NewPolicyViolationBuilder(kubeclient, policyInformer.Lister(), policyClientset, eventController, nil)
violationBuilder := policyviolation.NewPolicyViolationBuilder(kubeclient, policyInformer.Lister(), policyClientset, eventController, nil)
policyController := policycontroller.NewPolicyController(policyClientset,
policyInformer,
violationBuilder,
eventController,
nil,
kubeclient)
@@ -56,6 +57,7 @@ func main() {
kubeclient,
policyInformer.Lister(),
violationBuilder,
eventController,
nil)
if err != nil {
log.Fatalf("Error creating mutation webhook: %v\n", err)
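Taken together, the main.go hunks only swap the old violation package for policyviolation and thread the event controller into the violation builder, the policy controller and the webhook. A minimal sketch of the resulting wiring order, with client, informer and signal-handling setup omitted and variable names taken from the diff:

// Events first, since both the violation builder and the policy controller depend on it.
eventController := event.NewEventController(kubeclient, policyInformer.Lister(), nil)
violationBuilder := policyviolation.NewPolicyViolationBuilder(
	kubeclient, policyInformer.Lister(), policyClientset, eventController, nil)
policyController := policycontroller.NewPolicyController(policyClientset, policyInformer,
	violationBuilder, eventController, nil, kubeclient)
// The mutation webhook is then created with the same violationBuilder and eventController.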

View file

@@ -19,7 +19,7 @@ import (
"k8s.io/client-go/util/workqueue"
)
type eventController struct {
type controller struct {
kubeClient *kubeClient.KubeClient
policyLister policylister.PolicyLister
queue workqueue.RateLimitingInterface
@@ -27,19 +27,22 @@ type eventController struct {
logger *log.Logger
}
// EventGenertor to generate event
type EventGenerator interface {
Add(kind string, resource string, reason Reason, message EventMsg, args ...interface{})
//Generator to generate event
type Generator interface {
Add(kind string, resource string, reason Reason, message MsgKey, args ...interface{})
}
type EventController interface {
EventGenerator
//Controller api
type Controller interface {
Generator
Run(stopCh <-chan struct{}) error
}
//NewEventController to generate a new event controller
func NewEventController(kubeClient *kubeClient.KubeClient,
policyLister policylister.PolicyLister,
logger *log.Logger) EventController {
controller := &eventController{
logger *log.Logger) Controller {
controller := &controller{
kubeClient: kubeClient,
policyLister: policyLister,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName),
@@ -63,8 +66,8 @@ func initRecorder(kubeClient *kubeClient.KubeClient) record.EventRecorder {
return recorder
}
func (eb *eventController) Add(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) {
eb.queue.Add(eb.newEvent(
func (c *controller) Add(kind string, resource string, reason Reason, message MsgKey, args ...interface{}) {
c.queue.Add(c.newEvent(
kind,
resource,
reason,
@@ -72,16 +75,15 @@ func (eb *eventController) Add(kind string, resource string, reason Reason, mess
))
}
// Run : Initialize the worker routines to process the event creation
func (eb *eventController) Run(stopCh <-chan struct{}) error {
func (c *controller) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer eb.queue.ShutDown()
defer c.queue.ShutDown()
log.Println("starting eventbuilder controller")
log.Println("Starting eventbuilder controller workers")
for i := 0; i < eventWorkerThreadCount; i++ {
go wait.Until(eb.runWorker, time.Second, stopCh)
go wait.Until(c.runWorker, time.Second, stopCh)
}
log.Println("Started eventbuilder controller workers")
<-stopCh
@@ -89,28 +91,28 @@ func (eb *eventController) Run(stopCh <-chan struct{}) error {
return nil
}
func (eb *eventController) runWorker() {
for eb.processNextWorkItem() {
func (c *controller) runWorker() {
for c.processNextWorkItem() {
}
}
func (eb *eventController) processNextWorkItem() bool {
obj, shutdown := eb.queue.Get()
func (c *controller) processNextWorkItem() bool {
obj, shutdown := c.queue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer eb.queue.Done(obj)
defer c.queue.Done(obj)
var key eventInfo
var ok bool
if key, ok = obj.(eventInfo); !ok {
eb.queue.Forget(obj)
c.queue.Forget(obj)
log.Printf("Expecting type info by got %v", obj)
return nil
}
// Run the syncHandler, passing the resource and the policy
if err := eb.SyncHandler(key); err != nil {
eb.queue.AddRateLimited(key)
if err := c.SyncHandler(key); err != nil {
c.queue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Resource, err.Error())
}
return nil
@@ -122,7 +124,7 @@ func (eb *eventController) processNextWorkItem() bool {
return true
}
func (eb *eventController) SyncHandler(key eventInfo) error {
func (c *controller) SyncHandler(key eventInfo) error {
var resource runtime.Object
var err error
switch key.Kind {
@@ -132,30 +134,23 @@ func (eb *eventController) SyncHandler(key eventInfo) error {
utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", key.Resource))
return err
}
resource, err = eb.policyLister.Policies(namespace).Get(name)
resource, err = c.policyLister.Policies(namespace).Get(name)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to create event for policy %s, will retry ", key.Resource))
return err
}
default:
resource, err = eb.kubeClient.GetResource(key.Kind, key.Resource)
resource, err = c.kubeClient.GetResource(key.Kind, key.Resource)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource))
return err
}
}
eb.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message)
c.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message)
return nil
}
type eventInfo struct {
Kind string
Resource string
Reason string
Message string
}
func (eb *eventController) newEvent(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) eventInfo {
func (c *controller) newEvent(kind string, resource string, reason Reason, message MsgKey, args ...interface{}) eventInfo {
msgText, err := getEventMsg(message, args)
if err != nil {
utilruntime.HandleError(err)
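The practical effect of the rename is that callers depend on the smaller event.Generator interface, while only the component that runs the workers needs the full event.Controller. A minimal sketch, assuming the package API exactly as shown above (the reporter type, reason value and arguments are hypothetical):

package example

import event "github.com/nirmata/kube-policy/pkg/event"

// A component that only emits events depends on the narrow Generator interface.
type reporter struct {
	events event.Generator
}

func (r *reporter) reportRuleFailure(reason event.Reason, policyKey, rule string) {
	// FProcessRule's format string expects a rule name, a policy name and a violation name.
	r.events.Add("Policy", policyKey, reason, event.FProcessRule, rule, policyKey, "pv-1")
}

// Only the owner of the worker loop needs Controller, which also exposes Run.
func startEvents(c event.Controller, stopCh <-chan struct{}) error {
	return c.Run(stopCh)
}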

View file

@@ -5,20 +5,7 @@ import (
"regexp"
)
//Key to describe the event
type EventMsg int
const (
FResourcePolcy EventMsg = iota
FProcessRule
SPolicyApply
SRuleApply
FPolicyApplyBlockCreate
FPolicyApplyBlockUpdate
FPolicyApplyBlockUpdateRule
)
func (k EventMsg) String() string {
func (k MsgKey) String() string {
return [...]string{
"Failed to satisfy policy on resource %s.The following rules %s failed to apply. Created Policy Violation",
"Failed to process rule %s of policy %s. Created Policy Violation %s",
@@ -34,7 +21,7 @@ const argRegex = "%[s,d,v]"
//GetEventMsg return the application message based on the message id and the arguments,
// if the number of arguments passed to the message are incorrect generate an error
func getEventMsg(key EventMsg, args ...interface{}) (string, error) {
func getEventMsg(key MsgKey, args ...interface{}) (string, error) {
// Verify the number of arguments
re := regexp.MustCompile(argRegex)
argsCount := len(re.FindAllString(key.String(), -1))
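getEventMsg guards against mismatched arguments by counting format verbs in the preset message with the argRegex pattern. A small self-contained sketch of the same technique using only the standard library (the function name, message and arguments are illustrative):

package main

import (
	"fmt"
	"regexp"
)

// countVerbs counts %s, %d and %v verbs, mirroring the argRegex check above.
func countVerbs(format string) int {
	re := regexp.MustCompile("%[s,d,v]")
	return len(re.FindAllString(format, -1))
}

func main() {
	format := "Failed to process rule %s of policy %s. Created Policy Violation %s"
	args := []interface{}{"check-cpu", "pod-policy", "pv-1"}
	if countVerbs(format) != len(args) {
		fmt.Println("argument count does not match the message format")
		return
	}
	fmt.Printf(format+"\n", args...)
}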

View file

@@ -5,3 +5,23 @@ const eventSource = "policy-controller"
const eventWorkQueueName = "policy-controller-events"
const eventWorkerThreadCount = 1
type eventInfo struct {
Kind string
Resource string
Reason string
Message string
}
//MsgKey is an identified to determine the preset message formats
type MsgKey int
const (
FResourcePolcy MsgKey = iota
FProcessRule
SPolicyApply
SRuleApply
FPolicyApplyBlockCreate
FPolicyApplyBlockUpdate
FPolicyApplyBlockUpdateRule
)

View file

@@ -1,4 +1,4 @@
package violation
package policyviolation
import (
"fmt"
@@ -13,32 +13,35 @@ import (
"k8s.io/client-go/tools/cache"
)
type PolicyViolationGenerator interface {
//Generator to generate policy violation
type Generator interface {
Add(info ViolationInfo) error
}
type policyViolationBuilder struct {
type builder struct {
kubeClient *kubeClient.KubeClient
policyLister policylister.PolicyLister
policyInterface policyclientset.Interface
eventBuilder event.EventGenerator
eventBuilder event.Generator
logger *log.Logger
}
type PolicyViolationBuilder interface {
PolicyViolationGenerator
//Builder is to build policy violations
type Builder interface {
Generator
processViolation(info ViolationInfo) error
isActive(kind string, resource string) (bool, error)
}
//NewPolicyViolationBuilder returns new violation builder
func NewPolicyViolationBuilder(
kubeClient *kubeClient.KubeClient,
policyLister policylister.PolicyLister,
policyInterface policyclientset.Interface,
eventController event.EventGenerator,
logger *log.Logger) PolicyViolationBuilder {
eventController event.Generator,
logger *log.Logger) Builder {
builder := &policyViolationBuilder{
builder := &builder{
kubeClient: kubeClient,
policyLister: policyLister,
policyInterface: policyInterface,
@@ -48,18 +51,18 @@ func NewPolicyViolationBuilder(
return builder
}
func (pvb *policyViolationBuilder) Add(info ViolationInfo) error {
return pvb.processViolation(info)
func (b *builder) Add(info ViolationInfo) error {
return b.processViolation(info)
}
func (pvb *policyViolationBuilder) processViolation(info ViolationInfo) error {
func (b *builder) processViolation(info ViolationInfo) error {
// Get the policy
namespace, name, err := cache.SplitMetaNamespaceKey(info.Policy)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", info.Policy))
return err
}
policy, err := pvb.policyLister.Policies(namespace).Get(name)
policy, err := b.policyLister.Policies(namespace).Get(name)
if err != nil {
utilruntime.HandleError(err)
return err
@@ -77,13 +80,13 @@ func (pvb *policyViolationBuilder) processViolation(info ViolationInfo) error {
}
for _, violation := range modifiedPolicy.Status.Violations {
ok, err := pvb.isActive(info.Kind, violation.Resource)
ok, err := b.isActive(info.Kind, violation.Resource)
if err != nil {
utilruntime.HandleError(err)
continue
}
if !ok {
pvb.logger.Printf("removed violation ")
b.logger.Printf("removed violation")
}
}
// If violation already exists for this rule, we update the violation
@@ -92,16 +95,16 @@ func (pvb *policyViolationBuilder) processViolation(info ViolationInfo) error {
modifiedPolicy.Status.Violations = modifiedViolations
// Violations are part of the status sub resource, so we can use the Update Status api instead of updating the policy object
_, err = pvb.policyInterface.NirmataV1alpha1().Policies(namespace).UpdateStatus(modifiedPolicy)
_, err = b.policyInterface.NirmataV1alpha1().Policies(namespace).UpdateStatus(modifiedPolicy)
if err != nil {
return err
}
return nil
}
func (pvb *policyViolationBuilder) isActive(kind string, resource string) (bool, error) {
func (b *builder) isActive(kind string, resource string) (bool, error) {
// Generate Merge Patch
_, err := pvb.kubeClient.GetResource(kind, resource)
_, err := b.kubeClient.GetResource(kind, resource)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to get resource %s ", resource))
return false, err
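After the rename, components that only need to report violations can depend on policyviolation.Generator instead of the concrete builder. A minimal sketch, assuming the API exactly as shown above (the caller is hypothetical):

package example

import policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation"

// record hands a violation to the builder; Add resolves the "namespace/name"
// key in info.Policy, updates the policy's violations and writes them through
// the status sub-resource (UpdateStatus), returning any error to the caller.
func record(gen policyviolation.Generator, info policyviolation.ViolationInfo) error {
	return gen.Add(info)
}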

View file

@@ -1,4 +1,4 @@
package violation
package policyviolation
import policytype "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
@@ -11,6 +11,7 @@ const workqueueViolationName = "Policy-Violations"
// Event Reason
const violationEventResrouce = "Violation"
//ViolationInfo describes the policyviolation details
type ViolationInfo struct {
Policy string
policytype.Violation

View file

@@ -10,23 +10,25 @@ import (
policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned"
infomertypes "github.com/nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1"
lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
violation "github.com/nirmata/kube-policy/pkg/violation"
event "github.com/nirmata/kube-policy/pkg/event"
policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
//PolicyController for CRD
//PolicyController to manage Policy CRD
type PolicyController struct {
kubeClient *kubeClient.KubeClient
policyLister lister.PolicyLister
policyInterface policyclientset.Interface
policySynced cache.InformerSynced
violationBuilder violation.PolicyViolationGenerator
violationBuilder policyviolation.Generator
eventBuilder event.Generator
logger *log.Logger
queue workqueue.RateLimitingInterface
}
@@ -34,7 +36,8 @@ type PolicyController struct {
// NewPolicyController from cmd args
func NewPolicyController(policyInterface policyclientset.Interface,
policyInformer infomertypes.PolicyInformer,
violationBuilder violation.PolicyViolationGenerator,
violationBuilder policyviolation.Generator,
eventController event.Generator,
logger *log.Logger,
kubeClient *kubeClient.KubeClient) *PolicyController {
@@ -44,9 +47,9 @@ func NewPolicyController(policyInterface policyclientset.Interface,
policyInterface: policyInterface,
policySynced: policyInformer.Informer().HasSynced,
violationBuilder: violationBuilder,
eventBuilder: eventController,
logger: logger,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), policyWorkQueueName),
//TODO Event Builder, this will used to record events with policy cannot be processed, using eventBuilder as we can restrict the event types
}
policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@@ -69,6 +72,7 @@ func (pc *PolicyController) updatePolicyHandler(oldResource, newResource interfa
}
pc.enqueuePolicy(newResource)
}
func (pc *PolicyController) deletePolicyHandler(resource interface{}) {
var object metav1.Object
var ok bool
@@ -112,16 +116,11 @@ func (pc *PolicyController) Run(stopCh <-chan struct{}) error {
return nil
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (pc *PolicyController) runWorker() {
for pc.processNextWorkItem() {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (pc *PolicyController) processNextWorkItem() bool {
obj, shutdown := pc.queue.Get()
if shutdown {
@@ -146,20 +145,15 @@ func (pc *PolicyController) handleErr(err error, key interface{}) {
pc.queue.Forget(key)
return
}
// This controller retries 5 times if something goes wrong. After that, it stops trying.
// This controller retries if something goes wrong. After that, it stops trying.
if pc.queue.NumRequeues(key) < policyWorkQueueRetryLimit {
pc.logger.Printf("Error syncing events %v: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
pc.queue.AddRateLimited(key)
return
}
pc.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
utilruntime.HandleError(err)
pc.logger.Printf("Dropping the key %q out of the queue: %v", key, err)
}
@@ -173,7 +167,7 @@ func (pc *PolicyController) syncHandler(obj interface{}) error {
// convert the namespace/name string into distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
utilruntime.HandleError(fmt.Errorf("invalid policy key: %s", key))
return nil
}
@@ -181,7 +175,7 @@ func (pc *PolicyController) syncHandler(obj interface{}) error {
policy, err := pc.policyLister.Policies(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
utilruntime.HandleError(fmt.Errorf("policy '%s' in work queue no longer exists", key))
return nil
}
return err
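The handleErr hunk above keeps the usual client-go bounded-retry pattern. A self-contained sketch of that pattern with a rate-limiting workqueue (the retry limit, key and error here are illustrative):

package main

import (
	"fmt"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

const retryLimit = 5

func handleErr(queue workqueue.RateLimitingInterface, err error, key interface{}) {
	if err == nil {
		// Success: clear the rate-limiter history for this key.
		queue.Forget(key)
		return
	}
	if queue.NumRequeues(key) < retryLimit {
		// Transient failure: requeue with backoff so the key is retried later.
		queue.AddRateLimited(key)
		return
	}
	// Retry budget exhausted: drop the key and surface the error.
	queue.Forget(key)
	utilruntime.HandleError(err)
	fmt.Printf("dropping key %q out of the queue: %v\n", key, err)
}

func main() {
	q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")
	defer q.ShutDown()
	handleErr(q, fmt.Errorf("simulated sync failure"), "default/sample-policy")
}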

View file

@@ -10,8 +10,9 @@ import (
kubeclient "github.com/nirmata/kube-policy/kubeclient"
types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
event "github.com/nirmata/kube-policy/pkg/event"
mutation "github.com/nirmata/kube-policy/pkg/mutation"
violation "github.com/nirmata/kube-policy/pkg/violation"
policyviolation "github.com/nirmata/kube-policy/pkg/policyviolation"
v1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -26,7 +27,8 @@ type MutationWebhook struct {
kubeclient *kubeclient.KubeClient
policyLister policylister.PolicyLister
registration *MutationWebhookRegistration
violationBuilder violation.PolicyViolationGenerator
violationBuilder policyviolation.Generator
eventBuilder event.Generator
logger *log.Logger
}
@@ -35,7 +37,8 @@ func CreateMutationWebhook(
clientConfig *rest.Config,
kubeclient *kubeclient.KubeClient,
policyLister policylister.PolicyLister,
violationBuilder violation.PolicyViolationGenerator,
violationBuilder policyviolation.Generator,
eventController event.Generator,
logger *log.Logger) (*MutationWebhook, error) {
if clientConfig == nil || kubeclient == nil {
return nil, errors.New("Some parameters are not set")
@@ -59,6 +62,7 @@ func CreateMutationWebhook(
policyLister: policyLister,
registration: registration,
violationBuilder: violationBuilder,
eventBuilder: eventController,
logger: logger,
}, nil
}
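For reference, a sketch of calling the updated constructor after this commit; the variables are assumed to be built as in main.go above, and the package qualifier is omitted since the webhook package name is not shown in the diff:

mutationWebhook, err := CreateMutationWebhook(
	clientConfig,     // *rest.Config
	kubeclient,       // *kubeclient.KubeClient
	policyLister,     // policylister.PolicyLister
	violationBuilder, // policyviolation.Generator (was violation.PolicyViolationGenerator)
	eventController,  // event.Generator, newly injected by this commit
	nil,              // *log.Logger, as passed from main.go
)
if err != nil {
	log.Fatalf("Error creating mutation webhook: %v\n", err)
}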