
initial feature proposal

This commit is contained in:
shivdudhani 2019-05-10 00:05:21 -07:00
parent e8e33732cf
commit 9a7be94930
19 changed files with 603 additions and 549 deletions


@ -1,209 +0,0 @@
package controller
import (
"errors"
"fmt"
"log"
"os"
"sort"
"time"
controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces"
kubeClient "github.com/nirmata/kube-policy/kubeclient"
types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
clientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned"
policies "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/typed/policy/v1alpha1"
informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions"
lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
event "github.com/nirmata/kube-policy/pkg/event"
eventinterfaces "github.com/nirmata/kube-policy/pkg/event/interfaces"
eventutils "github.com/nirmata/kube-policy/pkg/event/utils"
violation "github.com/nirmata/kube-policy/pkg/violation"
violationinterfaces "github.com/nirmata/kube-policy/pkg/violation/interfaces"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
mergetypes "k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
// PolicyController API
type PolicyController interface {
controllerinterfaces.PolicyGetter
controllerinterfaces.PolicyHandlers
Run(stopCh <-chan struct{})
}
//policyController for CRD
type policyController struct {
policyInformerFactory informers.SharedInformerFactory
policyLister lister.PolicyLister
policiesInterface policies.PolicyInterface
logger *log.Logger
violationBuilder violationinterfaces.ViolationGenerator
eventBuilder eventinterfaces.BuilderInternal
}
// NewPolicyController from cmd args
func NewPolicyController(config *rest.Config, logger *log.Logger, kubeClient *kubeClient.KubeClient) (PolicyController, error) {
if logger == nil {
logger = log.New(os.Stdout, "Policy Controller: ", log.LstdFlags|log.Lshortfile)
}
if config == nil {
return nil, errors.New("Client Config should be set for controller")
}
policyClientset, err := clientset.NewForConfig(config)
if err != nil {
return nil, err
}
policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, 0)
policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies()
// generate Event builder
eventBuilder, err := event.NewEventBuilder(kubeClient, logger)
if err != nil {
return nil, err
}
// generate Violation builder
violationBuilder, err := violation.NewViolationBuilder(kubeClient, eventBuilder, logger)
controller := &policyController{
policyInformerFactory: policyInformerFactory,
policyLister: policyInformer.Lister(),
policiesInterface: policyClientset.NirmataV1alpha1().Policies("default"),
logger: logger,
violationBuilder: violationBuilder,
eventBuilder: eventBuilder,
}
policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.CreatePolicyHandler,
UpdateFunc: controller.UpdatePolicyHandler,
DeleteFunc: controller.DeletePolicyHandler,
})
// Set the controller
eventBuilder.SetController(controller)
violationBuilder.SetController(controller)
return controller, nil
}
func (c *policyController) GetCacheInformerSync() cache.InformerSynced {
return c.policyInformerFactory.Nirmata().V1alpha1().Policies().Informer().HasSynced
}
// Run is main controller thread
func (c *policyController) Run(stopCh <-chan struct{}) {
c.policyInformerFactory.Start(stopCh)
c.eventBuilder.Run(eventutils.EventWorkerThreadCount, stopCh)
}
func (c *policyController) GetPolicies() ([]types.Policy, error) {
// Create an empty Selector to grab all the policies
selector := labels.NewSelector()
cachedPolicies, err := c.policyLister.List(selector)
if err != nil {
c.logger.Printf("Error: %v", err)
return nil, err
}
var policies []types.Policy
for _, elem := range cachedPolicies {
policies = append(policies, *elem.DeepCopy())
}
sort.Slice(policies, func(i, j int) bool {
return policies[i].CreationTimestamp.Time.Before(policies[j].CreationTimestamp.Time)
})
return policies, nil
}
// Writes error message to the policy logs in status section
func (c *policyController) LogPolicyError(name, text string) {
c.addPolicyLog(name, "[ERROR] "+text)
}
// Writes info message to the policy logs in status section
func (c *policyController) LogPolicyInfo(name, text string) {
c.addPolicyLog(name, "[ INFO] "+text)
}
// This is the maximum number of records that can be written to the log object of the policy.
// If this number is exceeded, the older entries will be deleted.
const policyLogMaxRecords int = 50
// Appends given log text to the status/logs array.
func (c *policyController) addPolicyLog(name, text string) {
getOptions := metav1.GetOptions{
ResourceVersion: "1",
IncludeUninitialized: true,
}
policy, err := c.policiesInterface.Get(name, getOptions)
if err != nil {
c.logger.Printf("Unable to get policy %s: %s", name, err)
return
}
// Add new log record
text = time.Now().Format("2006 Jan 02 15:04:05.999 ") + text
policy.Status.Logs = append(policy.Status.Logs, text)
// Pop front extra log records
logsCount := len(policy.Status.Logs)
if logsCount > policyLogMaxRecords {
policy.Status.Logs = policy.Status.Logs[logsCount-policyLogMaxRecords:]
}
// Save logs to policy object
_, err = c.policiesInterface.UpdateStatus(policy)
if err != nil {
c.logger.Printf("Unable to update logs for policy %s: %s", name, err)
}
}
func (c *policyController) CreatePolicyHandler(resource interface{}) {
key := c.GetResourceKey(resource)
c.logger.Printf("Policy created: %s", key)
}
func (c *policyController) UpdatePolicyHandler(oldResource, newResource interface{}) {
oldKey := c.GetResourceKey(oldResource)
newKey := c.GetResourceKey(newResource)
c.logger.Printf("Policy %s updated to %s", oldKey, newKey)
}
func (c *policyController) DeletePolicyHandler(resource interface{}) {
key := c.GetResourceKey(resource)
c.logger.Printf("Policy deleted: %s", key)
}
func (c *policyController) GetResourceKey(resource interface{}) string {
if key, err := cache.MetaNamespaceKeyFunc(resource); err != nil {
c.logger.Fatalf("Error retrieving policy key: %v", err)
} else {
return key
}
return ""
}
func (c *policyController) GetPolicy(name string) (*types.Policy, error) {
policyNamespace, policyName, err := cache.SplitMetaNamespaceKey(name)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", name))
return nil, err
}
return c.getPolicyInterface(policyNamespace).Get(policyName)
}
func (c *policyController) getPolicyInterface(namespace string) lister.PolicyNamespaceLister {
return c.policyLister.Policies(namespace)
}
func (c *policyController) PatchPolicy(policy string, pt mergetypes.PatchType, data []byte) (*types.Policy, error) {
return c.policiesInterface.Patch(policy, pt, data)
}
func (c *policyController) UpdatePolicyViolations(updatedPolicy *types.Policy) error {
_, err := c.policiesInterface.UpdateStatus(updatedPolicy)
return err
}


@ -1,24 +0,0 @@
package interfaces
import (
policytypes "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
types "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
)
type PolicyGetter interface {
GetPolicies() ([]policytypes.Policy, error)
GetPolicy(name string) (*policytypes.Policy, error)
GetCacheInformerSync() cache.InformerSynced
PatchPolicy(policy string, pt types.PatchType, data []byte) (*policytypes.Policy, error)
UpdatePolicyViolations(updatedPolicy *policytypes.Policy) error
LogPolicyError(name, text string)
LogPolicyInfo(name, text string)
}
type PolicyHandlers interface {
CreatePolicyHandler(resource interface{})
UpdatePolicyHandler(oldResource, newResource interface{})
DeletePolicyHandler(resource interface{})
GetResourceKey(resource interface{}) string
}

main.go

@ -4,12 +4,17 @@ import (
"flag"
"log"
"github.com/nirmata/kube-policy/controller"
"github.com/nirmata/kube-policy/kubeclient"
"github.com/nirmata/kube-policy/policycontroller"
"github.com/nirmata/kube-policy/server"
"github.com/nirmata/kube-policy/webhooks"
signals "k8s.io/sample-controller/pkg/signals"
policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned"
informers "github.com/nirmata/kube-policy/pkg/client/informers/externalversions"
violation "github.com/nirmata/kube-policy/pkg/violation"
event "github.com/nirmata/kube-policy/pkg/event"
"k8s.io/sample-controller/pkg/signals"
)
var (
@ -29,12 +34,29 @@ func main() {
log.Fatalf("Error creating kubeclient: %v\n", err)
}
controller, err := controller.NewPolicyController(clientConfig, nil, kubeclient)
policyClientset, err := policyclientset.NewForConfig(clientConfig)
if err != nil {
log.Fatalf("Error creating PolicyController: %s\n", err)
log.Fatalf("Error creating policyClient: %v\n", err)
}
mutationWebhook, err := webhooks.CreateMutationWebhook(clientConfig, kubeclient, controller, nil)
//TODO wrap the policyInformer inside a factory
policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, 0)
policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies()
eventController := event.NewEventController(kubeclient, policyInformer.Lister(), nil)
violationBuilder := violation.NewPolicyViolationBuilder(kubeclient, policyInformer.Lister(), policyClientset, eventController, nil)
policyController := policycontroller.NewPolicyController(policyClientset,
policyInformer,
violationBuilder,
nil,
kubeclient)
mutationWebhook, err := webhooks.CreateMutationWebhook(clientConfig,
kubeclient,
policyInformer.Lister(),
violationBuilder,
nil)
if err != nil {
log.Fatalf("Error creating mutation webhook: %v\n", err)
}
@ -51,17 +73,17 @@ func main() {
server.RunAsync()
stopCh := signals.SetupSignalHandler()
controller.Run(stopCh)
if err != nil {
log.Fatalf("Error running PolicyController: %s\n", err)
policyInformerFactory.Start(stopCh)
if err = eventController.Run(stopCh); err != nil {
log.Fatalf("Error running EventController: %v\n", err)
}
if err = policyController.Run(stopCh); err != nil {
log.Fatalf("Error running PolicyController: %v\n", err)
}
log.Println("Policy Controller has started")
<-stopCh
server.Stop()
log.Println("Policy Controller has stopped")
}
func init() {


@ -1,161 +0,0 @@
package event
import (
"errors"
"fmt"
"log"
"time"
controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces"
kubeClient "github.com/nirmata/kube-policy/kubeclient"
"github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme"
policyscheme "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme"
eventinterfaces "github.com/nirmata/kube-policy/pkg/event/interfaces"
utils "github.com/nirmata/kube-policy/pkg/event/utils"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
type builder struct {
kubeClient *kubeClient.KubeClient
controller controllerinterfaces.PolicyGetter
workqueue workqueue.RateLimitingInterface
recorder record.EventRecorder
logger *log.Logger
policySynced cache.InformerSynced
}
type Builder interface {
eventinterfaces.BuilderInternal
SyncHandler(key utils.EventInfo) error
ProcessNextWorkItem() bool
RunWorker()
}
func NewEventBuilder(kubeClient *kubeClient.KubeClient,
logger *log.Logger,
) (Builder, error) {
builder := &builder{
kubeClient: kubeClient,
workqueue: initWorkqueue(),
recorder: initRecorder(kubeClient),
logger: logger,
}
return builder, nil
}
func initRecorder(kubeClient *kubeClient.KubeClient) record.EventRecorder {
// Initialize Event Broadcaster
policyscheme.AddToScheme(scheme.Scheme)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(log.Printf)
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: kubeClient.GetEventsInterface("")})
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
v1.EventSource{Component: utils.EventSource})
return recorder
}
func initWorkqueue() workqueue.RateLimitingInterface {
return workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), utils.EventWorkQueueName)
}
func (b *builder) SetController(controller controllerinterfaces.PolicyGetter) {
b.controller = controller
b.policySynced = controller.GetCacheInformerSync()
}
func (b *builder) AddEvent(info utils.EventInfo) {
b.workqueue.Add(info)
}
// Run : Initialize the worker routines to process the event creation
func (b *builder) Run(threadiness int, stopCh <-chan struct{}) error {
if b.controller == nil {
return errors.New("Controller has not be set")
}
defer utilruntime.HandleCrash()
defer b.workqueue.ShutDown()
log.Println("Starting violation builder")
fmt.Println(("Wait for informer cache to sync"))
if ok := cache.WaitForCacheSync(stopCh, b.policySynced); !ok {
fmt.Println("Unable to sync the cache")
}
log.Println("Starting workers")
for i := 0; i < threadiness; i++ {
go wait.Until(b.RunWorker, time.Second, stopCh)
}
log.Println("Started workers")
<-stopCh
log.Println("Shutting down workers")
return nil
}
func (b *builder) RunWorker() {
for b.ProcessNextWorkItem() {
}
}
func (b *builder) ProcessNextWorkItem() bool {
obj, shutdown := b.workqueue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer b.workqueue.Done(obj)
var key utils.EventInfo
var ok bool
if key, ok = obj.(utils.EventInfo); !ok {
b.workqueue.Forget(obj)
log.Printf("Expecting type info by got %v", obj)
return nil
}
// Run the syncHandler, passing the resource and the policy
if err := b.SyncHandler(key); err != nil {
b.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Resource, err.Error())
}
return nil
}(obj)
if err != nil {
log.Println(err)
}
return true
}
func (b *builder) SyncHandler(key utils.EventInfo) error {
var resource runtime.Object
var err error
switch key.Kind {
case "Policy":
resource, err = b.controller.GetPolicy(key.Resource)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to create event for policy %s, will retry ", key.Resource))
return err
}
default:
resource, err = b.kubeClient.GetResource(key.Kind, key.Resource)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource))
return err
}
}
b.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message)
return nil
}


@ -0,0 +1,169 @@
package event
import (
"fmt"
"log"
"time"
kubeClient "github.com/nirmata/kube-policy/kubeclient"
"github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme"
policyscheme "github.com/nirmata/kube-policy/pkg/client/clientset/versioned/scheme"
policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
type eventController struct {
kubeClient *kubeClient.KubeClient
policyLister policylister.PolicyLister
queue workqueue.RateLimitingInterface
recorder record.EventRecorder
logger *log.Logger
}
// EventGenerator generates events for policies and resources
type EventGenerator interface {
Add(kind string, resource string, reason Reason, message EventMsg, args ...interface{})
}
type EventController interface {
EventGenerator
Run(stopCh <-chan struct{}) error
}
func NewEventController(kubeClient *kubeClient.KubeClient,
policyLister policylister.PolicyLister,
logger *log.Logger) EventController {
controller := &eventController{
kubeClient: kubeClient,
policyLister: policyLister,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), eventWorkQueueName),
recorder: initRecorder(kubeClient),
logger: logger,
}
return controller
}
func initRecorder(kubeClient *kubeClient.KubeClient) record.EventRecorder {
// Initialize Event Broadcaster
policyscheme.AddToScheme(scheme.Scheme)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(log.Printf)
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{
Interface: kubeClient.GetEventsInterface("")})
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme,
v1.EventSource{Component: eventSource})
return recorder
}
func (eb *eventController) Add(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) {
eb.queue.Add(eb.newEvent(
kind,
resource,
reason,
message,
args...,
))
}
// Run : Initialize the worker routines to process the event creation
func (eb *eventController) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer eb.queue.ShutDown()
log.Println("starting eventbuilder controller")
log.Println("Starting eventbuilder controller workers")
for i := 0; i < eventWorkerThreadCount; i++ {
go wait.Until(eb.runWorker, time.Second, stopCh)
}
log.Println("Started eventbuilder controller workers")
<-stopCh
log.Println("Shutting down eventbuilder controller workers")
return nil
}
func (eb *eventController) runWorker() {
for eb.processNextWorkItem() {
}
}
func (eb *eventController) processNextWorkItem() bool {
obj, shutdown := eb.queue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer eb.queue.Done(obj)
var key eventInfo
var ok bool
if key, ok = obj.(eventInfo); !ok {
eb.queue.Forget(obj)
log.Printf("Expecting type info by got %v", obj)
return nil
}
// Run the syncHandler, passing the resource and the policy
if err := eb.SyncHandler(key); err != nil {
eb.queue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s' : %s, requeuing event creation request", key.Resource, err.Error())
}
return nil
}(obj)
if err != nil {
log.Println(err)
}
return true
}
func (eb *eventController) SyncHandler(key eventInfo) error {
var resource runtime.Object
var err error
switch key.Kind {
case "Policy":
namespace, name, err := cache.SplitMetaNamespaceKey(key.Resource)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", key.Resource))
return err
}
resource, err = eb.policyLister.Policies(namespace).Get(name)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to create event for policy %s, will retry ", key.Resource))
return err
}
default:
resource, err = eb.kubeClient.GetResource(key.Kind, key.Resource)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to create event for resource %s, will retry ", key.Resource))
return err
}
}
eb.recorder.Event(resource, v1.EventTypeNormal, key.Reason, key.Message)
return nil
}
type eventInfo struct {
Kind string
Resource string
Reason string
Message string
}
func (eb *eventController) newEvent(kind string, resource string, reason Reason, message EventMsg, args ...interface{}) eventInfo {
msgText, err := getEventMsg(message, args...)
if err != nil {
utilruntime.HandleError(err)
}
return eventInfo{
Kind: kind,
Resource: resource,
Reason: reason.String(),
Message: msgText,
}
}


@ -0,0 +1,45 @@
package event
import (
"fmt"
"regexp"
)
//EventMsg is the key that identifies an event message template
type EventMsg int
const (
FResourcePolcy EventMsg = iota
FProcessRule
SPolicyApply
SRuleApply
FPolicyApplyBlockCreate
FPolicyApplyBlockUpdate
FPolicyApplyBlockUpdateRule
)
func (k EventMsg) String() string {
return [...]string{
"Failed to satisfy policy on resource %s.The following rules %s failed to apply. Created Policy Violation",
"Failed to process rule %s of policy %s. Created Policy Violation %s",
"Policy applied successfully on the resource %s",
"Rule %s of Policy %s applied successfull",
"Failed to apply policy, blocked creation of resource %s. The following rules %s failed to apply",
"Failed to apply rule %s of policy %s Blocked update of the resource",
"Failed to apply policy on resource %s.Blocked update of the resource. The following rules %s failed to apply",
}[k]
}
const argRegex = "%[s,d,v]"
//getEventMsg returns the formatted message for the given message key and arguments;
// it returns an error if the number of arguments does not match the message template
func getEventMsg(key EventMsg, args ...interface{}) (string, error) {
// Verify the number of arguments
re := regexp.MustCompile(argRegex)
argsCount := len(re.FindAllString(key.String(), -1))
if argsCount != len(args) {
return "", fmt.Errorf("message expects %d arguments, but %d arguments passed", argsCount, len(args))
}
return fmt.Sprintf(key.String(), args...), nil
}
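
For illustration only (not part of this commit, and mirroring the unit test below), the check above counts the fmt verbs in the selected template and compares that count with the supplied arguments; the resource name here is made up:

// SPolicyApply's template contains a single %s verb, so exactly one argument is accepted.
msg, err := getEventMsg(SPolicyApply, "default/nginx")
// msg == "Policy applied successfully on the resource default/nginx", err == nil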


@ -0,0 +1,23 @@
package event
import (
"fmt"
"testing"
"gotest.tools/assert"
)
func TestPositive(t *testing.T) {
resourceName := "test_resource"
expectedMsg := fmt.Sprintf("Policy applied successfully on the resource %s", resourceName)
msg, err := getEventMsg(SPolicyApply, resourceName)
assert.NilError(t, err)
assert.Equal(t, expectedMsg, msg)
}
// passing incorrect args
func TestIncorrectArgs(t *testing.T) {
resourceName := "test_resource"
_, err := getEventMsg(SPolicyApply, resourceName, "extra_args")
assert.Error(t, err, "message expects 1 arguments, but 2 arguments passed")
}


@ -1,12 +0,0 @@
package interfaces
import (
controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces"
utils "github.com/nirmata/kube-policy/pkg/event/utils"
)
type BuilderInternal interface {
SetController(controller controllerinterfaces.PolicyGetter)
Run(threadiness int, stopCh <-chan struct{}) error
AddEvent(info utils.EventInfo)
}

pkg/event/reason.go (new file)

@ -0,0 +1,21 @@
package event
//Reason is the type of an event reason
type Reason int
const (
//PolicyViolation there is a violation of policy
PolicyViolation Reason = iota
//PolicyApplied policy applied
PolicyApplied
//RequestBlocked the request to create/update the resource was blocked (generated from the admission controller)
RequestBlocked
)
func (r Reason) String() string {
return [...]string{
"PolicyViolation",
"PolicyApplied",
"RequestBlocked",
}[r]
}
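
As a rough usage sketch (not part of this commit), a caller such as the violation builder or the mutation webhook could queue an event by combining a Reason with an EventMsg through the EventGenerator interface defined above; the resource names below are hypothetical:

// eventController is an event.EventController created via event.NewEventController;
// Kind "Policy" makes SyncHandler resolve the object through the policy lister.
eventController.Add("Policy", "default/check-labels", event.PolicyApplied,
event.SPolicyApply, "default/nginx")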

pkg/event/util.go (new file)

@ -0,0 +1,7 @@
package event
const eventSource = "policy-controller"
const eventWorkQueueName = "policy-controller-events"
const eventWorkerThreadCount = 1


@ -1,15 +0,0 @@
package utils
const EventSource = "policy-controller"
const EventWorkQueueName = "policy-controller-events"
type EventInfo struct {
Kind string
Resource string
Rule string
Reason string
Message string
}
const EventWorkerThreadCount = 1


@ -1,60 +1,65 @@
package violation
import (
"encoding/json"
"fmt"
"log"
jsonpatch "github.com/evanphx/json-patch"
controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces"
kubeClient "github.com/nirmata/kube-policy/kubeclient"
types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
eventinterfaces "github.com/nirmata/kube-policy/pkg/event/interfaces"
eventutils "github.com/nirmata/kube-policy/pkg/event/utils"
violationinterfaces "github.com/nirmata/kube-policy/pkg/violation/interfaces"
utils "github.com/nirmata/kube-policy/pkg/violation/utils"
mergetypes "k8s.io/apimachinery/pkg/types"
policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned"
policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
event "github.com/nirmata/kube-policy/pkg/event"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
)
type builder struct {
kubeClient *kubeClient.KubeClient
controller controllerinterfaces.PolicyGetter
eventBuilder eventinterfaces.BuilderInternal
logger *log.Logger
type PolicyViolationGenerator interface {
Add(info ViolationInfo) error
}
type Builder interface {
violationinterfaces.ViolationGenerator
ProcessViolation(info utils.ViolationInfo) error
Patch(policy *types.Policy, updatedPolicy *types.Policy) error
IsActive(kind string, resource string) (bool, error)
type policyViolationBuilder struct {
kubeClient *kubeClient.KubeClient
policyLister policylister.PolicyLister
policyInterface policyclientset.Interface
eventBuilder event.EventGenerator
logger *log.Logger
}
func NewViolationBuilder(
type PolicyViolationBuilder interface {
PolicyViolationGenerator
processViolation(info ViolationInfo) error
isActive(kind string, resource string) (bool, error)
}
func NewPolicyViolationBuilder(
kubeClient *kubeClient.KubeClient,
eventBuilder eventinterfaces.BuilderInternal,
logger *log.Logger) (Builder, error) {
policyLister policylister.PolicyLister,
policyInterface policyclientset.Interface,
eventController event.EventGenerator,
logger *log.Logger) PolicyViolationBuilder {
builder := &builder{
kubeClient: kubeClient,
eventBuilder: eventBuilder,
logger: logger,
builder := &policyViolationBuilder{
kubeClient: kubeClient,
policyLister: policyLister,
policyInterface: policyInterface,
eventBuilder: eventController,
logger: logger,
}
return builder, nil
return builder
}
func (b *builder) Create(info utils.ViolationInfo) error {
return b.ProcessViolation(info)
func (pvb *policyViolationBuilder) Add(info ViolationInfo) error {
return pvb.processViolation(info)
}
func (b *builder) SetController(controller controllerinterfaces.PolicyGetter) {
b.controller = controller
}
func (b *builder) ProcessViolation(info utils.ViolationInfo) error {
func (pvb *policyViolationBuilder) processViolation(info ViolationInfo) error {
// Get the policy
policy, err := b.controller.GetPolicy(info.Policy)
namespace, name, err := cache.SplitMetaNamespaceKey(info.Policy)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to extract namespace and name for %s", info.Policy))
return err
}
policy, err := pvb.policyLister.Policies(namespace).Get(name)
if err != nil {
utilruntime.HandleError(err)
return err
@ -72,63 +77,34 @@ func (b *builder) ProcessViolation(info utils.ViolationInfo) error {
}
for _, violation := range modifiedPolicy.Status.Violations {
ok, err := b.IsActive(info.Kind, violation.Resource)
ok, err := pvb.isActive(info.Kind, violation.Resource)
if err != nil {
utilruntime.HandleError(err)
continue
}
if !ok {
// Remove the violation
// Create a removal event
b.eventBuilder.AddEvent(eventutils.EventInfo{
Kind: "Policy",
Resource: info.Policy,
Rule: info.Rule,
Reason: info.Reason,
Message: info.Message,
})
continue
pvb.logger.Printf("removed violation ")
}
// If violation already exists for this rule, we update the violation
//TODO: update violation, instead of re-creating one every time
}
// If violation already exists for this rule, we update the violation
//TODO: update violation, instead of re-creating one every time
modifiedViolations = append(modifiedViolations, newViolation)
modifiedPolicy.Status.Violations = modifiedViolations
// return b.Patch(policy, modifiedPolicy)
// Violations are part of the status sub resource, so we can use the Update Status api instead of updating the policy object
return b.controller.UpdatePolicyViolations(modifiedPolicy)
_, err = pvb.policyInterface.NirmataV1alpha1().Policies(namespace).UpdateStatus(modifiedPolicy)
if err != nil {
return err
}
return nil
}
func (b *builder) IsActive(kind string, resource string) (bool, error) {
func (pvb *policyViolationBuilder) isActive(kind string, resource string) (bool, error) {
// Generate Merge Patch
_, err := b.kubeClient.GetResource(kind, resource)
_, err := pvb.kubeClient.GetResource(kind, resource)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to get resource %s ", resource))
return false, err
}
return true, nil
}
func (b *builder) Patch(policy *types.Policy, updatedPolicy *types.Policy) error {
originalData, err := json.Marshal(policy)
if err != nil {
return err
}
modifiedData, err := json.Marshal(updatedPolicy)
if err != nil {
return err
}
// generate merge patch
patchBytes, err := jsonpatch.CreateMergePatch(originalData, modifiedData)
if err != nil {
return err
}
_, err = b.controller.PatchPolicy(policy.Name, mergetypes.MergePatchType, patchBytes)
if err != nil {
// Unable to patch
return err
}
return nil
}


@ -1,11 +0,0 @@
package interfaces
import (
controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces"
utils "github.com/nirmata/kube-policy/pkg/violation/utils"
)
type ViolationGenerator interface {
SetController(controller controllerinterfaces.PolicyGetter)
Create(info utils.ViolationInfo) error
}


@ -1,5 +1,7 @@
package violation
import policytype "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
// Source for the events recorder
const violationEventSource = "policy-controller"
@ -9,11 +11,7 @@ const workqueueViolationName = "Policy-Violations"
// Event Reason
const violationEventResrouce = "Violation"
// Info input details
type Info struct {
Kind string
Resource string
Policy string
RuleName string
Reason string
type ViolationInfo struct {
Policy string
policytype.Violation
}
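
As a hedged sketch (not part of this commit), a caller such as the mutation webhook could report a violation through the PolicyViolationGenerator interface; the field names on the embedded types.Violation (Kind, Resource, Rule, Reason, Message) are inferred from how processViolation uses them, and the values are made up:

// violationBuilder is the violation.PolicyViolationBuilder wired up in main.go.
info := violation.ViolationInfo{
Policy: "default/check-labels",
Violation: types.Violation{
Kind: "Deployment",
Resource: "default/nginx",
Rule: "check-label-app",
Reason: "PolicyViolation",
Message: "required label app is missing",
},
}
if err := violationBuilder.Add(info); err != nil {
log.Printf("failed to record violation: %v", err)
}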


@ -1,8 +0,0 @@
package utils
import policytype "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
type ViolationInfo struct {
Policy string
policytype.Violation
}


@ -0,0 +1,194 @@
package policycontroller
import (
"fmt"
"log"
"time"
kubeClient "github.com/nirmata/kube-policy/kubeclient"
types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
policyclientset "github.com/nirmata/kube-policy/pkg/client/clientset/versioned"
infomertypes "github.com/nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1"
lister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
violation "github.com/nirmata/kube-policy/pkg/violation"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
//PolicyController for CRD
type PolicyController struct {
kubeClient *kubeClient.KubeClient
policyLister lister.PolicyLister
policyInterface policyclientset.Interface
policySynced cache.InformerSynced
violationBuilder violation.PolicyViolationGenerator
logger *log.Logger
queue workqueue.RateLimitingInterface
}
// NewPolicyController from cmd args
func NewPolicyController(policyInterface policyclientset.Interface,
policyInformer informertypes.PolicyInformer,
violationBuilder violation.PolicyViolationGenerator,
logger *log.Logger,
kubeClient *kubeClient.KubeClient) *PolicyController {
controller := &PolicyController{
kubeClient: kubeClient,
policyLister: policyInformer.Lister(),
policyInterface: policyInterface,
policySynced: policyInformer.Informer().HasSynced,
violationBuilder: violationBuilder,
logger: logger,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), policyWorkQueueName),
//TODO: Event Builder; this will be used to record events when a policy cannot be processed, using eventBuilder so we can restrict the event types
}
policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.createPolicyHandler,
UpdateFunc: controller.updatePolicyHandler,
DeleteFunc: controller.deletePolicyHandler,
})
return controller
}
func (pc *PolicyController) createPolicyHandler(resource interface{}) {
pc.enqueuePolicy(resource)
}
func (pc *PolicyController) updatePolicyHandler(oldResource, newResource interface{}) {
newPolicy := newResource.(*types.Policy)
oldPolicy := oldResource.(*types.Policy)
if newPolicy.ResourceVersion == oldPolicy.ResourceVersion {
return
}
pc.enqueuePolicy(newResource)
}
func (pc *PolicyController) deletePolicyHandler(resource interface{}) {
var object metav1.Object
var ok bool
if object, ok = resource.(metav1.Object); !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
pc.logger.Printf("policy deleted: %s", object.GetName())
}
func (pc *PolicyController) enqueuePolicy(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
utilruntime.HandleError(err)
return
}
pc.queue.Add(key)
}
// Run is main controller thread
func (pc *PolicyController) Run(stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash()
defer pc.queue.ShutDown()
pc.logger.Printf("starting policy controller")
pc.logger.Printf("waiting for infomer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, pc.policySynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
pc.logger.Println("starting policy controller workers")
for i := 0; i < policyControllerWorkerCount; i++ {
go wait.Until(pc.runWorker, time.Second, stopCh)
}
pc.logger.Println("started policy controller workers")
<-stopCh
pc.logger.Println("shutting down policy controller workers")
return nil
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (pc *PolicyController) runWorker() {
for pc.processNextWorkItem() {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (pc *PolicyController) processNextWorkItem() bool {
obj, shutdown := pc.queue.Get()
if shutdown {
return false
}
err := func(obj interface{}) error {
defer pc.queue.Done(obj)
err := pc.syncHandler(obj)
pc.handleErr(err, obj)
return nil
}(obj)
if err != nil {
utilruntime.HandleError(err)
return true
}
return true
}
func (pc *PolicyController) handleErr(err error, key interface{}) {
if err == nil {
pc.queue.Forget(key)
return
}
// This controller retries 5 times if something goes wrong. After that, it stops trying.
if pc.queue.NumRequeues(key) < policyWorkQueueRetryLimit {
pc.logger.Printf("Error syncing events %v: %v", key, err)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
pc.queue.AddRateLimited(key)
return
}
pc.queue.Forget(key)
// Report to an external entity that, even after several retries, we could not successfully process this key
utilruntime.HandleError(err)
pc.logger.Printf("Dropping the key %q out of the queue: %v", key, err)
}
func (pc *PolicyController) syncHandler(obj interface{}) error {
var key string
var ok bool
if key, ok = obj.(string); !ok {
return fmt.Errorf("expected string in workqueue but got %#v", obj)
}
// convert the namespace/name string into distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
// Get Policy resource with namespace/name
policy, err := pc.policyLister.Policies(namespace).Get(name)
if err != nil {
if errors.IsNotFound(err) {
utilruntime.HandleError(fmt.Errorf("foo '%s' in work queue no longer exists", key))
return nil
}
return err
}
// process policy on existing resource
// get the violations and pass to violation Builder
// get the events and pass to event Builder
fmt.Println(policy)
return nil
}


@ -1,9 +1,10 @@
package controller_test
package policycontroller
import (
"gotest.tools/assert"
"testing"
"gotest.tools/assert"
types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)


@ -0,0 +1,7 @@
package policycontroller
const policyWorkQueueName = "policyworkqueue"
const policyWorkQueueRetryLimit = 5
const policyControllerWorkerCount = 2


@ -5,13 +5,17 @@ import (
"fmt"
"log"
"os"
"sort"
controllerinterfaces "github.com/nirmata/kube-policy/controller/interfaces"
kubeclient "github.com/nirmata/kube-policy/kubeclient"
types "github.com/nirmata/kube-policy/pkg/apis/policy/v1alpha1"
policylister "github.com/nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
mutation "github.com/nirmata/kube-policy/pkg/mutation"
violation "github.com/nirmata/kube-policy/pkg/violation"
v1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
rest "k8s.io/client-go/rest"
)
@ -19,15 +23,21 @@ import (
// MutationWebhook is a data type that represents
// business logic for resource mutation
type MutationWebhook struct {
kubeclient *kubeclient.KubeClient
controller controllerinterfaces.PolicyGetter
registration *MutationWebhookRegistration
logger *log.Logger
kubeclient *kubeclient.KubeClient
policyLister policylister.PolicyLister
registration *MutationWebhookRegistration
violationBuilder violation.PolicyViolationGenerator
logger *log.Logger
}
// Registers mutation webhook in cluster and creates object for this webhook
func CreateMutationWebhook(clientConfig *rest.Config, kubeclient *kubeclient.KubeClient, controller controllerinterfaces.PolicyGetter, logger *log.Logger) (*MutationWebhook, error) {
if clientConfig == nil || kubeclient == nil || controller == nil {
func CreateMutationWebhook(
clientConfig *rest.Config,
kubeclient *kubeclient.KubeClient,
policyLister policylister.PolicyLister,
violationBuilder violation.PolicyViolationGenerator,
logger *log.Logger) (*MutationWebhook, error) {
if clientConfig == nil || kubeclient == nil {
return nil, errors.New("Some parameters are not set")
}
@ -45,19 +55,40 @@ func CreateMutationWebhook(clientConfig *rest.Config, kubeclient *kubeclient.Kub
logger = log.New(os.Stdout, "Mutation WebHook: ", log.LstdFlags|log.Lshortfile)
}
return &MutationWebhook{
kubeclient: kubeclient,
controller: controller,
registration: registration,
logger: logger,
kubeclient: kubeclient,
policyLister: policyLister,
registration: registration,
violationBuilder: violationBuilder,
logger: logger,
}, nil
}
func (mw *MutationWebhook) getPolicies() ([]types.Policy, error) {
selector := labels.NewSelector()
cachedPolicies, err := mw.policyLister.List(selector)
if err != nil {
mw.logger.Printf("Error: %v", err)
return nil, err
}
var policies []types.Policy
for _, elem := range cachedPolicies {
policies = append(policies, *elem.DeepCopy())
}
sort.Slice(policies, func(i, j int) bool {
return policies[i].CreationTimestamp.Time.Before(policies[j].CreationTimestamp.Time)
})
return policies, nil
}
// Mutate applies admission to request
func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse {
mw.logger.Printf("AdmissionReview for Kind=%v, Namespace=%v Name=%v UID=%v patchOperation=%v UserInfo=%v",
request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation, request.UserInfo)
policies, err := mw.controller.GetPolicies()
policies, err := mw.getPolicies()
if err != nil {
utilruntime.HandleError(err)
return nil
@ -72,7 +103,7 @@ func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest) *v1beta1.Ad
policyPatches, err := mw.applyPolicyRules(request, policy)
if err != nil {
mw.controller.LogPolicyError(policy.Name, err.Error())
//TODO Log Policy Error
errStr := fmt.Sprintf("Unable to apply policy %s: %v", policy.Name, err)
mw.logger.Printf("Denying the request because of error: %s", errStr)
@ -82,7 +113,7 @@ func (mw *MutationWebhook) Mutate(request *v1beta1.AdmissionRequest) *v1beta1.Ad
if len(policyPatches) > 0 {
namespace := mutation.ParseNamespaceFromObject(request.Object.Raw)
name := mutation.ParseNameFromObject(request.Object.Raw)
mw.controller.LogPolicyInfo(policy.Name, fmt.Sprintf("Applied to %s %s/%s", request.Kind.Kind, namespace, name))
//TODO Log Policy Info
mw.logger.Printf("%s applied to %s %s/%s", policy.Name, request.Kind.Kind, namespace, name)
allPatches = append(allPatches, policyPatches...)