1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-06 16:06:56 +00:00

add clean up in job controller

This commit is contained in:
Shuting Zhao 2020-10-14 19:00:13 -07:00
parent d181d10fcb
commit 8eb7f13355
6 changed files with 117 additions and 46 deletions

View file

@ -26,13 +26,30 @@ import (
// PolicyReportSummary provides a status count summary // PolicyReportSummary provides a status count summary
type PolicyReportSummary struct { type PolicyReportSummary struct {
Pass int `json:"pass"`
Fail int `json:"fail"` // Pass provides the count of policies whose requirements were met
Warn int `json:"warn"` Pass int `json:"pass"`
// Fail provides the count of policies whose requirements were not met
Fail int `json:"fail"`
// Warn provides the count of unscored policies whose requirements were not met
Warn int `json:"warn"`
// Error provides the count of policies that could not be evaluated
Error int `json:"error"` Error int `json:"error"`
Skip int `json:"skip"`
// Skip indicates the count of policies that were not selected for evaluation
Skip int `json:"skip"`
} }
// PolicyStatus has one of the following values:
// - Pass: indicates that the policy requirements are met
// - Fail: indicates that the policy requirements are not met
// - Warn: indicates that the policy requirements and not met, and the policy is not scored
// - Error: indicates that the policy could not be evaluated
// - Skip: indicates that the policy was not selected based on user inputs or applicability
//
// +kubebuilder:validation:Enum=Pass;Fail;Warn;Error;Skip // +kubebuilder:validation:Enum=Pass;Fail;Warn;Error;Skip
type PolicyStatus string type PolicyStatus string
@ -46,12 +63,16 @@ type PolicyReportResult struct {
// +optional // +optional
Rule string `json:"rule,omitempty"` Rule string `json:"rule,omitempty"`
// Resources is an optional reference to the resource check bu the policy rule // Resources is an optional reference to the resource checked by the policy and rule
Resources []*ResourceStatus `json:"resources"` // +optional
Resources []*ResourceStatus `json:"resources,omitempty"`
// Message is a short user friendly description of the policy rule // Message is a short user friendly description of the policy rule
Message string `json:"message,omitempty"` Message string `json:"message,omitempty"`
// Status indicates the result of the policy rule check
Status PolicyStatus `json:"status,omitempty"`
// Scored indicates if this policy rule is scored // Scored indicates if this policy rule is scored
Scored bool `json:"scored,omitempty"` Scored bool `json:"scored,omitempty"`
@ -66,7 +87,7 @@ type ResourceStatus struct {
Resource *corev1.ObjectReference `json:"resource"` Resource *corev1.ObjectReference `json:"resource"`
// Status indicates the result of the policy rule check // Status indicates the result of the policy rule check
Status PolicyStatus `json:"status"` Status PolicyStatus `json:"status,omitempty"`
} }
// +genclient // +genclient

View file

@ -59,7 +59,7 @@ var (
// KubePolicyDeploymentName define the default deployment namespace // KubePolicyDeploymentName define the default deployment namespace
KubePolicyDeploymentName = "kyverno" KubePolicyDeploymentName = "kyverno"
// Kyverno CLI Image // Kyverno CLI Image
KyvernoCliImage = "evalsocket/kyverno-cli:latest" KyvernoCliImage = "nirmata/kyverno-cli:latest"
// ConfimapNameForPolicyReport // ConfimapNameForPolicyReport
ConfimapNameForPolicyReport = "kyverno-event" ConfimapNameForPolicyReport = "kyverno-event"

View file

@ -14,6 +14,7 @@ import (
v1 "k8s.io/api/batch/v1" v1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1" apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@ -58,7 +59,6 @@ type dataStore struct {
func (ds *dataStore) add(keyHash string, info JobInfo) { func (ds *dataStore) add(keyHash string, info JobInfo) {
ds.mu.Lock() ds.mu.Lock()
defer ds.mu.Unlock() defer ds.mu.Unlock()
// queue the key hash
ds.data[keyHash] = info ds.data[keyHash] = info
} }
@ -74,14 +74,12 @@ func (ds *dataStore) delete(keyHash string) {
delete(ds.data, keyHash) delete(ds.data, keyHash)
} }
// make the struct hashable
//JobsInterface provides API to create PVs //JobsInterface provides API to create PVs
type JobsInterface interface { type JobsInterface interface {
Add(infos ...JobInfo) Add(infos ...JobInfo)
} }
// NewJobsJob returns a new instance of policy violation generator // NewJobsJob returns a new instance of jobs generator
func NewJobsJob(dclient *dclient.Client, func NewJobsJob(dclient *dclient.Client,
configHandler config.Interface, configHandler config.Interface,
log logr.Logger) *Job { log logr.Logger) *Job {
@ -96,16 +94,14 @@ func NewJobsJob(dclient *dclient.Client,
} }
func (j *Job) enqueue(info JobInfo) { func (j *Job) enqueue(info JobInfo) {
// add to data map
keyHash := info.toKey() keyHash := info.toKey()
// add to
// queue the key hash
j.dataStore.add(keyHash, info) j.dataStore.add(keyHash, info)
j.queue.Add(keyHash) j.queue.Add(keyHash)
j.log.V(4).Info("job added to the queue", "keyhash", keyHash)
} }
//Add queues a policy violation create request //Add queues a job creation request
func (j *Job) Add(infos ...JobInfo) { func (j *Job) Add(infos ...JobInfo) {
for _, info := range infos { for _, info := range infos {
j.enqueue(info) j.enqueue(info)
@ -146,7 +142,6 @@ func (j *Job) handleErr(err error, key interface{}) {
} }
j.queue.Forget(key) j.queue.Forget(key)
// remove from data store
if keyHash, ok := key.(string); ok { if keyHash, ok := key.(string); ok {
j.dataStore.delete(keyHash) j.dataStore.delete(keyHash)
} }
@ -222,6 +217,8 @@ func (j *Job) syncHandler(info JobInfo) error {
func (j *Job) syncKyverno(wg *sync.WaitGroup, scope, jobType, data string) error { func (j *Job) syncKyverno(wg *sync.WaitGroup, scope, jobType, data string) error {
defer wg.Done() defer wg.Done()
go j.cleanupCompletedJobs()
mode := "cli" mode := "cli"
args := []string{ args := []string{
"report", "report",
@ -262,16 +259,6 @@ func (j *Job) syncKyverno(wg *sync.WaitGroup, scope, jobType, data string) error
args = append(args, fmt.Sprintf("-p=%s", data)) args = append(args, fmt.Sprintf("-p=%s", data))
} }
resourceList, err := j.dclient.ListResource("", "Job", config.KubePolicyNamespace, &metav1.LabelSelector{
MatchLabels: map[string]string{
"scope": scope,
"type": jobType,
},
})
if err != nil {
return fmt.Errorf("failed to list jobs: %v", err)
}
exbackoff := &backoff.ExponentialBackOff{ exbackoff := &backoff.ExponentialBackOff{
InitialInterval: backoff.DefaultInitialInterval, InitialInterval: backoff.DefaultInitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: backoff.DefaultRandomizationFactor,
@ -282,7 +269,17 @@ func (j *Job) syncKyverno(wg *sync.WaitGroup, scope, jobType, data string) error
} }
exbackoff.Reset() exbackoff.Reset()
err = backoff.Retry(func() error { err := backoff.Retry(func() error {
resourceList, err := j.dclient.ListResource("", "Job", config.KubePolicyNamespace, &metav1.LabelSelector{
MatchLabels: map[string]string{
"scope": scope,
"type": jobType,
},
})
if err != nil {
return fmt.Errorf("failed to list jobs: %v", err)
}
if len(resourceList.Items) != 0 { if len(resourceList.Items) != 0 {
return fmt.Errorf("found %d Jobs", len(resourceList.Items)) return fmt.Errorf("found %d Jobs", len(resourceList.Items))
} }
@ -298,6 +295,9 @@ func (j *Job) syncKyverno(wg *sync.WaitGroup, scope, jobType, data string) error
// CreateJob will create Job template for background scan // CreateJob will create Job template for background scan
func (j *Job) CreateJob(args []string, jobType, scope string) error { func (j *Job) CreateJob(args []string, jobType, scope string) error {
ttl := new(int32)
*ttl = 60
job := &v1.Job{ job := &v1.Job{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
Namespace: config.KubePolicyNamespace, Namespace: config.KubePolicyNamespace,
@ -313,7 +313,7 @@ func (j *Job) CreateJob(args []string, jobType, scope string) error {
{ {
Name: strings.ToLower(fmt.Sprintf("%s-%s", jobType, scope)), Name: strings.ToLower(fmt.Sprintf("%s-%s", jobType, scope)),
Image: config.KyvernoCliImage, Image: config.KyvernoCliImage,
ImagePullPolicy: "Always", ImagePullPolicy: apiv1.PullNever,
Args: args, Args: args,
}, },
}, },
@ -324,6 +324,7 @@ func (j *Job) CreateJob(args []string, jobType, scope string) error {
}, },
} }
job.Spec.TTLSecondsAfterFinished = ttl
job.SetGenerateName("kyverno-policyreport-") job.SetGenerateName("kyverno-policyreport-")
if _, err := j.dclient.CreateResource("", "Job", config.KubePolicyNamespace, job, false); err != nil { if _, err := j.dclient.CreateResource("", "Job", config.KubePolicyNamespace, job, false); err != nil {
return fmt.Errorf("failed to create job: %v", err) return fmt.Errorf("failed to create job: %v", err)
@ -331,3 +332,61 @@ func (j *Job) CreateJob(args []string, jobType, scope string) error {
return nil return nil
} }
func (j *Job) cleanupCompletedJobs() {
logger := j.log.WithName("cleanup jobs")
exbackoff := &backoff.ExponentialBackOff{
InitialInterval: backoff.DefaultInitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: time.Second,
MaxElapsedTime: 2 * time.Minute,
Clock: backoff.SystemClock,
}
exbackoff.Reset()
err := backoff.Retry(func() error {
resourceList, err := j.dclient.ListResource("", "Job", config.KubePolicyNamespace, nil)
if err != nil {
return fmt.Errorf("failed to list jobs : %v", err)
}
if err != nil {
return fmt.Errorf("failed to list pods : %v", err)
}
for _, job := range resourceList.Items {
succeeded, ok, _ := unstructured.NestedInt64(job.Object, "status", "succeeded")
if ok && succeeded > 0 {
if errnew := j.dclient.DeleteResource("", "Job", job.GetNamespace(), job.GetName(), false); errnew != nil {
err = errnew
continue
}
podList, errNew := j.dclient.ListResource("", "Pod", config.KubePolicyNamespace, &metav1.LabelSelector{
MatchLabels: map[string]string{
"job-name": job.GetName(),
},
})
if errNew != nil {
err = errNew
continue
}
for _, pod := range podList.Items {
if errpod := j.dclient.DeleteResource("", "Pod", pod.GetNamespace(), pod.GetName(), false); errpod != nil {
logger.Error(errpod, "failed to delete pod", "name", pod.GetName())
err = errpod
}
}
}
}
return err
}, exbackoff)
if err != nil {
logger.Error(err, "failed to clean up completed jobs")
}
}

View file

@ -178,7 +178,7 @@ func Command() *cobra.Command {
} }
var resources []*unstructured.Unstructured var resources []*unstructured.Unstructured
if resourcePaths[0] == "-" { if len(resourcePaths) > 0 && resourcePaths[0] == "-" {
if common.IsInputFromPipe() { if common.IsInputFromPipe() {
resourceStr := "" resourceStr := ""
scanner := bufio.NewScanner(os.Stdin) scanner := bufio.NewScanner(os.Stdin)

View file

@ -212,7 +212,7 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi
} }
results := make(map[string][]policyreportv1alpha1.PolicyReportResult) results := make(map[string][]policyreportv1alpha1.PolicyReportResult)
for key, _ := range resourceMap { for key := range resourceMap {
for _, resource := range resourceMap[key] { for _, resource := range resourceMap[key] {
policyContext := engine.PolicyContext{ policyContext := engine.PolicyContext{
NewResource: resource, NewResource: resource,
@ -225,7 +225,7 @@ func backgroundScan(n, scope, policychange string, wg *sync.WaitGroup, restConfi
} }
} }
for k, _ := range results { for k := range results {
if k == "" { if k == "" {
continue continue
} }
@ -346,6 +346,7 @@ func createResults(policyContext engine.PolicyContext, key string, results map[s
Policy: pv.Spec.Policy, Policy: pv.Spec.Policy,
Rule: e.Name, Rule: e.Name,
Message: e.Message, Message: e.Message,
Status: policyreportv1alpha1.PolicyStatus(e.Check),
} }
rd := &policyreportv1alpha1.ResourceStatus{ rd := &policyreportv1alpha1.ResourceStatus{
Resource: &corev1.ObjectReference{ Resource: &corev1.ObjectReference{
@ -354,7 +355,6 @@ func createResults(policyContext engine.PolicyContext, key string, results map[s
APIVersion: pv.Spec.APIVersion, APIVersion: pv.Spec.APIVersion,
Name: pv.Spec.Name, Name: pv.Spec.Name,
}, },
Status: policyreportv1alpha1.PolicyStatus(e.Check),
} }
result.Resources = append(result.Resources, rd) result.Resources = append(result.Resources, rd)
results[appname] = append(results[appname], *result) results[appname] = append(results[appname], *result)
@ -443,6 +443,7 @@ func configmapScan(scope string, wg *sync.WaitGroup, restConfig *rest.Config, lo
Policy: pv.Spec.Policy, Policy: pv.Spec.Policy,
Rule: r.Name, Rule: r.Name,
Message: r.Message, Message: r.Message,
Status: policyreportv1alpha1.PolicyStatus(r.Check),
} }
rd := &policyreportv1alpha1.ResourceStatus{ rd := &policyreportv1alpha1.ResourceStatus{
Resource: &corev1.ObjectReference{ Resource: &corev1.ObjectReference{
@ -451,7 +452,6 @@ func configmapScan(scope string, wg *sync.WaitGroup, restConfig *rest.Config, lo
APIVersion: pv.Spec.APIVersion, APIVersion: pv.Spec.APIVersion,
Name: pv.Spec.Name, Name: pv.Spec.Name,
}, },
Status: policyreportv1alpha1.PolicyStatus(r.Check),
} }
result.Resources = append(result.Resources, rd) result.Resources = append(result.Resources, rd)
@ -531,6 +531,7 @@ func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1
Policy: v.Policy, Policy: v.Policy,
Rule: v.Rule, Rule: v.Rule,
Message: v.Message, Message: v.Message,
Status: v.Status,
Resources: make([]*policyreportv1alpha1.ResourceStatus, 0), Resources: make([]*policyreportv1alpha1.ResourceStatus, 0),
} }
@ -557,6 +558,7 @@ func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1
Policy: v.Policy, Policy: v.Policy,
Rule: v.Rule, Rule: v.Rule,
Message: v.Message, Message: v.Message,
Status: v.Status,
Resources: make([]*policyreportv1alpha1.ResourceStatus, 0), Resources: make([]*policyreportv1alpha1.ResourceStatus, 0),
} }
rules[key].Resources = append(rules[key].Resources, r) rules[key].Resources = append(rules[key].Resources, r)
@ -576,7 +578,7 @@ func mergeReport(pr *policyreportv1alpha1.PolicyReport, results []policyreportv1
pr.Summary.Pass = 0 pr.Summary.Pass = 0
pr.Summary.Fail = 0 pr.Summary.Fail = 0
pr.Results = make([]*policyreportv1alpha1.PolicyReportResult, 0) pr.Results = make([]*policyreportv1alpha1.PolicyReportResult, 0)
for k, _ := range rules { for k := range rules {
pr.Results = append(pr.Results, rules[k]) pr.Results = append(pr.Results, rules[k])
for _, r := range rules[k].Resources { for _, r := range rules[k].Resources {
if string(r.Status) == "Pass" { if string(r.Status) == "Pass" {

View file

@ -2,10 +2,6 @@ package policy
import ( import (
"fmt" "fmt"
"os"
"github.com/kyverno/kyverno/pkg/common"
"github.com/kyverno/kyverno/pkg/policyreport"
"github.com/go-logr/logr" "github.com/go-logr/logr"
"github.com/kyverno/kyverno/pkg/engine/response" "github.com/kyverno/kyverno/pkg/engine/response"
@ -27,13 +23,6 @@ func (pc *PolicyController) cleanupAndReport(engineResponses []response.EngineRe
pvInfos[i].FromSync = true pvInfos[i].FromSync = true
} }
if os.Getenv("POLICY-TYPE") == common.PolicyReport {
for _, v := range pvInfos {
pc.prGenerator.Add(policyreport.Info(v))
}
return
}
pc.pvGenerator.Add(pvInfos...) pc.pvGenerator.Add(pvInfos...)
// cleanup existing violations if any // cleanup existing violations if any