mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-31 03:45:17 +00:00
introduce locking for policy status updates
This commit is contained in:
parent
e507fb6422
commit
bcad9ada2d
2 changed files with 64 additions and 7 deletions
|
@ -119,7 +119,7 @@ func NewPolicyController(kyvernoClient *kyvernoclient.Clientset, client *client.
|
||||||
pc.rm = NewResourceManager(30)
|
pc.rm = NewResourceManager(30)
|
||||||
|
|
||||||
// aggregator
|
// aggregator
|
||||||
pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient)
|
pc.statusAggregator = NewPolicyStatAggregator(kyvernoClient, pInformer.Lister())
|
||||||
|
|
||||||
return &pc, nil
|
return &pc, nil
|
||||||
}
|
}
|
||||||
|
@ -411,7 +411,8 @@ func (pc *PolicyController) syncPolicy(key string) error {
|
||||||
policyInfos := pc.processExistingResources(*p)
|
policyInfos := pc.processExistingResources(*p)
|
||||||
// report errors
|
// report errors
|
||||||
pc.report(policyInfos)
|
pc.report(policyInfos)
|
||||||
return pc.statusAggregator.UpdateViolationCount(p, pvList)
|
// fetch the policy again via the aggreagator to remain consistent
|
||||||
|
return pc.statusAggregator.UpdateViolationCount(p.Name, pvList)
|
||||||
// return pc.syncStatusOnly(p, pvList)
|
// return pc.syncStatusOnly(p, pvList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,11 +2,13 @@ package policy
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"reflect"
|
"reflect"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1alpha1"
|
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1alpha1"
|
||||||
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
|
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
|
||||||
|
kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1alpha1"
|
||||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
)
|
)
|
||||||
|
@ -29,14 +31,20 @@ type PolicyStatusAggregator struct {
|
||||||
ch chan PolicyStat
|
ch chan PolicyStat
|
||||||
// update polict status
|
// update polict status
|
||||||
psControl PStatusControlInterface
|
psControl PStatusControlInterface
|
||||||
|
// pLister can list/get policy from the shared informer's store
|
||||||
|
pLister kyvernolister.PolicyLister
|
||||||
|
// UpdateViolationCount and SendStat can update same policy status
|
||||||
|
// we need to sync the updates using policyName
|
||||||
|
policyUpdateData sync.Map
|
||||||
}
|
}
|
||||||
|
|
||||||
//NewPolicyStatAggregator returns a new policy status
|
//NewPolicyStatAggregator returns a new policy status
|
||||||
func NewPolicyStatAggregator(client *kyvernoclient.Clientset) *PolicyStatusAggregator {
|
func NewPolicyStatAggregator(client *kyvernoclient.Clientset, pLister kyvernolister.PolicyLister) *PolicyStatusAggregator {
|
||||||
psa := PolicyStatusAggregator{
|
psa := PolicyStatusAggregator{
|
||||||
startTime: time.Now(),
|
startTime: time.Now(),
|
||||||
ch: make(chan PolicyStat),
|
ch: make(chan PolicyStat),
|
||||||
}
|
}
|
||||||
|
psa.pLister = pLister
|
||||||
psa.psControl = PSControl{Client: client}
|
psa.psControl = PSControl{Client: client}
|
||||||
//TODO: add WaitGroup
|
//TODO: add WaitGroup
|
||||||
return &psa
|
return &psa
|
||||||
|
@ -53,14 +61,46 @@ func (psa *PolicyStatusAggregator) Run(workers int, stopCh <-chan struct{}) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
func (psa *PolicyStatusAggregator) aggregate() {
|
func (psa *PolicyStatusAggregator) aggregate() {
|
||||||
|
// As mutation and validation are handled seperately
|
||||||
|
// ideally we need to combine the exection time from both for a policy
|
||||||
|
// but its tricky to detect here the type of rules policy contains
|
||||||
|
// so we dont combine the results, but instead compute the execution time for
|
||||||
|
// mutation & validation rules seperately
|
||||||
for r := range psa.ch {
|
for r := range psa.ch {
|
||||||
glog.V(4).Infof("recieved policy stats %v", r)
|
glog.V(4).Infof("recieved policy stats %v", r)
|
||||||
|
if err := psa.updateStats(r); err != nil {
|
||||||
|
glog.Info("Failed to update stats for policy %s: %v", r.PolicyName, err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (psa *PolicyStatusAggregator) updateStats(stats PolicyStat) error {
|
||||||
|
func() {
|
||||||
|
glog.V(4).Infof("lock updates for policy name %s", stats.PolicyName)
|
||||||
|
// Lock the update for policy
|
||||||
|
psa.policyUpdateData.Store(stats.PolicyName, struct{}{})
|
||||||
|
}()
|
||||||
|
defer func() {
|
||||||
|
glog.V(4).Infof("Unlock updates for policy name %s", stats.PolicyName)
|
||||||
|
psa.policyUpdateData.Delete(stats.PolicyName)
|
||||||
|
}()
|
||||||
|
// get policy
|
||||||
|
policy, err := psa.pLister.Get(stats.PolicyName)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(4).Infof("failed to get policy %s. Unable to update violation count: %v", stats.PolicyName, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
glog.V(4).Infof("updating stats for policy %s", policy.Name)
|
||||||
|
// update the statistics
|
||||||
|
// policy.Status
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type PolicyStatusInterface interface {
|
type PolicyStatusInterface interface {
|
||||||
SendStat(stat PolicyStat)
|
SendStat(stat PolicyStat)
|
||||||
UpdateViolationCount(p *kyverno.Policy, pvList []*kyverno.PolicyViolation) error
|
UpdateViolationCount(policyName string, pvList []*kyverno.PolicyViolation) error
|
||||||
}
|
}
|
||||||
|
|
||||||
type PolicyStat struct {
|
type PolicyStat struct {
|
||||||
|
@ -79,14 +119,30 @@ func (psa *PolicyStatusAggregator) SendStat(stat PolicyStat) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//UpdateViolationCount updates the active violation count
|
//UpdateViolationCount updates the active violation count
|
||||||
func (psa *PolicyStatusAggregator) UpdateViolationCount(p *kyverno.Policy, pvList []*kyverno.PolicyViolation) error {
|
func (psa *PolicyStatusAggregator) UpdateViolationCount(policyName string, pvList []*kyverno.PolicyViolation) error {
|
||||||
|
func() {
|
||||||
|
glog.V(4).Infof("lock updates for policy name %s", policyName)
|
||||||
|
// Lock the update for policy
|
||||||
|
psa.policyUpdateData.Store(policyName, struct{}{})
|
||||||
|
}()
|
||||||
|
defer func() {
|
||||||
|
glog.V(4).Infof("Unlock updates for policy name %s", policyName)
|
||||||
|
psa.policyUpdateData.Delete(policyName)
|
||||||
|
}()
|
||||||
|
// get policy
|
||||||
|
policy, err := psa.pLister.Get(policyName)
|
||||||
|
if err != nil {
|
||||||
|
glog.V(4).Infof("failed to get policy %s. Unable to update violation count: %v", policyName, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
newStatus := calculateStatus(pvList)
|
newStatus := calculateStatus(pvList)
|
||||||
if reflect.DeepEqual(newStatus, p.Status) {
|
if reflect.DeepEqual(newStatus, policy.Status) {
|
||||||
// no update to status
|
// no update to status
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// update status
|
// update status
|
||||||
newPolicy := p
|
newPolicy := policy
|
||||||
newPolicy.Status = newStatus
|
newPolicy.Status = newStatus
|
||||||
|
|
||||||
return psa.psControl.UpdatePolicyStatus(newPolicy)
|
return psa.psControl.UpdatePolicyStatus(newPolicy)
|
||||||
|
|
Loading…
Add table
Reference in a new issue