mirror of
https://github.com/kyverno/kyverno.git
synced 2025-03-29 02:45:06 +00:00
527 save commit
This commit is contained in:
parent
1bbe84bbc9
commit
b5e5f3eeda
4 changed files with 121 additions and 46 deletions
|
@ -215,7 +215,7 @@ func main() {
|
|||
kubeInformer.Rbac().V1().ClusterRoleBindings(),
|
||||
egen,
|
||||
webhookRegistrationClient,
|
||||
pc.GetPolicyStatusAggregator(),
|
||||
policy.NewStatusSync(pclient, stopCh),
|
||||
configData,
|
||||
policyMetaStore,
|
||||
pvgen,
|
||||
|
|
107
pkg/policy/status2.go
Normal file
107
pkg/policy/status2.go
Normal file
|
@ -0,0 +1,107 @@
|
|||
package policy
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
|
||||
"github.com/nirmata/kyverno/pkg/client/clientset/versioned"
|
||||
|
||||
v1 "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
|
||||
)
|
||||
|
||||
type statusCache struct {
|
||||
mu sync.RWMutex
|
||||
data map[string]v1.PolicyStatus
|
||||
}
|
||||
|
||||
func (c *statusCache) Get(key string) v1.PolicyStatus {
|
||||
c.mu.RLock()
|
||||
status := c.data[key]
|
||||
c.mu.RUnlock()
|
||||
return status
|
||||
|
||||
}
|
||||
|
||||
func (c *statusCache) GetAll() map[string]v1.PolicyStatus {
|
||||
c.mu.RLock()
|
||||
mapCopy := make(map[string]v1.PolicyStatus, len(c.data))
|
||||
for k, v := range c.data {
|
||||
mapCopy[k] = v
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
return mapCopy
|
||||
|
||||
}
|
||||
func (c *statusCache) Set(key string, status v1.PolicyStatus) {
|
||||
c.mu.Lock()
|
||||
c.data[key] = status
|
||||
c.mu.Unlock()
|
||||
}
|
||||
func (c *statusCache) Clear() {
|
||||
c.mu.Lock()
|
||||
c.data = make(map[string]v1.PolicyStatus)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func newStatusCache() *statusCache {
|
||||
return &statusCache{
|
||||
mu: sync.RWMutex{},
|
||||
data: make(map[string]v1.PolicyStatus),
|
||||
}
|
||||
}
|
||||
|
||||
func NewStatusSync(client *versioned.Clientset, stopCh <-chan struct{}) *StatusSync {
|
||||
return &StatusSync{
|
||||
statusReceiver: make(chan map[string]v1.PolicyStatus),
|
||||
cache: newStatusCache(),
|
||||
stop: stopCh,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
type StatusSync struct {
|
||||
statusReceiver chan map[string]v1.PolicyStatus
|
||||
cache *statusCache
|
||||
stop <-chan struct{}
|
||||
client *versioned.Clientset
|
||||
}
|
||||
|
||||
func (s *StatusSync) Cache() *statusCache {
|
||||
return s.cache
|
||||
}
|
||||
|
||||
func (s *StatusSync) Receiver() chan<- map[string]v1.PolicyStatus {
|
||||
return s.statusReceiver
|
||||
}
|
||||
|
||||
func (s *StatusSync) Start() {
|
||||
// receive status and store it in cache
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case nameToStatus := <-s.statusReceiver:
|
||||
for policyName, status := range nameToStatus {
|
||||
s.cache.Set(policyName, status)
|
||||
}
|
||||
case <-s.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// update policy status every 10 seconds - waits for previous updateStatus to complete
|
||||
wait.Until(s.updateStatus, 10*time.Second, s.stop)
|
||||
<-s.stop
|
||||
}
|
||||
|
||||
func (s *StatusSync) updateStatus() {
|
||||
for policyName, status := range s.cache.GetAll() {
|
||||
var policy = &v1.ClusterPolicy{}
|
||||
policy.Name = policyName
|
||||
policy.Status = status
|
||||
_, _ = s.client.KyvernoV1().ClusterPolicies().UpdateStatus(policy)
|
||||
}
|
||||
s.cache.Clear()
|
||||
}
|
|
@ -3,15 +3,15 @@ package webhooks
|
|||
import (
|
||||
"time"
|
||||
|
||||
"github.com/nirmata/kyverno/pkg/policy"
|
||||
|
||||
"github.com/golang/glog"
|
||||
kyverno "github.com/nirmata/kyverno/pkg/api/kyverno/v1"
|
||||
"github.com/nirmata/kyverno/pkg/engine"
|
||||
"github.com/nirmata/kyverno/pkg/engine/context"
|
||||
"github.com/nirmata/kyverno/pkg/engine/response"
|
||||
engineutils "github.com/nirmata/kyverno/pkg/engine/utils"
|
||||
policyctr "github.com/nirmata/kyverno/pkg/policy"
|
||||
"github.com/nirmata/kyverno/pkg/policyviolation"
|
||||
"github.com/nirmata/kyverno/pkg/utils"
|
||||
v1beta1 "k8s.io/api/admission/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
)
|
||||
|
@ -23,40 +23,6 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
|
|||
request.Kind.Kind, request.Namespace, request.Name, request.UID, request.Operation)
|
||||
|
||||
var patches [][]byte
|
||||
var policyStats []policyctr.PolicyStat
|
||||
|
||||
// gather stats from the engine response
|
||||
gatherStat := func(policyName string, policyResponse response.PolicyResponse) {
|
||||
ps := policyctr.PolicyStat{}
|
||||
ps.PolicyName = policyName
|
||||
ps.Stats.MutationExecutionTime = policyResponse.ProcessingTime
|
||||
ps.Stats.RulesAppliedCount = policyResponse.RulesAppliedCount
|
||||
// capture rule level stats
|
||||
for _, rule := range policyResponse.Rules {
|
||||
rs := policyctr.RuleStatinfo{}
|
||||
rs.RuleName = rule.Name
|
||||
rs.ExecutionTime = rule.RuleStats.ProcessingTime
|
||||
if rule.Success {
|
||||
rs.RuleAppliedCount++
|
||||
} else {
|
||||
rs.RulesFailedCount++
|
||||
}
|
||||
if rule.Patches != nil {
|
||||
rs.MutationCount++
|
||||
}
|
||||
ps.Stats.Rules = append(ps.Stats.Rules, rs)
|
||||
}
|
||||
policyStats = append(policyStats, ps)
|
||||
}
|
||||
// send stats for aggregation
|
||||
sendStat := func(blocked bool) {
|
||||
for _, stat := range policyStats {
|
||||
stat.Stats.ResourceBlocked = utils.Btoi(blocked)
|
||||
//SEND
|
||||
ws.policyStatus.SendStat(stat)
|
||||
}
|
||||
}
|
||||
|
||||
var engineResponses []response.EngineResponse
|
||||
|
||||
userRequestInfo := kyverno.RequestInfo{
|
||||
|
@ -91,12 +57,10 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
|
|||
for _, policy := range policies {
|
||||
glog.V(2).Infof("Handling mutation for Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s",
|
||||
resource.GetKind(), resource.GetNamespace(), resource.GetName(), request.UID, request.Operation)
|
||||
|
||||
policyContext.Policy = policy
|
||||
engineResponse := engine.Mutate(policyContext)
|
||||
engineResponses = append(engineResponses, engineResponse)
|
||||
// Gather policy application statistics
|
||||
gatherStat(policy.Name, engineResponse.PolicyResponse)
|
||||
updateStatusWithMutate(ws.status, policy, engineResponse)
|
||||
if !engineResponse.IsSuccesful() {
|
||||
glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName())
|
||||
continue
|
||||
|
@ -125,8 +89,6 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
|
|||
events := generateEvents(engineResponses, (request.Operation == v1beta1.Update))
|
||||
ws.eventGen.Add(events...)
|
||||
|
||||
sendStat(false)
|
||||
|
||||
// debug info
|
||||
func() {
|
||||
if len(patches) != 0 {
|
||||
|
@ -146,3 +108,8 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
|
|||
// patches holds all the successful patches, if no patch is created, it returns nil
|
||||
return engineutils.JoinPatches(patches)
|
||||
}
|
||||
|
||||
func updateStatusWithMutate(statusSync *policy.StatusSync, policy kyverno.ClusterPolicy, response response.EngineResponse) {
|
||||
status := statusSync.Cache().Get(policy.Name)
|
||||
status
|
||||
}
|
||||
|
|
|
@ -10,6 +10,8 @@ import (
|
|||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/nirmata/kyverno/pkg/policy"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"github.com/nirmata/kyverno/pkg/checker"
|
||||
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
|
||||
|
@ -18,7 +20,6 @@ import (
|
|||
"github.com/nirmata/kyverno/pkg/config"
|
||||
client "github.com/nirmata/kyverno/pkg/dclient"
|
||||
"github.com/nirmata/kyverno/pkg/event"
|
||||
"github.com/nirmata/kyverno/pkg/policy"
|
||||
"github.com/nirmata/kyverno/pkg/policystore"
|
||||
"github.com/nirmata/kyverno/pkg/policyviolation"
|
||||
tlsutils "github.com/nirmata/kyverno/pkg/tls"
|
||||
|
@ -55,7 +56,7 @@ type WebhookServer struct {
|
|||
// webhook registration client
|
||||
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient
|
||||
// API to send policy stats for aggregation
|
||||
policyStatus policy.PolicyStatusInterface
|
||||
status policy.StatusSync
|
||||
// helpers to validate against current loaded configuration
|
||||
configHandler config.Interface
|
||||
// channel for cleanup notification
|
||||
|
@ -82,7 +83,7 @@ func NewWebhookServer(
|
|||
crbInformer rbacinformer.ClusterRoleBindingInformer,
|
||||
eventGen event.Interface,
|
||||
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient,
|
||||
policyStatus policy.PolicyStatusInterface,
|
||||
status *policy.StatusSync,
|
||||
configHandler config.Interface,
|
||||
pMetaStore policystore.LookupInterface,
|
||||
pvGenerator policyviolation.GeneratorInterface,
|
||||
|
@ -112,7 +113,7 @@ func NewWebhookServer(
|
|||
crbSynced: crbInformer.Informer().HasSynced,
|
||||
eventGen: eventGen,
|
||||
webhookRegistrationClient: webhookRegistrationClient,
|
||||
policyStatus: policyStatus,
|
||||
status: status,
|
||||
configHandler: configHandler,
|
||||
cleanUp: cleanUp,
|
||||
lastReqTime: resourceWebhookWatcher.LastReqTime,
|
||||
|
|
Loading…
Add table
Reference in a new issue