1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-04-08 10:04:25 +00:00

remove policy controller for policyreport

This commit is contained in:
Yuvraj 2020-09-03 22:19:37 +05:30
parent fdd908e20e
commit e15ed829ca
7 changed files with 61 additions and 76 deletions

View file

@ -197,25 +197,28 @@ func main() {
log.Log.WithName("PolicyViolationGenerator"),
stopCh,
)
var policyCtrl *policy.PolicyController
if os.Getenv("POLICY-TYPE") != "POLICYREPORT" {
// POLICY CONTROLLER
// - reconciliation policy and policy violation
// - process policy on existing resources
// - status aggregator: receives stats when a policy is applied & updates the policy status
policyCtrl, err = policy.NewPolicyController(pclient,
client,
pInformer.Kyverno().V1().ClusterPolicies(),
pInformer.Kyverno().V1().Policies(),
pInformer.Kyverno().V1().ClusterPolicyViolations(),
pInformer.Kyverno().V1().PolicyViolations(),
configData,
eventGenerator,
pvgen,
rWebhookWatcher,
kubeInformer.Core().V1().Namespaces(),
jobController,
log.Log.WithName("PolicyController"),
)
}
// POLICY CONTROLLER
// - reconciliation policy and policy violation
// - process policy on existing resources
// - status aggregator: receives stats when a policy is applied & updates the policy status
policyCtrl, err := policy.NewPolicyController(pclient,
client,
pInformer.Kyverno().V1().ClusterPolicies(),
pInformer.Kyverno().V1().Policies(),
pInformer.Kyverno().V1().ClusterPolicyViolations(),
pInformer.Kyverno().V1().PolicyViolations(),
configData,
eventGenerator,
pvgen,
rWebhookWatcher,
kubeInformer.Core().V1().Namespaces(),
jobController,
log.Log.WithName("PolicyController"),
)
if err != nil {
setupLog.Error(err, "Failed to create policy controller")
@ -337,7 +340,10 @@ func main() {
go grgen.Run(1)
go rWebhookWatcher.Run(stopCh)
go configData.Run(stopCh)
go policyCtrl.Run(3, stopCh)
if os.Getenv("POLICY-TYPE") != "POLICYREPORT" {
go policyCtrl.Run(3, stopCh)
}
go eventGenerator.Run(3, stopCh)
go grc.Run(1, stopCh)
go grcc.Run(1, stopCh)

1
go.mod
View file

@ -38,6 +38,7 @@ require (
golang.org/x/time v0.0.0-20191024005414-555d28b269f0 // indirect
golang.org/x/tools v0.0.0-20200823205832-c024452afbcd // indirect
google.golang.org/appengine v1.6.5 // indirect
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20200121175148-a6ecf24a6d71
gotest.tools v2.2.0+incompatible
k8s.io/api v0.17.4

View file

@ -58,6 +58,8 @@ var (
KubePolicyNamespace = getKyvernoNameSpace()
// KubePolicyDeploymentName define the default deployment namespace
KubePolicyDeploymentName = "kyverno"
// Kyverno CLI Image
KyvernoCliImage = "nirmata/kyverno-cli:latest"
//WebhookServiceName default kyverno webhook service name
WebhookServiceName = getWebhookServiceName()

View file

@ -2,6 +2,7 @@ package jobs
import (
"fmt"
"context"
"github.com/nirmata/kyverno/pkg/config"
v1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
@ -126,6 +127,25 @@ func (j *Job) Run(workers int, stopCh <-chan struct{}) {
go wait.Until(j.runWorker, constant.PolicyViolationControllerResync, stopCh)
}
go func(){
ctx := context.Background()
ticker := time.NewTicker(100 * time.Second)
for {
select {
case <-ticker.C:
var wg sync.WaitGroup
wg.Add(3)
go j.syncNamespace(&wg, "Helm", "CONFIGMAP")
go j.syncNamespace(&wg, "Namespace", "CONFIGMAP")
go j.syncNamespace(&wg, "Cluster", "CONFIGMAP")
wg.Wait()
case <-ctx.Done():
break
// Create Jobs
}
}
}()
<-stopCh
}
@ -202,31 +222,22 @@ func (j *Job) syncHandler(info JobInfo) error {
j.mux.Unlock()
}()
j.mux.Lock()
if len(info.Policy) > 0 {
var wg sync.WaitGroup
wg.Add(3)
go j.syncNamespace(&wg, "Helm", "POLICY", info.Policy)
go j.syncNamespace(&wg, "Namespace", "POLICY", info.Policy)
go j.syncNamespace(&wg, "Cluster", "POLICY", info.Policy)
wg.Wait()
return nil
}
var wg sync.WaitGroup
wg.Add(3)
go j.syncNamespace(&wg, "Helm", "SYNC", info.Policy)
go j.syncNamespace(&wg, "Namespace", "SYNC", info.Policy)
go j.syncNamespace(&wg, "Cluster", "SYNC", info.Policy)
go j.syncNamespace(&wg, "Helm", "SYNC")
go j.syncNamespace(&wg, "Namespace", "SYNC")
go j.syncNamespace(&wg, "Cluster", "SYNC")
wg.Wait()
return nil
}
func (j *Job) syncNamespace(wg *sync.WaitGroup, jobType, scope, policy string) {
func (j *Job) syncNamespace(wg *sync.WaitGroup, jobType, scope string) {
defer func() {
wg.Done()
}()
var args []string
var mode string
if policy == "POLICY" {
if scope == "SYNC" {
mode = "cli"
} else {
mode = "configmap"
@ -240,7 +251,6 @@ func (j *Job) syncNamespace(wg *sync.WaitGroup, jobType, scope, policy string) {
"helm",
fmt.Sprintf("--mode=%s", mode),
}
job = CreateJob(args, jobType, scope)
break
case "Namespace":
args = []string{
@ -248,7 +258,6 @@ func (j *Job) syncNamespace(wg *sync.WaitGroup, jobType, scope, policy string) {
"namespace",
fmt.Sprintf("--mode=%s", mode),
}
job = CreateJob(args, jobType, scope)
break
case "Cluster":
args = []string{
@ -256,9 +265,9 @@ func (j *Job) syncNamespace(wg *sync.WaitGroup, jobType, scope, policy string) {
"cluster",
fmt.Sprintf("--mode=%s", mode),
}
job = CreateJob(args, jobType, scope)
break
}
job = CreateJob(args, jobType, scope)
_, err := j.dclient.CreateResource("", "Job", config.KubePolicyNamespace, job, false)
if err != nil {
return
@ -300,7 +309,7 @@ func CreateJob(args []string, jobType, scope string) *v1.Job {
Containers: []apiv1.Container{
{
Name: strings.ToLower(fmt.Sprintf("%s-%s", jobType, scope)),
Image: "evalsocket/kyverno-cli:latest",
Image: config.KyvernoCliImage,
ImagePullPolicy: "Always",
Args: args,
},

View file

@ -2,7 +2,6 @@ package policy
import (
"fmt"
"os"
"reflect"
"github.com/go-logr/logr"
@ -13,7 +12,6 @@ import (
)
func (pc *PolicyController) cleanUp(ers []response.EngineResponse) {
if os.Getenv("POLICY-TYPE") != "POLICYREPORT" {
for _, er := range ers {
if !er.IsSuccessful() {
continue
@ -24,7 +22,7 @@ func (pc *PolicyController) cleanUp(ers []response.EngineResponse) {
// clean up after the policy has been corrected
pc.cleanUpPolicyViolation(er.PolicyResponse)
}
}
}
func (pc *PolicyController) cleanUpPolicyViolation(pResponse response.PolicyResponse) {

View file

@ -1,7 +1,6 @@
package policy
import (
"context"
"github.com/go-logr/logr"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
@ -26,7 +25,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"os"
"time"
)
@ -137,7 +135,6 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
pc.pvControl = RealPVControl{Client: kyvernoClient, Recorder: pc.eventRecorder}
if os.Getenv("POLICY-TYPE") != "POLICYREPORT" {
cpvInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pc.addClusterPolicyViolation,
UpdateFunc: pc.updateClusterPolicyViolation,
@ -153,7 +150,7 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset,
pc.cpvListerSynced = cpvInformer.Informer().HasSynced
pc.nspvLister = nspvInformer.Lister()
pc.nspvListerSynced = nspvInformer.Informer().HasSynced
}
pInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: pc.addPolicy,
UpdateFunc: pc.updatePolicy,
@ -315,35 +312,17 @@ func (pc *PolicyController) Run(workers int, stopCh <-chan struct{}) {
logger.Info("starting")
defer logger.Info("shutting down")
if os.Getenv("POLICY-TYPE") == "POLICYREPORT" {
if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.nsListerSynced) {
logger.Info("failed to sync informer cache")
return
}
} else {
if !cache.WaitForCacheSync(stopCh, pc.pListerSynced, pc.cpvListerSynced, pc.nspvListerSynced, pc.nsListerSynced) {
logger.Info("failed to sync informer cache")
return
}
}
for i := 0; i < workers; i++ {
go wait.Until(pc.worker, constant.PolicyControllerResync, stopCh)
}
ctx := context.Background()
ticker := time.NewTicker(100 * time.Second)
for {
select {
case <-ticker.C:
pc.job.Add(jobs.JobInfo{
Policy: "enabled",
})
case <-ctx.Done():
break
// Create Jobs
}
}
<- stopCh
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@ -389,9 +368,7 @@ func (pc *PolicyController) handleErr(err error, key interface{}) {
}
func (pc *PolicyController) syncPolicy(key string) error {
if os.Getenv("POLICY-TYPE") == "POLICYREPORT" {
return nil
}
logger := pc.log
startTime := time.Now()
logger.V(4).Info("started syncing policy", "key", key, "startTime", startTime)
@ -410,11 +387,8 @@ func (pc *PolicyController) syncPolicy(key string) error {
policy = ConvertPolicyToClusterPolicy(nspolicy)
}
if errors.IsNotFound(err) {
if os.Getenv("POLICY-TYPE") == "POLICYREPORT" {
// TODO (Yuvraj) Create Job for policy Sync
} else {
go pc.deletePolicyViolations(key)
}
// remove webhook configurations if there are no policies
if err := pc.removeResourceWebhookConfiguration(); err != nil {

View file

@ -112,11 +112,6 @@ func (i Info) toKey() string {
// make the struct hashable
//GeneratorInterface provides API to create PVs
type GeneratorInterface interface {
Add(infos ...Info)
}
type PVEvent struct {
Helm map[string][]Info
Namespace map[string][]Info