1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-09 09:26:54 +00:00
kyverno/pkg/jobs/controller.go

336 lines
7.6 KiB
Go
Raw Normal View History

2020-08-27 13:19:41 +05:30
package jobs
import (
2020-09-09 02:08:04 -07:00
"fmt"
2020-09-10 10:19:36 -07:00
apierrors "k8s.io/apimachinery/pkg/api/errors"
2020-09-09 02:08:04 -07:00
"strings"
"sync"
"time"
2020-08-27 13:19:41 +05:30
"github.com/nirmata/kyverno/pkg/config"
2020-08-31 00:06:21 +05:30
v1 "k8s.io/api/batch/v1"
2020-08-27 13:19:41 +05:30
apiv1 "k8s.io/api/core/v1"
2020-08-31 00:06:21 +05:30
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2020-08-31 23:18:25 +05:30
"k8s.io/apimachinery/pkg/runtime"
2020-08-27 13:19:41 +05:30
"github.com/go-logr/logr"
"github.com/nirmata/kyverno/pkg/constant"
dclient "github.com/nirmata/kyverno/pkg/dclient"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
// Work-queue settings used by the Job controller.
const (
	// workQueueName is the name given to the controller's rate-limited queue.
	workQueueName = "policy-violation-controller"
	// workQueueRetryLimit is how many requeues a failing key gets before it
	// is dropped by handleErr.
	workQueueRetryLimit = 3
)
2020-09-02 01:10:08 +05:30
//Job creates policy report
2020-08-30 04:13:22 +05:30
type Job struct {
2020-09-10 05:10:29 -07:00
dclient *dclient.Client
log logr.Logger
queue workqueue.RateLimitingInterface
dataStore *dataStore
configHandler config.Interface
mux sync.Mutex
2020-08-27 13:19:41 +05:30
}
2020-09-02 01:10:08 +05:30
// JobInfo describes a single job request handed to the controller.
type JobInfo struct {
	// JobType selects the kind of sync; syncHandler acts on "POLICYSYNC"
	// and "CONFIGMAP".
	JobType string
	// JobData is the payload for the request; for "CONFIGMAP" it is split
	// on commas by syncHandler.
	JobData string
}

// toKey returns the work-queue key for the request. Only JobType is part of
// the key, so requests of the same type map to the same key.
func (i JobInfo) toKey() string {
	const keyPrefix = "kyverno-"
	return fmt.Sprintf("%s%s", keyPrefix, i.JobType)
}
// dataStore is a lock-guarded map from work-queue key to the JobInfo that was
// queued under that key.
type dataStore struct {
	data map[string]JobInfo
	mu   sync.RWMutex
}

// newDataStore returns an empty dataStore whose map is ready for writes.
func newDataStore() *dataStore {
	return &dataStore{data: map[string]JobInfo{}}
}

// add stores info under keyHash, replacing any entry already there.
func (ds *dataStore) add(keyHash string, info JobInfo) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	ds.data[keyHash] = info
}

// lookup returns the JobInfo stored under keyHash, or the zero JobInfo when
// the key is absent.
func (ds *dataStore) lookup(keyHash string) JobInfo {
	ds.mu.RLock()
	defer ds.mu.RUnlock()

	found := ds.data[keyHash]
	return found
}

// delete removes the entry stored under keyHash, if any.
func (ds *dataStore) delete(keyHash string) {
	ds.mu.Lock()
	defer ds.mu.Unlock()

	delete(ds.data, keyHash)
}
// JobsInterface is the enqueue-side API of the jobs controller.
type JobsInterface interface {
	// Add queues the given job requests for processing.
	Add(infos ...JobInfo)
}
2020-08-30 04:13:22 +05:30
// NewJobsJob returns a new instance of policy violation generator
2020-08-30 23:57:30 +05:30
func NewJobsJob(dclient *dclient.Client,
2020-09-10 05:10:29 -07:00
configHandler config.Interface,
2020-08-30 04:13:22 +05:30
log logr.Logger) *Job {
gen := Job{
2020-09-10 05:10:29 -07:00
dclient: dclient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
dataStore: newDataStore(),
configHandler: configHandler,
log: log,
2020-08-27 13:19:41 +05:30
}
return &gen
}
2020-08-30 23:57:30 +05:30
func (j *Job) enqueue(info JobInfo) {
2020-08-27 13:19:41 +05:30
// add to data map
keyHash := info.toKey()
// add to
// queue the key hash
2020-08-30 23:57:30 +05:30
j.dataStore.add(keyHash, info)
j.queue.Add(keyHash)
2020-08-27 13:19:41 +05:30
}
//Add queues a policy violation create request
2020-08-30 23:57:30 +05:30
func (j *Job) Add(infos ...JobInfo) {
2020-08-27 13:19:41 +05:30
for _, info := range infos {
2020-08-30 23:57:30 +05:30
j.enqueue(info)
2020-08-27 13:19:41 +05:30
}
}
// Run starts the workers
2020-08-30 23:57:30 +05:30
func (j *Job) Run(workers int, stopCh <-chan struct{}) {
logger := j.log
2020-08-27 13:19:41 +05:30
defer utilruntime.HandleCrash()
logger.Info("start")
defer logger.Info("shutting down")
for i := 0; i < workers; i++ {
2020-08-30 23:57:30 +05:30
go wait.Until(j.runWorker, constant.PolicyViolationControllerResync, stopCh)
2020-08-27 13:19:41 +05:30
}
<-stopCh
}
2020-08-30 23:57:30 +05:30
func (j *Job) runWorker() {
for j.processNextWorkItem() {
2020-08-27 13:19:41 +05:30
}
}
2020-08-30 23:57:30 +05:30
func (j *Job) handleErr(err error, key interface{}) {
logger := j.log
2020-08-27 13:19:41 +05:30
if err == nil {
2020-08-30 23:57:30 +05:30
j.queue.Forget(key)
2020-08-27 13:19:41 +05:30
return
}
// retires requests if there is error
2020-08-30 23:57:30 +05:30
if j.queue.NumRequeues(key) < workQueueRetryLimit {
2020-08-27 13:19:41 +05:30
logger.Error(err, "failed to sync policy violation", "key", key)
// Re-enqueue the key rate limited. Based on the rate limiter on the
// queue and the re-enqueue history, the key will be processed later again.
2020-08-30 23:57:30 +05:30
j.queue.AddRateLimited(key)
2020-08-27 13:19:41 +05:30
return
}
2020-08-30 23:57:30 +05:30
j.queue.Forget(key)
2020-08-27 13:19:41 +05:30
// remove from data store
if keyHash, ok := key.(string); ok {
2020-08-30 23:57:30 +05:30
j.dataStore.delete(keyHash)
2020-08-27 13:19:41 +05:30
}
logger.Error(err, "dropping key out of the queue", "key", key)
}
2020-08-30 23:57:30 +05:30
func (j *Job) processNextWorkItem() bool {
logger := j.log
obj, shutdown := j.queue.Get()
2020-08-27 13:19:41 +05:30
if shutdown {
return false
}
err := func(obj interface{}) error {
2020-08-30 23:57:30 +05:30
defer j.queue.Done(obj)
2020-08-27 13:19:41 +05:30
var keyHash string
var ok bool
if keyHash, ok = obj.(string); !ok {
2020-08-30 23:57:30 +05:30
j.queue.Forget(obj)
2020-08-27 13:19:41 +05:30
logger.Info("incorrect type; expecting type 'string'", "obj", obj)
return nil
}
// lookup data store
2020-08-30 23:57:30 +05:30
info := j.dataStore.lookup(keyHash)
err := j.syncHandler(info)
j.handleErr(err, obj)
2020-08-27 13:19:41 +05:30
return nil
}(obj)
if err != nil {
logger.Error(err, "failed to process item")
return true
}
return true
}
2020-08-30 23:57:30 +05:30
func (j *Job) syncHandler(info JobInfo) error {
2020-08-31 00:06:21 +05:30
defer func() {
2020-08-30 23:57:30 +05:30
j.mux.Unlock()
2020-08-27 13:19:41 +05:30
}()
2020-08-30 23:57:30 +05:30
j.mux.Lock()
2020-09-10 10:19:36 -07:00
var wg sync.WaitGroup
2020-09-10 05:10:29 -07:00
if info.JobType == "POLICYSYNC" {
2020-09-15 06:59:05 -07:00
wg.Add(1)
go j.syncKyverno(&wg, "All", "SYNC", info.JobData)
2020-09-15 08:07:01 -07:00
}else if info.JobType == "CONFIGMAP" {
2020-09-10 10:19:36 -07:00
if info.JobData != "" {
2020-09-12 05:54:04 -07:00
str := strings.Split(info.JobData, ",")
2020-09-10 10:19:36 -07:00
wg.Add(len(str))
2020-09-12 05:54:04 -07:00
for _, scope := range str {
go j.syncKyverno(&wg, scope, "CONFIGMAP", "")
2020-09-10 10:19:36 -07:00
}
}
2020-09-10 05:10:29 -07:00
}
2020-08-27 13:19:41 +05:30
return nil
}
2020-09-12 05:54:04 -07:00
func (j *Job) syncKyverno(wg *sync.WaitGroup, jobType, scope, data string) {
2020-08-31 00:06:21 +05:30
var args []string
2020-08-30 23:57:30 +05:30
var mode string
2020-09-10 05:10:29 -07:00
if scope == "SYNC" || scope == "POLICYSYNC" {
2020-08-30 23:57:30 +05:30
mode = "cli"
2020-08-31 00:06:21 +05:30
} else {
2020-08-30 23:57:30 +05:30
mode = "configmap"
2020-08-27 13:19:41 +05:30
}
switch jobType {
2020-09-02 01:10:08 +05:30
case "Helm":
2020-08-30 23:57:30 +05:30
args = []string{
"report",
"helm",
2020-08-31 00:06:21 +05:30
fmt.Sprintf("--mode=%s", mode),
2020-08-30 23:57:30 +05:30
}
2020-08-31 00:06:21 +05:30
break
2020-09-02 01:10:08 +05:30
case "Namespace":
2020-08-30 23:57:30 +05:30
args = []string{
"report",
"namespace",
2020-08-31 00:06:21 +05:30
fmt.Sprintf("--mode=%s", mode),
2020-08-30 23:57:30 +05:30
}
2020-08-31 00:06:21 +05:30
break
2020-09-02 01:10:08 +05:30
case "Cluster":
2020-08-30 23:57:30 +05:30
args = []string{
"report",
"cluster",
2020-08-31 00:06:21 +05:30
fmt.Sprintf("--mode=%s", mode),
2020-08-30 23:57:30 +05:30
}
2020-08-27 13:19:41 +05:30
break
2020-09-15 06:59:05 -07:00
case "All":
args = []string{
"report",
"all",
fmt.Sprintf("--mode=%s", mode),
}
break
2020-08-27 13:19:41 +05:30
}
2020-09-09 02:08:04 -07:00
2020-09-10 05:10:29 -07:00
if scope == "POLICYSYNC" && data != "" {
2020-09-12 05:54:04 -07:00
args = append(args, fmt.Sprintf("-p=%s", data))
2020-09-10 05:10:29 -07:00
}
2020-09-15 06:59:05 -07:00
resourceList, err := j.dclient.ListResource("", "Job", config.KubePolicyNamespace, &metav1.LabelSelector{
MatchLabels: map[string]string{
"scope" : scope,
"type" : jobType,
},
})
if err != nil {
j.log.Error(err, "failed to get job")
}
if len(resourceList.Items) == 0 {
go j.CreateJob(args, jobType, scope, wg)
wg.Wait()
}else{
wg.Done()
}
2020-08-27 13:19:41 +05:30
}
2020-09-02 01:10:08 +05:30
// CreateJob will create Job template for background scan
2020-09-10 05:10:29 -07:00
func (j *Job) CreateJob(args []string, jobType, scope string, wg *sync.WaitGroup) {
2020-08-31 23:18:25 +05:30
job := &v1.Job{
2020-08-27 13:19:41 +05:30
ObjectMeta: metav1.ObjectMeta{
Namespace: config.KubePolicyNamespace,
2020-09-12 05:54:04 -07:00
Labels: map[string]string{
"scope": scope,
"type": jobType,
2020-09-10 05:10:29 -07:00
},
2020-08-27 13:19:41 +05:30
},
2020-08-30 23:57:30 +05:30
Spec: v1.JobSpec{
2020-08-27 13:19:41 +05:30
Template: apiv1.PodTemplateSpec{
Spec: apiv1.PodSpec{
Containers: []apiv1.Container{
{
2020-09-02 14:19:11 +05:30
Name: strings.ToLower(fmt.Sprintf("%s-%s", jobType, scope)),
Image: config.KyvernoCliImage,
2020-08-31 23:18:25 +05:30
ImagePullPolicy: "Always",
2020-09-02 14:19:11 +05:30
Args: args,
2020-08-27 13:19:41 +05:30
},
},
2020-09-09 02:08:04 -07:00
ServiceAccountName: "kyverno-service-account",
RestartPolicy: "OnFailure",
2020-08-27 13:19:41 +05:30
},
},
},
}
2020-08-31 23:18:25 +05:30
job.SetGenerateName("kyverno-policyreport-")
2020-09-11 05:11:40 -07:00
resource, err := j.dclient.CreateResource("", "Job", config.KubePolicyNamespace, job, false)
2020-09-10 05:10:29 -07:00
if err != nil {
return
}
2020-09-11 05:11:40 -07:00
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &job); err != nil {
2020-09-12 05:54:04 -07:00
j.log.Error(err, "Error in converting job Default Unstructured Converter", "job_name", job.GetName())
2020-09-11 05:11:40 -07:00
return
}
2020-09-15 06:59:05 -07:00
deadline := time.Now().Add(150 * time.Second)
2020-09-10 05:10:29 -07:00
for {
2020-09-15 06:59:05 -07:00
time.Sleep(20 * time.Second)
resource, err := j.dclient.GetResource("", "Job", config.KubePolicyNamespace, job.GetName())
if err != nil {
if apierrors.IsNotFound(err) {
j.log.Error(err, "job is already deleted", "job_name", job.GetName())
break
}
continue
2020-09-10 10:19:36 -07:00
}
2020-09-15 06:59:05 -07:00
job := v1.Job{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(resource.UnstructuredContent(), &job); err != nil {
j.log.Error(err, "Error in converting job Default Unstructured Converter", "job_name", job.GetName())
2020-09-10 05:10:29 -07:00
continue
}
2020-09-15 06:59:05 -07:00
if time.Now().After(deadline) {
if err := j.dclient.DeleteResource("", "Job", config.KubePolicyNamespace, job.GetName(), false); err != nil {
j.log.Error(err, "Error in deleting jobs", "job_name", job.GetName())
continue
}
break
}
2020-09-10 05:10:29 -07:00
}
wg.Done()
2020-08-27 13:19:41 +05:30
}