1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-31 03:45:17 +00:00

NK9: Controller is a separate module now

This commit is contained in:
belyshevdenis 2019-02-12 16:55:14 +02:00
parent a6bc743f99
commit e96562a1cf
2 changed files with 49 additions and 44 deletions

View file

@ -1,37 +1,56 @@
package main package controller
import ( import (
"time" "time"
"fmt" "fmt"
"k8s.io/sample-controller/pkg/signals"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
clientset "nirmata/kube-policy/pkg/client/clientset/versioned" clientset "nirmata/kube-policy/pkg/client/clientset/versioned"
informer "nirmata/kube-policy/pkg/client/informers/externalversions/policy/v1alpha1" informers "nirmata/kube-policy/pkg/client/informers/externalversions"
lister "nirmata/kube-policy/pkg/client/listers/policy/v1alpha1" lister "nirmata/kube-policy/pkg/client/listers/policy/v1alpha1"
) )
// Controller for CRD // Controller for CRD
type Controller struct { type Controller struct {
policyClientset clientset.Interface policyClientset clientset.Interface
policyInformerFactory informers.SharedInformerFactory
policyLister lister.PolicyLister policyLister lister.PolicyLister
policiesSynced cache.InformerSynced policiesSynced cache.InformerSynced
workqueue workqueue.RateLimitingInterface workqueue workqueue.RateLimitingInterface
} }
// NewController is used to create Controller // NewController from cmd args
func NewController(clientset clientset.Interface, informer informer.PolicyInformer) *Controller { func NewController(masterURL, kubeconfigPath string) (*Controller, error) {
controller := &Controller { cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfigPath)
policyClientset: clientset, if err != nil {
policyLister: informer.Lister(), fmt.Printf("Error building kubeconfig: %v\n", err)
policiesSynced: informer.Informer().HasSynced, return nil, err
}
policyClientset, err := clientset.NewForConfig(cfg)
if err != nil {
fmt.Printf("Error building policy clientset: %v\n", err)
return nil, err
}
policyInformerFactory := informers.NewSharedInformerFactory(policyClientset, time.Second*30)
policyInformer := policyInformerFactory.Nirmata().V1alpha1().Policies()
controller := &Controller {
policyClientset: policyClientset,
policyInformerFactory: policyInformerFactory,
policyLister: policyInformer.Lister(),
policiesSynced: policyInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Policies"), workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Policies"),
} }
// Set up an event handler for when Foo resources change // Set up an event handler for when Foo resources change
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ policyInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueFoo, AddFunc: controller.enqueueFoo,
UpdateFunc: func(old, new interface{}) { UpdateFunc: func(old, new interface{}) {
controller.enqueueFoo(new) controller.enqueueFoo(new)
@ -39,11 +58,13 @@ func NewController(clientset clientset.Interface, informer informer.PolicyInform
DeleteFunc: controller.enqueueFoo, DeleteFunc: controller.enqueueFoo,
}) })
return controller return controller, nil
} }
// Run is main controller thread // Run is main controller thread
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { func (c *Controller) Run(threadiness int) error {
stopCh := signals.SetupSignalHandler()
c.policyInformerFactory.Start(stopCh)
defer c.workqueue.ShutDown() defer c.workqueue.ShutDown()

50
main.go
View file

@ -1,49 +1,33 @@
package main package main
import ( import (
"time" "runtime"
"flag" "flag"
"fmt" "fmt"
"k8s.io/sample-controller/pkg/signals"
"k8s.io/client-go/tools/clientcmd"
clientset "nirmata/kube-policy/pkg/client/clientset/versioned" controller "nirmata/kube-policy/controller"
informers "nirmata/kube-policy/pkg/client/informers/externalversions"
) )
var ( var (
masterURL string masterURL string
kubeconfig string kubeconfig string
) )
func main() { func main() {
flag.Parse() flag.Parse()
stopCh := signals.SetupSignalHandler() controller, err := controller.NewController(masterURL, kubeconfig)
if err != nil {
fmt.Println("Error running Controller!")
}
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig) err = controller.Run(runtime.NumCPU())
if err != nil { if err != nil {
fmt.Printf("Error building kubeconfig: %v\n", err) fmt.Println("Error running Controller!")
} }
exampleClient, err := clientset.NewForConfig(cfg)
if err != nil {
fmt.Printf("Error building example clientset: %v\n", err)
}
fmt.Println("Hello from Policy Controller!")
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)
controller := NewController(exampleClient, exampleInformerFactory.Nirmata().V1alpha1().Policies())
exampleInformerFactory.Start(stopCh)
if err = controller.Run(4, stopCh); err != nil {
fmt.Println("Error running Controller!")
}
} }
func init() { func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
} }