1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-29 10:55:05 +00:00

527 decoupling sender and reciever

This commit is contained in:
shravan 2020-02-29 22:39:27 +05:30
parent 053ccde6b8
commit 40e92ebacf
19 changed files with 652 additions and 562 deletions

View file

@ -140,7 +140,6 @@ func main() {
// Policy Status Handler - deals with all logic related to policy status
statusSync := policyStatus.NewSync(
pclient,
stopCh,
policyMetaStore)
// POLICY VIOLATION GENERATOR

View file

@ -0,0 +1,58 @@
package generate
import (
"encoding/json"
"reflect"
"testing"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"github.com/nirmata/kyverno/pkg/policyStatus"
)
type dummyStore struct {
}
func (d *dummyStore) Get(policyName string) (*v1.ClusterPolicy, error) {
return &v1.ClusterPolicy{}, nil
}
func Test_Stats(t *testing.T) {
testCase := struct {
generatedCountStats []v1.GenerateRequest
expectedOutput []byte
}{
expectedOutput: []byte(`{"policy1":{"averageExecutionTime":"","resourcesGeneratedCount":1},"policy2":{"averageExecutionTime":"","resourcesGeneratedCount":1}}`),
generatedCountStats: []v1.GenerateRequest{
{
Spec: v1.GenerateRequestSpec{
Policy: "policy1",
},
Status: v1.GenerateRequestStatus{
GeneratedResources: make([]v1.ResourceSpec, 1),
},
},
{
Spec: v1.GenerateRequestSpec{
Policy: "policy2",
},
Status: v1.GenerateRequestStatus{
GeneratedResources: make([]v1.ResourceSpec, 1),
},
},
},
}
s := policyStatus.NewSync(nil, &dummyStore{})
for _, generateCountStat := range testCase.generatedCountStats {
receiver := &generatedResourceCount{
generateRequest: generateCountStat,
}
receiver.UpdateStatus(s)
}
output, _ := json.Marshal(s.Cache.Data)
if !reflect.DeepEqual(output, testCase.expectedOutput) {
t.Errorf("\n\nTestcase has failed\nExpected:\n%v\nGot:\n%v\n\n", string(testCase.expectedOutput), string(output))
}
}

View file

@ -44,7 +44,9 @@ func (sc StatusControl) Success(gr kyverno.GenerateRequest, genResources []kyver
gr.Status.GeneratedResources = genResources
if oldState != kyverno.Completed {
go sc.policyStatus.UpdatePolicyStatusWithGeneratedResourceCount(gr)
go func() {
sc.policyStatus.Listener <- updatePolicyStatusWithGeneratedResourceCount(gr)
}()
}
_, err := sc.client.KyvernoV1().GenerateRequests("kyverno").UpdateStatus(&gr)
@ -55,3 +57,29 @@ func (sc StatusControl) Success(gr kyverno.GenerateRequest, genResources []kyver
glog.V(4).Infof("updated gr %s status to %s", gr.Name, string(kyverno.Completed))
return nil
}
type generatedResourceCount struct {
generateRequest kyverno.GenerateRequest
}
func updatePolicyStatusWithGeneratedResourceCount(generateRequest kyverno.GenerateRequest) *generatedResourceCount {
return &generatedResourceCount{
generateRequest: generateRequest,
}
}
func (vc *generatedResourceCount) UpdateStatus(s *policyStatus.Sync) {
s.Cache.Mutex.Lock()
status, exist := s.Cache.Data[vc.generateRequest.Spec.Policy]
if !exist {
policy, _ := s.PolicyStore.Get(vc.generateRequest.Spec.Policy)
if policy != nil {
status = policy.Status
}
}
status.ResourcesGeneratedCount += len(vc.generateRequest.Status.GeneratedResources)
s.Cache.Data[vc.generateRequest.Spec.Policy] = status
s.Cache.Mutex.Unlock()
}

View file

@ -1,5 +0,0 @@
package policyStatus
type statusUpdater interface {
updateStatus()
}

View file

@ -1,14 +0,0 @@
package policyStatus
import "time"
func updateAverageTime(newTime time.Duration, oldAverageTimeString string, averageOver int64) time.Duration {
if averageOver == 0 {
return newTime
}
oldAverageExecutionTime, _ := time.ParseDuration(oldAverageTimeString)
numerator := (oldAverageExecutionTime.Nanoseconds() * averageOver) + newTime.Nanoseconds()
denominator := averageOver + 1
newAverageTimeInNanoSeconds := numerator / denominator
return time.Duration(newAverageTimeInNanoSeconds) * time.Nanosecond
}

View file

@ -1,85 +0,0 @@
package policyStatus
import (
"reflect"
"sort"
"time"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"github.com/nirmata/kyverno/pkg/engine/response"
)
type generateStats struct {
s *Sync
resp response.EngineResponse
}
func (s *Sync) UpdateStatusWithGenerateStats(resp response.EngineResponse) {
s.listener <- &generateStats{
s: s,
resp: resp,
}
}
func (gs *generateStats) updateStatus() {
if reflect.DeepEqual(response.EngineResponse{}, gs.resp) {
return
}
gs.s.cache.mutex.Lock()
policyStatus, exist := gs.s.cache.data[gs.resp.PolicyResponse.Policy]
if !exist {
if gs.s.policyStore != nil {
policy, _ := gs.s.policyStore.Get(gs.resp.PolicyResponse.Policy)
if policy != nil {
policyStatus = policy.Status
}
}
}
var nameToRule = make(map[string]v1.RuleStats)
for _, rule := range policyStatus.Rules {
nameToRule[rule.Name] = rule
}
for _, rule := range gs.resp.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.FailedCount)
ruleStat.ExecutionTime = updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver).String()
if rule.Success {
policyStatus.RulesAppliedCount++
ruleStat.AppliedCount++
} else {
policyStatus.RulesFailedCount++
ruleStat.FailedCount++
}
nameToRule[rule.Name] = ruleStat
}
var policyAverageExecutionTime time.Duration
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
executionTime, err := time.ParseDuration(ruleStat.ExecutionTime)
if err == nil {
policyAverageExecutionTime += executionTime
}
ruleStats = append(ruleStats, ruleStat)
}
sort.Slice(ruleStats, func(i, j int) bool {
return ruleStats[i].Name < ruleStats[j].Name
})
policyStatus.AvgExecutionTime = policyAverageExecutionTime.String()
policyStatus.Rules = ruleStats
gs.s.cache.data[gs.resp.PolicyResponse.Policy] = policyStatus
gs.s.cache.mutex.Unlock()
}

View file

@ -1,31 +0,0 @@
package policyStatus
import v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
type generatedResourceCount struct {
sync *Sync
generateRequest v1.GenerateRequest
}
func (s *Sync) UpdatePolicyStatusWithGeneratedResourceCount(generateRequest v1.GenerateRequest) {
s.listener <- &generatedResourceCount{
sync: s,
generateRequest: generateRequest,
}
}
func (vc *generatedResourceCount) updateStatus() {
vc.sync.cache.mutex.Lock()
status, exist := vc.sync.cache.data[vc.generateRequest.Spec.Policy]
if !exist {
policy, _ := vc.sync.policyStore.Get(vc.generateRequest.Spec.Policy)
if policy != nil {
status = policy.Status
}
}
status.ResourcesGeneratedCount += len(vc.generateRequest.Status.GeneratedResources)
vc.sync.cache.data[vc.generateRequest.Spec.Policy] = status
vc.sync.cache.mutex.Unlock()
}

View file

@ -6,8 +6,6 @@ import (
"github.com/golang/glog"
"github.com/nirmata/kyverno/pkg/policystore"
"k8s.io/apimachinery/pkg/util/wait"
"github.com/nirmata/kyverno/pkg/client/clientset/versioned"
@ -15,27 +13,35 @@ import (
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
)
type statusUpdater interface {
UpdateStatus(s *Sync)
}
type policyStore interface {
Get(policyName string) (*v1.ClusterPolicy, error)
}
type Sync struct {
cache *cache
listener chan statusUpdater
Cache *cache
Listener chan statusUpdater
client *versioned.Clientset
policyStore *policystore.PolicyStore
PolicyStore policyStore
}
type cache struct {
mutex sync.RWMutex
data map[string]v1.PolicyStatus
Mutex sync.RWMutex
Data map[string]v1.PolicyStatus
}
func NewSync(c *versioned.Clientset, pms *policystore.PolicyStore) *Sync {
func NewSync(c *versioned.Clientset, p policyStore) *Sync {
return &Sync{
cache: &cache{
mutex: sync.RWMutex{},
data: make(map[string]v1.PolicyStatus),
Cache: &cache{
Mutex: sync.RWMutex{},
Data: make(map[string]v1.PolicyStatus),
},
client: c,
policyStore: pms,
listener: make(chan statusUpdater),
PolicyStore: p,
Listener: make(chan statusUpdater),
}
}
@ -52,8 +58,8 @@ func (s *Sync) Run(workers int, stopCh <-chan struct{}) {
func (s *Sync) updateStatusCache(stopCh <-chan struct{}) {
for {
select {
case statusUpdater := <-s.listener:
statusUpdater.updateStatus()
case statusUpdater := <-s.Listener:
statusUpdater.UpdateStatus(s)
case <-stopCh:
return
}
@ -61,24 +67,24 @@ func (s *Sync) updateStatusCache(stopCh <-chan struct{}) {
}
func (s *Sync) updatePolicyStatus() {
s.cache.mutex.Lock()
var nameToStatus = make(map[string]v1.PolicyStatus, len(s.cache.data))
for k, v := range s.cache.data {
s.Cache.Mutex.Lock()
var nameToStatus = make(map[string]v1.PolicyStatus, len(s.Cache.Data))
for k, v := range s.Cache.Data {
nameToStatus[k] = v
}
s.cache.mutex.Unlock()
s.Cache.Mutex.Unlock()
for policyName, status := range nameToStatus {
policy, err := s.policyStore.Get(policyName)
policy, err := s.PolicyStore.Get(policyName)
if err != nil {
continue
}
policy.Status = status
_, err = s.client.KyvernoV1().ClusterPolicies().UpdateStatus(policy)
if err != nil {
s.cache.mutex.Lock()
delete(s.cache.data, policyName)
s.cache.mutex.Unlock()
s.Cache.Mutex.Lock()
delete(s.Cache.Data, policyName)
s.Cache.Mutex.Unlock()
glog.V(4).Info(err)
}
}

View file

@ -1,88 +0,0 @@
package policyStatus
import (
"reflect"
"sort"
"time"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"github.com/nirmata/kyverno/pkg/engine/response"
)
type mutateStats struct {
s *Sync
resp response.EngineResponse
}
func (s *Sync) UpdateStatusWithMutateStats(resp response.EngineResponse) {
s.listener <- &mutateStats{
s: s,
resp: resp,
}
}
func (ms *mutateStats) updateStatus() {
if reflect.DeepEqual(response.EngineResponse{}, ms.resp) {
return
}
ms.s.cache.mutex.Lock()
policyStatus, exist := ms.s.cache.data[ms.resp.PolicyResponse.Policy]
if !exist {
if ms.s.policyStore != nil {
policy, _ := ms.s.policyStore.Get(ms.resp.PolicyResponse.Policy)
if policy != nil {
policyStatus = policy.Status
}
}
}
var nameToRule = make(map[string]v1.RuleStats)
for _, rule := range policyStatus.Rules {
nameToRule[rule.Name] = rule
}
for _, rule := range ms.resp.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.FailedCount)
ruleStat.ExecutionTime = updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver).String()
if rule.Success {
policyStatus.RulesAppliedCount++
policyStatus.ResourcesMutatedCount++
ruleStat.AppliedCount++
ruleStat.ResourcesMutatedCount++
} else {
policyStatus.RulesFailedCount++
ruleStat.FailedCount++
}
nameToRule[rule.Name] = ruleStat
}
var policyAverageExecutionTime time.Duration
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
executionTime, err := time.ParseDuration(ruleStat.ExecutionTime)
if err == nil {
policyAverageExecutionTime += executionTime
}
ruleStats = append(ruleStats, ruleStat)
}
sort.Slice(ruleStats, func(i, j int) bool {
return ruleStats[i].Name < ruleStats[j].Name
})
policyStatus.AvgExecutionTime = policyAverageExecutionTime.String()
policyStatus.Rules = ruleStats
ms.s.cache.data[ms.resp.PolicyResponse.Policy] = policyStatus
ms.s.cache.mutex.Unlock()
}

View file

@ -1,89 +0,0 @@
package policyStatus
import (
"reflect"
"sort"
"time"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"github.com/nirmata/kyverno/pkg/engine/response"
)
type validateStats struct {
s *Sync
resp response.EngineResponse
}
func (s *Sync) UpdateStatusWithValidateStats(resp response.EngineResponse) {
s.listener <- &validateStats{
s: s,
resp: resp,
}
}
func (vs *validateStats) updateStatus() {
if reflect.DeepEqual(response.EngineResponse{}, vs.resp) {
return
}
vs.s.cache.mutex.Lock()
policyStatus, exist := vs.s.cache.data[vs.resp.PolicyResponse.Policy]
if !exist {
if vs.s.policyStore != nil {
policy, _ := vs.s.policyStore.Get(vs.resp.PolicyResponse.Policy)
if policy != nil {
policyStatus = policy.Status
}
}
}
var nameToRule = make(map[string]v1.RuleStats)
for _, rule := range policyStatus.Rules {
nameToRule[rule.Name] = rule
}
for _, rule := range vs.resp.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.FailedCount)
ruleStat.ExecutionTime = updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver).String()
if rule.Success {
policyStatus.RulesAppliedCount++
ruleStat.AppliedCount++
} else {
policyStatus.RulesFailedCount++
ruleStat.FailedCount++
if vs.resp.PolicyResponse.ValidationFailureAction == "enforce" {
policyStatus.ResourcesBlockedCount++
ruleStat.ResourcesBlockedCount++
}
}
nameToRule[rule.Name] = ruleStat
}
var policyAverageExecutionTime time.Duration
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
executionTime, err := time.ParseDuration(ruleStat.ExecutionTime)
if err == nil {
policyAverageExecutionTime += executionTime
}
ruleStats = append(ruleStats, ruleStat)
}
sort.Slice(ruleStats, func(i, j int) bool {
return ruleStats[i].Name < ruleStats[j].Name
})
policyStatus.AvgExecutionTime = policyAverageExecutionTime.String()
policyStatus.Rules = ruleStats
vs.s.cache.data[vs.resp.PolicyResponse.Policy] = policyStatus
vs.s.cache.mutex.Unlock()
}

View file

@ -1,41 +0,0 @@
package policyStatus
import v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
type violationCount struct {
sync *Sync
policyName string
violatedRules []v1.ViolatedRule
}
func (s *Sync) UpdatePolicyStatusWithViolationCount(policyName string, violatedRules []v1.ViolatedRule) {
s.listener <- &violationCount{
sync: s,
policyName: policyName,
violatedRules: violatedRules,
}
}
func (vc *violationCount) updateStatus() {
vc.sync.cache.mutex.Lock()
status, exist := vc.sync.cache.data[vc.policyName]
if !exist {
policy, _ := vc.sync.policyStore.Get(vc.policyName)
if policy != nil {
status = policy.Status
}
}
var ruleNameToViolations = make(map[string]int)
for _, rule := range vc.violatedRules {
ruleNameToViolations[rule.Name]++
}
for i := range status.Rules {
status.ViolationCount += ruleNameToViolations[status.Rules[i].Name]
status.Rules[i].ViolationCount += ruleNameToViolations[status.Rules[i].Name]
}
vc.sync.cache.data[vc.policyName] = status
vc.sync.cache.mutex.Unlock()
}

View file

@ -100,7 +100,9 @@ func (cpv *clusterPV) createPV(newPv *kyverno.ClusterPolicyViolation) error {
}
if newPv.Annotations["fromSync"] != "true" {
go cpv.policyStatus.UpdatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules)
go func() {
cpv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules)
}()
}
glog.Infof("policy violation created for resource %v", newPv.Spec.ResourceSpec)
@ -126,7 +128,9 @@ func (cpv *clusterPV) updatePV(newPv, oldPv *kyverno.ClusterPolicyViolation) err
glog.Infof("cluster policy violation updated for resource %v", newPv.Spec.ResourceSpec)
if newPv.Annotations["fromSync"] != "true" {
go cpv.policyStatus.UpdatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules)
go func() {
cpv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules)
}()
}
return nil
}

View file

@ -4,9 +4,12 @@ import (
"fmt"
"time"
"github.com/nirmata/kyverno/pkg/policyStatus"
backoff "github.com/cenkalti/backoff"
"github.com/golang/glog"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
client "github.com/nirmata/kyverno/pkg/dclient"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -70,3 +73,39 @@ func converLabelToSelector(labelMap map[string]string) (labels.Selector, error)
return policyViolationSelector, nil
}
type violationCount struct {
policyName string
violatedRules []v1.ViolatedRule
}
func updatePolicyStatusWithViolationCount(policyName string, violatedRules []kyverno.ViolatedRule) *violationCount {
return &violationCount{
policyName: policyName,
violatedRules: violatedRules,
}
}
func (vc *violationCount) UpdateStatus(s *policyStatus.Sync) {
s.Cache.Mutex.Lock()
status, exist := s.Cache.Data[vc.policyName]
if !exist {
policy, _ := s.PolicyStore.Get(vc.policyName)
if policy != nil {
status = policy.Status
}
}
var ruleNameToViolations = make(map[string]int)
for _, rule := range vc.violatedRules {
ruleNameToViolations[rule.Name]++
}
for i := range status.Rules {
status.ViolationCount += ruleNameToViolations[status.Rules[i].Name]
status.Rules[i].ViolationCount += ruleNameToViolations[status.Rules[i].Name]
}
s.Cache.Data[vc.policyName] = status
s.Cache.Mutex.Unlock()
}

View file

@ -99,7 +99,9 @@ func (nspv *namespacedPV) createPV(newPv *kyverno.PolicyViolation) error {
}
if newPv.Annotations["fromSync"] != "true" {
go nspv.policyStatus.UpdatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules)
go func() {
nspv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules)
}()
}
glog.Infof("policy violation created for resource %v", newPv.Spec.ResourceSpec)
return nil
@ -122,7 +124,9 @@ func (nspv *namespacedPV) updatePV(newPv, oldPv *kyverno.PolicyViolation) error
}
if newPv.Annotations["fromSync"] != "true" {
go nspv.policyStatus.UpdatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules)
go func() {
nspv.policyStatus.Listener <- updatePolicyStatusWithViolationCount(newPv.Spec.Policy, newPv.Spec.ViolatedRules)
}()
}
glog.Infof("namespaced policy violation updated for resource %v", newPv.Spec.ResourceSpec)
return nil

View file

@ -0,0 +1,74 @@
package policyviolation
import (
"encoding/json"
"reflect"
"testing"
"github.com/nirmata/kyverno/pkg/policyStatus"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
)
type dummyStore struct {
}
func (d *dummyStore) Get(policyName string) (*v1.ClusterPolicy, error) {
return &v1.ClusterPolicy{
Status: v1.PolicyStatus{
Rules: []v1.RuleStats{
{
Name: "rule4",
},
},
},
}, nil
}
func Test_Stats(t *testing.T) {
testCase := struct {
violationCountStats []struct {
policyName string
violatedRules []v1.ViolatedRule
}
expectedOutput []byte
}{
expectedOutput: []byte(`{"policy1":{"averageExecutionTime":"","violationCount":1,"ruleStatus":[{"ruleName":"rule4","violationCount":1}]},"policy2":{"averageExecutionTime":"","violationCount":1,"ruleStatus":[{"ruleName":"rule4","violationCount":1}]}}`),
violationCountStats: []struct {
policyName string
violatedRules []v1.ViolatedRule
}{
{
policyName: "policy1",
violatedRules: []v1.ViolatedRule{
{
Name: "rule4",
},
},
},
{
policyName: "policy2",
violatedRules: []v1.ViolatedRule{
{
Name: "rule4",
},
},
},
},
}
s := policyStatus.NewSync(nil, &dummyStore{})
for _, violationCountStat := range testCase.violationCountStats {
receiver := &violationCount{
policyName: violationCountStat.policyName,
violatedRules: violationCountStat.violatedRules,
}
receiver.UpdateStatus(s)
}
output, _ := json.Marshal(s.Cache.Data)
if !reflect.DeepEqual(output, testCase.expectedOutput) {
t.Errorf("\n\nTestcase has failed\nExpected:\n%v\nGot:\n%v\n\n", string(testCase.expectedOutput), string(output))
}
}

View file

@ -1,8 +1,15 @@
package webhooks
import (
"reflect"
"sort"
"time"
"github.com/nirmata/kyverno/pkg/policyStatus"
"github.com/golang/glog"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"github.com/nirmata/kyverno/pkg/engine"
"github.com/nirmata/kyverno/pkg/engine/context"
"github.com/nirmata/kyverno/pkg/engine/response"
@ -61,7 +68,9 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic
if len(engineResponse.PolicyResponse.Rules) > 0 {
// some generate rules do apply to the resource
engineResponses = append(engineResponses, engineResponse)
go ws.status.UpdateStatusWithGenerateStats(engineResponse)
go func() {
ws.status.Listener <- updateStatusWithGenerateStats(engineResponse)
}()
}
}
// Adds Generate Request to a channel(queue size 1000) to generators
@ -103,3 +112,87 @@ func transform(userRequestInfo kyverno.RequestInfo, er response.EngineResponse)
}
return gr
}
type generateStats struct {
resp response.EngineResponse
}
func updateStatusWithGenerateStats(resp response.EngineResponse) *generateStats {
return &generateStats{
resp: resp,
}
}
func (gs *generateStats) UpdateStatus(s *policyStatus.Sync) {
if reflect.DeepEqual(response.EngineResponse{}, gs.resp) {
return
}
s.Cache.Mutex.Lock()
status, exist := s.Cache.Data[gs.resp.PolicyResponse.Policy]
if !exist {
if s.PolicyStore != nil {
policy, _ := s.PolicyStore.Get(gs.resp.PolicyResponse.Policy)
if policy != nil {
status = policy.Status
}
}
}
var nameToRule = make(map[string]v1.RuleStats)
for _, rule := range status.Rules {
nameToRule[rule.Name] = rule
}
for _, rule := range gs.resp.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.FailedCount)
ruleStat.ExecutionTime = updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver).String()
if rule.Success {
status.RulesAppliedCount++
ruleStat.AppliedCount++
} else {
status.RulesFailedCount++
ruleStat.FailedCount++
}
nameToRule[rule.Name] = ruleStat
}
var policyAverageExecutionTime time.Duration
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
executionTime, err := time.ParseDuration(ruleStat.ExecutionTime)
if err == nil {
policyAverageExecutionTime += executionTime
}
ruleStats = append(ruleStats, ruleStat)
}
sort.Slice(ruleStats, func(i, j int) bool {
return ruleStats[i].Name < ruleStats[j].Name
})
status.AvgExecutionTime = policyAverageExecutionTime.String()
status.Rules = ruleStats
s.Cache.Data[gs.resp.PolicyResponse.Policy] = status
s.Cache.Mutex.Unlock()
}
func updateAverageTime(newTime time.Duration, oldAverageTimeString string, averageOver int64) time.Duration {
if averageOver == 0 {
return newTime
}
oldAverageExecutionTime, _ := time.ParseDuration(oldAverageTimeString)
numerator := (oldAverageExecutionTime.Nanoseconds() * averageOver) + newTime.Nanoseconds()
denominator := averageOver + 1
newAverageTimeInNanoSeconds := numerator / denominator
return time.Duration(newAverageTimeInNanoSeconds) * time.Nanosecond
}

View file

@ -1,10 +1,15 @@
package webhooks
import (
"reflect"
"sort"
"time"
"github.com/nirmata/kyverno/pkg/policyStatus"
"github.com/golang/glog"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"github.com/nirmata/kyverno/pkg/engine"
"github.com/nirmata/kyverno/pkg/engine/context"
"github.com/nirmata/kyverno/pkg/engine/response"
@ -58,7 +63,9 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
policyContext.Policy = policy
engineResponse := engine.Mutate(policyContext)
engineResponses = append(engineResponses, engineResponse)
go ws.status.UpdateStatusWithMutateStats(engineResponse)
go func() {
ws.status.Listener <- updateStatusWithMutateStats(engineResponse)
}()
if !engineResponse.IsSuccesful() {
glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName())
continue
@ -106,3 +113,79 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
// patches holds all the successful patches, if no patch is created, it returns nil
return engineutils.JoinPatches(patches)
}
type mutateStats struct {
resp response.EngineResponse
}
func updateStatusWithMutateStats(resp response.EngineResponse) *mutateStats {
return &mutateStats{
resp: resp,
}
}
func (ms *mutateStats) UpdateStatus(s *policyStatus.Sync) {
if reflect.DeepEqual(response.EngineResponse{}, ms.resp) {
return
}
s.Cache.Mutex.Lock()
status, exist := s.Cache.Data[ms.resp.PolicyResponse.Policy]
if !exist {
if s.PolicyStore != nil {
policy, _ := s.PolicyStore.Get(ms.resp.PolicyResponse.Policy)
if policy != nil {
status = policy.Status
}
}
}
var nameToRule = make(map[string]v1.RuleStats)
for _, rule := range status.Rules {
nameToRule[rule.Name] = rule
}
for _, rule := range ms.resp.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.FailedCount)
ruleStat.ExecutionTime = updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver).String()
if rule.Success {
status.RulesAppliedCount++
status.ResourcesMutatedCount++
ruleStat.AppliedCount++
ruleStat.ResourcesMutatedCount++
} else {
status.RulesFailedCount++
ruleStat.FailedCount++
}
nameToRule[rule.Name] = ruleStat
}
var policyAverageExecutionTime time.Duration
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
executionTime, err := time.ParseDuration(ruleStat.ExecutionTime)
if err == nil {
policyAverageExecutionTime += executionTime
}
ruleStats = append(ruleStats, ruleStat)
}
sort.Slice(ruleStats, func(i, j int) bool {
return ruleStats[i].Name < ruleStats[j].Name
})
status.AvgExecutionTime = policyAverageExecutionTime.String()
status.Rules = ruleStats
s.Cache.Data[ms.resp.PolicyResponse.Policy] = status
s.Cache.Mutex.Unlock()
}

View file

@ -1,4 +1,4 @@
package policyStatus
package webhooks
import (
"encoding/json"
@ -7,151 +7,23 @@ import (
"time"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"github.com/nirmata/kyverno/pkg/engine/response"
"github.com/nirmata/kyverno/pkg/policyStatus"
)
func Test_Stats(t *testing.T) {
type dummyStore struct {
}
func (d *dummyStore) Get(policyName string) (*v1.ClusterPolicy, error) {
return &v1.ClusterPolicy{}, nil
}
func Test_GenerateStats(t *testing.T) {
testCase := struct {
mutateStats []response.EngineResponse
validateStats []response.EngineResponse
generateStats []response.EngineResponse
violationCountStats []struct {
policyName string
violatedRules []v1.ViolatedRule
}
generatedCountStats []v1.GenerateRequest
expectedOutput []byte
generateStats []response.EngineResponse
expectedOutput []byte
}{
expectedOutput: []byte(`{"policy1":{"averageExecutionTime":"1.482µs","violationCount":1,"rulesFailedCount":3,"rulesAppliedCount":3,"resourcesBlockedCount":1,"resourcesMutatedCount":1,"resourcesGeneratedCount":1,"ruleStatus":[{"ruleName":"rule1","averageExecutionTime":"243ns","appliedCount":1,"resourcesMutatedCount":1},{"ruleName":"rule2","averageExecutionTime":"251ns","failedCount":1},{"ruleName":"rule3","averageExecutionTime":"243ns","appliedCount":1},{"ruleName":"rule4","averageExecutionTime":"251ns","violationCount":1,"failedCount":1,"resourcesBlockedCount":1},{"ruleName":"rule5","averageExecutionTime":"243ns","appliedCount":1},{"ruleName":"rule6","averageExecutionTime":"251ns","failedCount":1}]},"policy2":{"averageExecutionTime":"1.299µs","violationCount":1,"rulesFailedCount":3,"rulesAppliedCount":3,"resourcesMutatedCount":1,"resourcesGeneratedCount":1,"ruleStatus":[{"ruleName":"rule1","averageExecutionTime":"222ns","appliedCount":1,"resourcesMutatedCount":1},{"ruleName":"rule2","averageExecutionTime":"211ns","failedCount":1},{"ruleName":"rule3","averageExecutionTime":"222ns","appliedCount":1},{"ruleName":"rule4","averageExecutionTime":"211ns","violationCount":1,"failedCount":1},{"ruleName":"rule5","averageExecutionTime":"222ns","appliedCount":1},{"ruleName":"rule6","averageExecutionTime":"211ns","failedCount":1}]}}`),
generatedCountStats: []v1.GenerateRequest{
{
Spec: v1.GenerateRequestSpec{
Policy: "policy1",
},
Status: v1.GenerateRequestStatus{
GeneratedResources: make([]v1.ResourceSpec, 1),
},
},
{
Spec: v1.GenerateRequestSpec{
Policy: "policy2",
},
Status: v1.GenerateRequestStatus{
GeneratedResources: make([]v1.ResourceSpec, 1),
},
},
},
violationCountStats: []struct {
policyName string
violatedRules []v1.ViolatedRule
}{
{
policyName: "policy1",
violatedRules: []v1.ViolatedRule{
{
Name: "rule4",
},
},
},
{
policyName: "policy2",
violatedRules: []v1.ViolatedRule{
{
Name: "rule4",
},
},
},
},
mutateStats: []response.EngineResponse{
{
PolicyResponse: response.PolicyResponse{
Policy: "policy1",
Rules: []response.RuleResponse{
{
Name: "rule1",
Success: true,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 243,
},
},
{
Name: "rule2",
Success: false,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 251,
},
},
},
},
},
{
PolicyResponse: response.PolicyResponse{
Policy: "policy2",
Rules: []response.RuleResponse{
{
Name: "rule1",
Success: true,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 222,
},
},
{
Name: "rule2",
Success: false,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 211,
},
},
},
},
},
},
validateStats: []response.EngineResponse{
{
PolicyResponse: response.PolicyResponse{
Policy: "policy1",
ValidationFailureAction: "enforce",
Rules: []response.RuleResponse{
{
Name: "rule3",
Success: true,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 243,
},
},
{
Name: "rule4",
Success: false,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 251,
},
},
},
},
},
{
PolicyResponse: response.PolicyResponse{
Policy: "policy2",
Rules: []response.RuleResponse{
{
Name: "rule3",
Success: true,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 222,
},
},
{
Name: "rule4",
Success: false,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 211,
},
},
},
},
},
},
expectedOutput: []byte(`{"policy1":{"averageExecutionTime":"494ns","rulesFailedCount":1,"rulesAppliedCount":1,"ruleStatus":[{"ruleName":"rule5","averageExecutionTime":"243ns","appliedCount":1},{"ruleName":"rule6","averageExecutionTime":"251ns","failedCount":1}]},"policy2":{"averageExecutionTime":"433ns","rulesFailedCount":1,"rulesAppliedCount":1,"ruleStatus":[{"ruleName":"rule5","averageExecutionTime":"222ns","appliedCount":1},{"ruleName":"rule6","averageExecutionTime":"211ns","failedCount":1}]}}`),
generateStats: []response.EngineResponse{
{
PolicyResponse: response.PolicyResponse{
@ -198,49 +70,149 @@ func Test_Stats(t *testing.T) {
},
}
s := NewSync(nil, nil, nil)
for _, mutateStat := range testCase.mutateStats {
receiver := &mutateStats{
s: s,
resp: mutateStat,
}
receiver.updateStatus()
}
for _, validateStat := range testCase.validateStats {
receiver := &validateStats{
s: s,
resp: validateStat,
}
receiver.updateStatus()
}
s := policyStatus.NewSync(nil, &dummyStore{})
for _, generateStat := range testCase.generateStats {
receiver := &generateStats{
s: s,
resp: generateStat,
}
receiver.updateStatus()
receiver.UpdateStatus(s)
}
for _, generateCountStat := range testCase.generatedCountStats {
receiver := &generatedResourceCount{
sync: s,
generateRequest: generateCountStat,
}
receiver.updateStatus()
}
for _, violationCountStat := range testCase.violationCountStats {
receiver := &violationCount{
sync: s,
policyName: violationCountStat.policyName,
violatedRules: violationCountStat.violatedRules,
}
receiver.updateStatus()
}
output, _ := json.Marshal(s.cache.data)
output, _ := json.Marshal(s.Cache.Data)
if !reflect.DeepEqual(output, testCase.expectedOutput) {
t.Errorf("\n\nTestcase has failed\nExpected:\n%v\nGot:\n%v\n\n", string(testCase.expectedOutput), string(output))
}
}
func Test_MutateStats(t *testing.T) {
testCase := struct {
mutateStats []response.EngineResponse
expectedOutput []byte
}{
expectedOutput: []byte(`{"policy1":{"averageExecutionTime":"494ns","rulesFailedCount":1,"rulesAppliedCount":1,"resourcesMutatedCount":1,"ruleStatus":[{"ruleName":"rule1","averageExecutionTime":"243ns","appliedCount":1,"resourcesMutatedCount":1},{"ruleName":"rule2","averageExecutionTime":"251ns","failedCount":1}]},"policy2":{"averageExecutionTime":"433ns","rulesFailedCount":1,"rulesAppliedCount":1,"resourcesMutatedCount":1,"ruleStatus":[{"ruleName":"rule1","averageExecutionTime":"222ns","appliedCount":1,"resourcesMutatedCount":1},{"ruleName":"rule2","averageExecutionTime":"211ns","failedCount":1}]}}`),
mutateStats: []response.EngineResponse{
{
PolicyResponse: response.PolicyResponse{
Policy: "policy1",
Rules: []response.RuleResponse{
{
Name: "rule1",
Success: true,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 243,
},
},
{
Name: "rule2",
Success: false,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 251,
},
},
},
},
},
{
PolicyResponse: response.PolicyResponse{
Policy: "policy2",
Rules: []response.RuleResponse{
{
Name: "rule1",
Success: true,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 222,
},
},
{
Name: "rule2",
Success: false,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 211,
},
},
},
},
},
},
}
s := policyStatus.NewSync(nil, &dummyStore{})
for _, mutateStat := range testCase.mutateStats {
receiver := &mutateStats{
resp: mutateStat,
}
receiver.UpdateStatus(s)
}
output, _ := json.Marshal(s.Cache.Data)
if !reflect.DeepEqual(output, testCase.expectedOutput) {
t.Errorf("\n\nTestcase has failed\nExpected:\n%v\nGot:\n%v\n\n", string(testCase.expectedOutput), string(output))
}
}
func Test_ValidateStats(t *testing.T) {
testCase := struct {
validateStats []response.EngineResponse
expectedOutput []byte
}{
expectedOutput: []byte(`{"policy1":{"averageExecutionTime":"494ns","rulesFailedCount":1,"rulesAppliedCount":1,"resourcesBlockedCount":1,"ruleStatus":[{"ruleName":"rule3","averageExecutionTime":"243ns","appliedCount":1},{"ruleName":"rule4","averageExecutionTime":"251ns","failedCount":1,"resourcesBlockedCount":1}]},"policy2":{"averageExecutionTime":"433ns","rulesFailedCount":1,"rulesAppliedCount":1,"ruleStatus":[{"ruleName":"rule3","averageExecutionTime":"222ns","appliedCount":1},{"ruleName":"rule4","averageExecutionTime":"211ns","failedCount":1}]}}`),
validateStats: []response.EngineResponse{
{
PolicyResponse: response.PolicyResponse{
Policy: "policy1",
ValidationFailureAction: "enforce",
Rules: []response.RuleResponse{
{
Name: "rule3",
Success: true,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 243,
},
},
{
Name: "rule4",
Success: false,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 251,
},
},
},
},
},
{
PolicyResponse: response.PolicyResponse{
Policy: "policy2",
Rules: []response.RuleResponse{
{
Name: "rule3",
Success: true,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 222,
},
},
{
Name: "rule4",
Success: false,
RuleStats: response.RuleStats{
ProcessingTime: time.Nanosecond * 211,
},
},
},
},
},
},
}
s := policyStatus.NewSync(nil, &dummyStore{})
for _, validateStat := range testCase.validateStats {
receiver := &validateStats{
resp: validateStat,
}
receiver.UpdateStatus(s)
}
output, _ := json.Marshal(s.Cache.Data)
if !reflect.DeepEqual(output, testCase.expectedOutput) {
t.Errorf("\n\nTestcase has failed\nExpected:\n%v\nGot:\n%v\n\n", string(testCase.expectedOutput), string(output))
}

View file

@ -2,10 +2,14 @@ package webhooks
import (
"reflect"
"sort"
"time"
"github.com/nirmata/kyverno/pkg/policyStatus"
"github.com/golang/glog"
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
"github.com/nirmata/kyverno/pkg/engine"
"github.com/nirmata/kyverno/pkg/engine/context"
"github.com/nirmata/kyverno/pkg/engine/response"
@ -69,7 +73,9 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol
continue
}
engineResponses = append(engineResponses, engineResponse)
go ws.status.UpdateStatusWithValidateStats(engineResponse)
go func() {
ws.status.Listener <- updateStatusWithValidateStats(engineResponse)
}()
if !engineResponse.IsSuccesful() {
glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, newR.GetNamespace(), newR.GetName())
continue
@ -99,3 +105,80 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol
glog.V(4).Infof("report: %v %s/%s/%s", time.Since(reportTime), request.Kind, request.Namespace, request.Name)
return true, ""
}
type validateStats struct {
resp response.EngineResponse
}
func updateStatusWithValidateStats(resp response.EngineResponse) *validateStats {
return &validateStats{
resp: resp,
}
}
func (vs *validateStats) UpdateStatus(s *policyStatus.Sync) {
if reflect.DeepEqual(response.EngineResponse{}, vs.resp) {
return
}
s.Cache.Mutex.Lock()
status, exist := s.Cache.Data[vs.resp.PolicyResponse.Policy]
if !exist {
if s.PolicyStore != nil {
policy, _ := s.PolicyStore.Get(vs.resp.PolicyResponse.Policy)
if policy != nil {
status = policy.Status
}
}
}
var nameToRule = make(map[string]v1.RuleStats)
for _, rule := range status.Rules {
nameToRule[rule.Name] = rule
}
for _, rule := range vs.resp.PolicyResponse.Rules {
ruleStat := nameToRule[rule.Name]
ruleStat.Name = rule.Name
averageOver := int64(ruleStat.AppliedCount + ruleStat.FailedCount)
ruleStat.ExecutionTime = updateAverageTime(
rule.ProcessingTime,
ruleStat.ExecutionTime,
averageOver).String()
if rule.Success {
status.RulesAppliedCount++
ruleStat.AppliedCount++
} else {
status.RulesFailedCount++
ruleStat.FailedCount++
if vs.resp.PolicyResponse.ValidationFailureAction == "enforce" {
status.ResourcesBlockedCount++
ruleStat.ResourcesBlockedCount++
}
}
nameToRule[rule.Name] = ruleStat
}
var policyAverageExecutionTime time.Duration
var ruleStats = make([]v1.RuleStats, 0, len(nameToRule))
for _, ruleStat := range nameToRule {
executionTime, err := time.ParseDuration(ruleStat.ExecutionTime)
if err == nil {
policyAverageExecutionTime += executionTime
}
ruleStats = append(ruleStats, ruleStat)
}
sort.Slice(ruleStats, func(i, j int) bool {
return ruleStats[i].Name < ruleStats[j].Name
})
status.AvgExecutionTime = policyAverageExecutionTime.String()
status.Rules = ruleStats
s.Cache.Data[vs.resp.PolicyResponse.Policy] = status
s.Cache.Mutex.Unlock()
}