1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-05 07:26:55 +00:00

Revert "767 tested prototype"

This reverts commit 4ae65497bf.
This commit is contained in:
shravan 2020-03-29 07:25:13 +05:30
parent 4ae65497bf
commit f055076041
7 changed files with 142 additions and 97 deletions

View file

@ -219,26 +219,22 @@ func main() {
// -- annotations on resources with update details on mutation JSON patches
// -- generate policy violation resource
// -- generate events on policy and resource
server, err := webhooks.NewWebhookServer(&webhooks.WebhookServer{
Client: client,
KyvernoClient: pclient,
PLister: pInformer.Kyverno().V1().ClusterPolicies().Lister(),
PSynced: pInformer.Kyverno().V1().ClusterPolicies().Informer().HasSynced,
RbLister: kubeInformer.Rbac().V1().RoleBindings().Lister(),
RbSynced: kubeInformer.Rbac().V1().RoleBindings().Informer().HasSynced,
CrbLister: kubeInformer.Rbac().V1().ClusterRoleBindings().Lister(),
CrbSynced: kubeInformer.Rbac().V1().ClusterRoleBindings().Informer().HasSynced,
EventGen: egen,
WebhookRegistrationClient: webhookRegistrationClient,
StatusListener: statusSync.Listener,
ConfigHandler: configData,
CleanUp: cleanUp,
LastReqTime: rWebhookWatcher.LastReqTime,
PvGenerator: pvgen,
PMetaStore: policyMetaStore,
GrGenerator: grgen,
ResourceWebhookWatcher: rWebhookWatcher,
}, tlsPair)
server, err := webhooks.NewWebhookServer(
pclient,
client,
tlsPair,
pInformer.Kyverno().V1().ClusterPolicies(),
kubeInformer.Rbac().V1().RoleBindings(),
kubeInformer.Rbac().V1().ClusterRoleBindings(),
egen,
webhookRegistrationClient,
statusSync.Listener,
configData,
policyMetaStore,
pvgen,
grgen,
rWebhookWatcher,
cleanUp)
if err != nil {
glog.Fatalf("Unable to create webhook server: %v\n", err)
}

View file

@ -1 +0,0 @@
package generate

View file

@ -66,13 +66,13 @@ func (ws *WebhookServer) HandleGenerate(request *v1beta1.AdmissionRequest, polic
if len(engineResponse.PolicyResponse.Rules) > 0 {
// some generate rules do apply to the resource
engineResponses = append(engineResponses, engineResponse)
ws.StatusListener.Send(generateStats{
ws.statusListener.Send(generateStats{
resp: engineResponse,
})
}
}
// Adds Generate Request to a channel(queue size 1000) to generators
if err := createGenerateRequest(ws.GrGenerator, userRequestInfo, engineResponses...); err != nil {
if err := createGenerateRequest(ws.grGenerator, userRequestInfo, engineResponses...); err != nil {
//TODO: send appropriate error
return false, "Kyverno blocked: failed to create Generate Requests"
}

View file

@ -63,7 +63,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
policyContext.Policy = policy
engineResponse := engine.Mutate(policyContext)
engineResponses = append(engineResponses, engineResponse)
ws.StatusListener.Send(mutateStats{resp: engineResponse})
ws.statusListener.Send(mutateStats{resp: engineResponse})
if !engineResponse.IsSuccesful() {
glog.V(4).Infof("Failed to apply policy %s on resource %s/%s\n", policy.Name, resource.GetNamespace(), resource.GetName())
continue
@ -91,7 +91,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
// AUDIT
// generate violation when response fails
pvInfos := policyviolation.GeneratePVsFromEngineResponse(engineResponses)
ws.PvGenerator.Add(pvInfos...)
ws.pvGenerator.Add(pvInfos...)
// REPORTING EVENTS
// Scenario 1:
// some/all policies failed to apply on the resource. a policy volation is generated.
@ -101,7 +101,7 @@ func (ws *WebhookServer) HandleMutation(request *v1beta1.AdmissionRequest, resou
// create an event on the resource
// ADD EVENTS
events := generateEvents(engineResponses, false, (request.Operation == v1beta1.Update))
ws.EventGen.Add(events...)
ws.eventGen.Add(events...)
// debug info
func() {

View file

@ -39,7 +39,7 @@ func (ws *WebhookServer) handlePolicyValidation(request *v1beta1.AdmissionReques
if admissionResp.Allowed {
// if the policy contains mutating & validation rules and it config does not exist we create one
// queue the request
ws.ResourceWebhookWatcher.RegisterResourceWebhook()
ws.resourceWebhookWatcher.RegisterResourceWebhook()
}
return admissionResp
}

View file

@ -13,6 +13,7 @@ import (
"github.com/golang/glog"
"github.com/nirmata/kyverno/pkg/checker"
kyvernoclient "github.com/nirmata/kyverno/pkg/client/clientset/versioned"
kyvernoinformer "github.com/nirmata/kyverno/pkg/client/informers/externalversions/kyverno/v1"
kyvernolister "github.com/nirmata/kyverno/pkg/client/listers/kyverno/v1"
"github.com/nirmata/kyverno/pkg/config"
client "github.com/nirmata/kyverno/pkg/dclient"
@ -26,6 +27,7 @@ import (
"github.com/nirmata/kyverno/pkg/webhooks/generate"
v1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
rbacinformer "k8s.io/client-go/informers/rbac/v1"
rbaclister "k8s.io/client-go/listers/rbac/v1"
"k8s.io/client-go/tools/cache"
)
@ -34,50 +36,64 @@ import (
// MutationWebhook gets policies from policyController and takes control of the cluster with kubeclient.
type WebhookServer struct {
server http.Server
Client *client.Client
KyvernoClient *kyvernoclient.Clientset
client *client.Client
kyvernoClient *kyvernoclient.Clientset
// list/get cluster policy resource
PLister kyvernolister.ClusterPolicyLister
pLister kyvernolister.ClusterPolicyLister
// returns true if the cluster policy store has synced atleast
PSynced cache.InformerSynced
pSynced cache.InformerSynced
// list/get role binding resource
RbLister rbaclister.RoleBindingLister
rbLister rbaclister.RoleBindingLister
// return true if role bining store has synced atleast once
RbSynced cache.InformerSynced
rbSynced cache.InformerSynced
// list/get cluster role binding resource
CrbLister rbaclister.ClusterRoleBindingLister
crbLister rbaclister.ClusterRoleBindingLister
// return true if cluster role binding store has synced atleast once
CrbSynced cache.InformerSynced
crbSynced cache.InformerSynced
// generate events
EventGen event.Interface
eventGen event.Interface
// webhook registration client
WebhookRegistrationClient *webhookconfig.WebhookRegistrationClient
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient
// API to send policy stats for aggregation
StatusListener policystatus.Listener
statusListener policystatus.Listener
// helpers to validate against current loaded configuration
ConfigHandler config.Interface
configHandler config.Interface
// channel for cleanup notification
CleanUp chan<- struct{}
cleanUp chan<- struct{}
// last request time
LastReqTime *checker.LastReqTime
lastReqTime *checker.LastReqTime
// store to hold policy meta data for faster lookup
PMetaStore policystore.LookupInterface
pMetaStore policystore.LookupInterface
// policy violation generator
PvGenerator policyviolation.GeneratorInterface
pvGenerator policyviolation.GeneratorInterface
// generate request generator
GrGenerator *generate.Generator
ResourceWebhookWatcher *webhookconfig.ResourceWebhookRegister
grGenerator *generate.Generator
resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister
}
// NewWebhookServer creates new instance of WebhookServer accordingly to given configuration
// Policy Controller and Kubernetes Client should be initialized in configuration
func NewWebhookServer(
ws *WebhookServer,
tlsPair *tlsutils.TlsPemPair) (*WebhookServer, error) {
kyvernoClient *kyvernoclient.Clientset,
client *client.Client,
tlsPair *tlsutils.TlsPemPair,
pInformer kyvernoinformer.ClusterPolicyInformer,
rbInformer rbacinformer.RoleBindingInformer,
crbInformer rbacinformer.ClusterRoleBindingInformer,
eventGen event.Interface,
webhookRegistrationClient *webhookconfig.WebhookRegistrationClient,
statusSync policystatus.Listener,
configHandler config.Interface,
pMetaStore policystore.LookupInterface,
pvGenerator policyviolation.GeneratorInterface,
grGenerator *generate.Generator,
resourceWebhookWatcher *webhookconfig.ResourceWebhookRegister,
cleanUp chan<- struct{}) (*WebhookServer, error) {
if tlsPair == nil {
return nil, errors.New("NewWebhookServer is not initialized properly")
}
var tlsConfig tls.Config
pair, err := tls.X509KeyPair(tlsPair.Certificate, tlsPair.PrivateKey)
if err != nil {
@ -85,12 +101,32 @@ func NewWebhookServer(
}
tlsConfig.Certificates = []tls.Certificate{pair}
ws := &WebhookServer{
client: client,
kyvernoClient: kyvernoClient,
pLister: pInformer.Lister(),
pSynced: pInformer.Informer().HasSynced,
rbLister: rbInformer.Lister(),
rbSynced: rbInformer.Informer().HasSynced,
crbLister: crbInformer.Lister(),
crbSynced: crbInformer.Informer().HasSynced,
eventGen: eventGen,
webhookRegistrationClient: webhookRegistrationClient,
statusListener: statusSync,
configHandler: configHandler,
cleanUp: cleanUp,
lastReqTime: resourceWebhookWatcher.LastReqTime,
pvGenerator: pvGenerator,
pMetaStore: pMetaStore,
grGenerator: grGenerator,
resourceWebhookWatcher: resourceWebhookWatcher,
}
mux := http.NewServeMux()
mux.HandleFunc(config.MutatingWebhookServicePath, ws.handlerFunc(ws.handleMutateAdmissionRequest, true))
mux.HandleFunc(config.ValidatingWebhookServicePath, ws.handlerFunc(ws.handleValidateAdmissionRequest, true))
mux.HandleFunc(config.PolicyMutatingWebhookServicePath, ws.handlerFunc(ws.handlePolicyMutation, true))
mux.HandleFunc(config.PolicyValidatingWebhookServicePath, ws.handlerFunc(ws.handlePolicyValidation, true))
mux.HandleFunc(config.VerifyMutatingWebhookServicePath, ws.handlerFunc(ws.handleVerifyRequest, false))
mux.HandleFunc(config.MutatingWebhookServicePath, ws.serve)
mux.HandleFunc(config.ValidatingWebhookServicePath, ws.serve)
mux.HandleFunc(config.VerifyMutatingWebhookServicePath, ws.serve)
mux.HandleFunc(config.PolicyValidatingWebhookServicePath, ws.serve)
mux.HandleFunc(config.PolicyMutatingWebhookServicePath, ws.serve)
ws.server = http.Server{
Addr: ":443", // Listen on port for HTTPS requests
TLSConfig: &tlsConfig,
@ -102,50 +138,64 @@ func NewWebhookServer(
return ws, nil
}
func (ws *WebhookServer) handlerFunc(handler func(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse, filter bool) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
// for every request received on the ep update last request time,
// this is used to verify admission control
ws.LastReqTime.SetTime(time.Now())
admissionReview := ws.bodyToAdmissionReview(r, w)
if admissionReview == nil {
return
}
defer func() {
glog.V(4).Infof("request: %v %s/%s/%s", time.Since(startTime), admissionReview.Request.Kind, admissionReview.Request.Namespace, admissionReview.Request.Name)
}()
// Main server endpoint for all requests
func (ws *WebhookServer) serve(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
// for every request received on the ep update last request time,
// this is used to verify admission control
ws.lastReqTime.SetTime(time.Now())
admissionReview := ws.bodyToAdmissionReview(r, w)
if admissionReview == nil {
return
}
defer func() {
glog.V(4).Infof("request: %v %s/%s/%s", time.Since(startTime), admissionReview.Request.Kind, admissionReview.Request.Namespace, admissionReview.Request.Name)
}()
admissionReview.Response = &v1beta1.AdmissionResponse{
Allowed: true,
}
admissionReview.Response = &v1beta1.AdmissionResponse{
Allowed: true,
}
// Do not process the admission requests for kinds that are in filterKinds for filtering
request := admissionReview.Request
if filter {
if !ws.ConfigHandler.ToFilter(request.Kind.Kind, request.Namespace, request.Name) {
admissionReview.Response = handler(request)
}
} else {
admissionReview.Response = handler(request)
// Do not process the admission requests for kinds that are in filterKinds for filtering
request := admissionReview.Request
switch r.URL.Path {
case config.VerifyMutatingWebhookServicePath:
// we do not apply filters as this endpoint is used explicitly
// to watch kyveno deployment and verify if admission control is enabled
admissionReview.Response = ws.handleVerifyRequest(request)
case config.MutatingWebhookServicePath:
if !ws.configHandler.ToFilter(request.Kind.Kind, request.Namespace, request.Name) {
admissionReview.Response = ws.handleMutateAdmissionRequest(request)
}
admissionReview.Response.UID = request.UID
case config.ValidatingWebhookServicePath:
if !ws.configHandler.ToFilter(request.Kind.Kind, request.Namespace, request.Name) {
admissionReview.Response = ws.handleValidateAdmissionRequest(request)
}
case config.PolicyValidatingWebhookServicePath:
if !ws.configHandler.ToFilter(request.Kind.Kind, request.Namespace, request.Name) {
admissionReview.Response = ws.handlePolicyValidation(request)
}
case config.PolicyMutatingWebhookServicePath:
if !ws.configHandler.ToFilter(request.Kind.Kind, request.Namespace, request.Name) {
admissionReview.Response = ws.handlePolicyMutation(request)
}
}
admissionReview.Response.UID = request.UID
responseJSON, err := json.Marshal(admissionReview)
if err != nil {
http.Error(w, fmt.Sprintf("Could not encode response: %v", err), http.StatusInternalServerError)
return
}
responseJSON, err := json.Marshal(admissionReview)
if err != nil {
http.Error(w, fmt.Sprintf("Could not encode response: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
if _, err := w.Write(responseJSON); err != nil {
http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
}
w.Header().Set("Content-Type", "application/json; charset=utf-8")
if _, err := w.Write(responseJSON); err != nil {
http.Error(w, fmt.Sprintf("could not write response: %v", err), http.StatusInternalServerError)
}
}
func (ws *WebhookServer) handleMutateAdmissionRequest(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse {
policies, err := ws.PMetaStore.ListAll()
policies, err := ws.pMetaStore.ListAll()
if err != nil {
// Unable to connect to policy Lister to access policies
glog.Errorf("Unable to connect to policy controller to access policies. Policies are NOT being applied: %v", err)
@ -157,7 +207,7 @@ func (ws *WebhookServer) handleMutateAdmissionRequest(request *v1beta1.Admission
// getRoleRef only if policy has roles/clusterroles defined
startTime := time.Now()
if containRBACinfo(policies) {
roles, clusterRoles, err = userinfo.GetRoleRef(ws.RbLister, ws.CrbLister, request)
roles, clusterRoles, err = userinfo.GetRoleRef(ws.rbLister, ws.crbLister, request)
if err != nil {
// TODO(shuting): continue apply policy if error getting roleRef?
glog.Errorf("Unable to get rbac information for request Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s: %v",
@ -197,7 +247,7 @@ func (ws *WebhookServer) handleMutateAdmissionRequest(request *v1beta1.Admission
// patch the resource with patches before handling validation rules
patchedResource := processResourceWithPatches(patches, request.Object.Raw)
if ws.ResourceWebhookWatcher != nil && ws.ResourceWebhookWatcher.RunValidationInMutatingWebhook == "true" {
if ws.resourceWebhookWatcher != nil && ws.resourceWebhookWatcher.RunValidationInMutatingWebhook == "true" {
// VALIDATION
ok, msg := ws.HandleValidation(request, policies, patchedResource, roles, clusterRoles)
if !ok {
@ -242,7 +292,7 @@ func (ws *WebhookServer) handleMutateAdmissionRequest(request *v1beta1.Admission
}
func (ws *WebhookServer) handleValidateAdmissionRequest(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse {
policies, err := ws.PMetaStore.ListAll()
policies, err := ws.pMetaStore.ListAll()
if err != nil {
// Unable to connect to policy Lister to access policies
glog.Errorf("Unable to connect to policy controller to access policies. Policies are NOT being applied: %v", err)
@ -254,7 +304,7 @@ func (ws *WebhookServer) handleValidateAdmissionRequest(request *v1beta1.Admissi
// getRoleRef only if policy has roles/clusterroles defined
startTime := time.Now()
if containRBACinfo(policies) {
roles, clusterRoles, err = userinfo.GetRoleRef(ws.RbLister, ws.CrbLister, request)
roles, clusterRoles, err = userinfo.GetRoleRef(ws.rbLister, ws.crbLister, request)
if err != nil {
// TODO(shuting): continue apply policy if error getting roleRef?
glog.Errorf("Unable to get rbac information for request Kind=%s, Namespace=%s Name=%s UID=%s patchOperation=%s: %v",
@ -286,7 +336,7 @@ func (ws *WebhookServer) handleValidateAdmissionRequest(request *v1beta1.Admissi
// RunAsync TLS server in separate thread and returns control immediately
func (ws *WebhookServer) RunAsync(stopCh <-chan struct{}) {
if !cache.WaitForCacheSync(stopCh, ws.PSynced, ws.RbSynced, ws.CrbSynced) {
if !cache.WaitForCacheSync(stopCh, ws.pSynced, ws.rbSynced, ws.crbSynced) {
glog.Error("webhook: failed to sync informer cache")
}
@ -301,7 +351,7 @@ func (ws *WebhookServer) RunAsync(stopCh <-chan struct{}) {
// resync: 60 seconds
// deadline: 60 seconds (send request)
// max deadline: deadline*3 (set the deployment annotation as false)
go ws.LastReqTime.Run(ws.PLister, ws.EventGen, ws.Client, checker.DefaultResync, checker.DefaultDeadline, stopCh)
go ws.lastReqTime.Run(ws.pLister, ws.eventGen, ws.client, checker.DefaultResync, checker.DefaultDeadline, stopCh)
}
@ -309,7 +359,7 @@ func (ws *WebhookServer) RunAsync(stopCh <-chan struct{}) {
func (ws *WebhookServer) Stop(ctx context.Context) {
// cleanUp
// remove the static webhookconfigurations
go ws.WebhookRegistrationClient.RemoveWebhookConfigurations(ws.CleanUp)
go ws.webhookRegistrationClient.RemoveWebhookConfigurations(ws.cleanUp)
// shutdown http.Server with context timeout
err := ws.server.Shutdown(ctx)
if err != nil {

View file

@ -71,7 +71,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol
continue
}
engineResponses = append(engineResponses, engineResponse)
ws.StatusListener.Send(validateStats{
ws.statusListener.Send(validateStats{
resp: engineResponse,
})
if !engineResponse.IsSuccesful() {
@ -98,7 +98,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol
// all policies were applied succesfully.
// create an event on the resource
events := generateEvents(engineResponses, blocked, (request.Operation == v1beta1.Update))
ws.EventGen.Add(events...)
ws.eventGen.Add(events...)
if blocked {
glog.V(4).Infof("resource %s/%s/%s is blocked\n", newR.GetKind(), newR.GetNamespace(), newR.GetName())
return false, getEnforceFailureErrorMsg(engineResponses)
@ -107,7 +107,7 @@ func (ws *WebhookServer) HandleValidation(request *v1beta1.AdmissionRequest, pol
// ADD POLICY VIOLATIONS
// violations are created with resource on "audit"
pvInfos := policyviolation.GeneratePVsFromEngineResponse(engineResponses)
ws.PvGenerator.Add(pvInfos...)
ws.pvGenerator.Add(pvInfos...)
// report time end
glog.V(4).Infof("report: %v %s/%s/%s", time.Since(reportTime), request.Kind, request.Namespace, request.Name)
return true, ""