1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-03-31 03:45:17 +00:00

Improve / clean up code (#1444)

* Remove lock embedded in CRD controller, use concurrent map to store shcemas

* delete rcr info from data store

* skip policy validation on status update

* - remove status check in policy mutation; - fix test

* Remove fqdncn flag

* add flag profiling port

* skip policy mutation & validation on status update

* sync policy status every minute

* update log messages
This commit is contained in:
shuting 2021-01-06 16:32:02 -08:00 committed by GitHub
parent 35aa3149c8
commit 52d091c5a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 126 additions and 161 deletions

View file

@ -38,20 +38,19 @@ import (
const resyncPeriod = 15 * time.Minute
var (
kubeconfig string
serverIP string
webhookTimeout int
backgroundSync int
runValidationInMutatingWebhook string
profile bool
//TODO: this has been added to backward support command line arguments
// will be removed in future and the configuration will be set only via configmaps
filterK8Resources string
kubeconfig string
serverIP string
runValidationInMutatingWebhook string
excludeGroupRole string
excludeUsername string
// User FQDN as CSR CN
fqdncn bool
profilePort string
webhookTimeout int
profile bool
policyReport bool
setupLog = log.Log.WithName("setup")
)
@ -67,19 +66,13 @@ func main() {
flag.StringVar(&serverIP, "serverIP", "", "IP address where Kyverno controller runs. Only required if out-of-cluster.")
flag.StringVar(&runValidationInMutatingWebhook, "runValidationInMutatingWebhook", "", "Validation will also be done using the mutation webhook, set to 'true' to enable. Older kubernetes versions do not work properly when a validation webhook is registered.")
flag.BoolVar(&profile, "profile", false, "Set this flag to 'true', to enable profiling.")
flag.StringVar(&profilePort, "profile-port", "6060", "Enable profiling at given port, default to 6060.")
if err := flag.Set("v", "2"); err != nil {
setupLog.Error(err, "failed to set log level")
os.Exit(1)
}
// Generate CSR with CN as FQDN due to https://github.com/kyverno/kyverno/issues/542
flag.BoolVar(&fqdncn, "fqdn-as-cn", false, "use FQDN as Common Name in CSR")
flag.Parse()
if profile {
go http.ListenAndServe("localhost:6060", nil)
}
version.PrintVersionInfo(log.Log)
cleanUp := make(chan struct{})
stopCh := signal.SetupSignalHandler()
@ -89,6 +82,18 @@ func main() {
os.Exit(1)
}
if profile {
addr := ":" + profilePort
setupLog.Info("Enable profiling, see details at https://github.com/kyverno/kyverno/wiki/Profiling-Kyverno-on-Kubernetes", "port", profilePort)
go func() {
if err := http.ListenAndServe(addr, nil); err != nil {
setupLog.Error(err, "Failed to enable profiling")
os.Exit(1)
}
}()
}
// KYVERNO CRD CLIENT
// access CRD resources
// - ClusterPolicy, Policy
@ -276,7 +281,7 @@ func main() {
)
// Configure certificates
tlsPair, err := client.InitTLSPemPair(clientConfig, fqdncn)
tlsPair, err := client.InitTLSPemPair(clientConfig)
if err != nil {
setupLog.Error(err, "Failed to initialize TLS key/certificate pair")
os.Exit(1)

View file

@ -16,7 +16,7 @@ import (
// InitTLSPemPair Loads or creates PEM private key and TLS certificate for webhook server.
// Created pair is stored in cluster's secret.
// Returns struct with key/certificate pair.
func (c *Client) InitTLSPemPair(configuration *rest.Config, fqdncn bool) (*tls.PemPair, error) {
func (c *Client) InitTLSPemPair(configuration *rest.Config) (*tls.PemPair, error) {
logger := c.log
certProps, err := c.GetTLSCertProps(configuration)
if err != nil {
@ -24,7 +24,7 @@ func (c *Client) InitTLSPemPair(configuration *rest.Config, fqdncn bool) (*tls.P
}
logger.Info("Building key/certificate pair for TLS")
tlsPair, err := c.buildTLSPemPair(certProps, fqdncn)
tlsPair, err := c.buildTLSPemPair(certProps)
if err != nil {
return nil, err
}
@ -37,7 +37,7 @@ func (c *Client) InitTLSPemPair(configuration *rest.Config, fqdncn bool) (*tls.P
// buildTLSPemPair Issues TLS certificate for webhook server using self-signed CA cert
// Returns signed and approved TLS certificate in PEM format
func (c *Client) buildTLSPemPair(props tls.CertificateProps, fqdncn bool) (*tls.PemPair, error) {
func (c *Client) buildTLSPemPair(props tls.CertificateProps) (*tls.PemPair, error) {
caCert, caPEM, err := tls.GenerateCACert()
if err != nil {
return nil, err
@ -46,7 +46,7 @@ func (c *Client) buildTLSPemPair(props tls.CertificateProps, fqdncn bool) (*tls.
if err := c.WriteCACertToSecret(caPEM, props); err != nil {
return nil, fmt.Errorf("failed to write CA cert to secret: %v", err)
}
return tls.GenerateCertPem(caCert, props, fqdncn)
return tls.GenerateCertPem(caCert, props)
}
//ReadRootCASecret returns the RootCA from the pre-defined secret

View file

@ -34,7 +34,7 @@ func generatePatches(src, dst []byte) ([][]byte, error) {
return patchesBytes, err
}
// preProcessJSONPatchesgo deals with the JsonPatch when reinvocation
// preProcessJSONPatches deals with the JsonPatch when reinvocation
// policy is set in webhook, to avoid generating duplicate values.
// This duplicate error only occurs on type array, if it's adding to a map
// the value will be added to the map if nil, otherwise it overwrites the old value

View file

@ -230,7 +230,7 @@ func (c *Controller) applyGeneratePolicy(log logr.Logger, policyContext engine.P
}
if gr.Status.State == "" && len(genResources) > 0 {
log.V(3).Info("updating policy status", "policy", policy.Name, "data", ruleNameToProcessingTime)
log.V(4).Info("updating policy status", "policy", policy.Name, "data", ruleNameToProcessingTime)
c.policyStatusListener.Update(generateSyncStats{
policyName: policy.Name,
ruleNameToProcessingTime: ruleNameToProcessingTime,

View file

@ -237,7 +237,7 @@ func applyCommandHelper(resourcePaths []string, cluster bool, policyReport bool,
skippedPolicies = make([]SkippedPolicy, 0)
for _, policy := range mutatedPolicies {
err := policy2.Validate(utils.MarshalPolicy(*policy), nil, true, openAPIController)
err := policy2.Validate(policy, nil, true, openAPIController)
if err != nil {
rc.skip += len(resources)
log.Log.V(3).Info(fmt.Sprintf("skipping policy %v as it is not valid", policy.Name), "error", err)

View file

@ -96,7 +96,7 @@ func Command() *cobra.Command {
invalidPolicyFound := false
for _, policy := range policies {
fmt.Println("----------------------------------------------------------------------")
err := policy2.Validate(utils.MarshalPolicy(*policy), nil, true, openAPIController)
err := policy2.Validate(policy, nil, true, openAPIController)
if err != nil {
fmt.Printf("Policy %s is invalid.\n", policy.Name)
fmt.Printf("Error: invalid policy.\nCause: %s\n\n", err)

View file

@ -1,7 +1,6 @@
package openapi
import (
"encoding/json"
"errors"
"fmt"
"strconv"
@ -13,7 +12,6 @@ import (
data "github.com/kyverno/kyverno/api"
v1 "github.com/kyverno/kyverno/pkg/api/kyverno/v1"
"github.com/kyverno/kyverno/pkg/engine"
"github.com/kyverno/kyverno/pkg/engine/utils"
cmap "github.com/orcaman/concurrent-map"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -78,24 +76,7 @@ func NewOpenAPIController() (*Controller, error) {
}
// ValidatePolicyFields ...
func (o *Controller) ValidatePolicyFields(policyRaw []byte) error {
var policy v1.ClusterPolicy
err := json.Unmarshal(policyRaw, &policy)
if err != nil {
return err
}
policyUnst, err := utils.ConvertToUnstructured(policyRaw)
if err != nil {
return err
}
err = o.ValidateResource(*policyUnst.DeepCopy(), "ClusterPolicy")
if err != nil {
return err
}
func (o *Controller) ValidatePolicyFields(policy v1.ClusterPolicy) error {
return o.ValidatePolicyMutation(policy)
}

View file

@ -21,19 +21,8 @@ import (
// Validate does some initial check to verify some conditions
// - One operation per rule
// - ResourceDescription mandatory checks
func Validate(policyRaw []byte, client *dclient.Client, mock bool, openAPIController *openapi.Controller) error {
// check for invalid fields
err := checkInvalidFields(policyRaw)
if err != nil {
return err
}
var p kyverno.ClusterPolicy
err = json.Unmarshal(policyRaw, &p)
if err != nil {
return fmt.Errorf("failed to unmarshal policy: %v", err)
}
func Validate(policy *kyverno.ClusterPolicy, client *dclient.Client, mock bool, openAPIController *openapi.Controller) error {
p := *policy
if len(common.PolicyHasVariables(p)) > 0 && common.PolicyHasNonAllowedVariables(p) {
return fmt.Errorf("policy contains invalid variables")
}
@ -163,7 +152,7 @@ func Validate(policyRaw []byte, client *dclient.Client, mock bool, openAPIContro
}
if !mock {
if err := openAPIController.ValidatePolicyFields(policyRaw); err != nil {
if err := openAPIController.ValidatePolicyFields(p); err != nil {
return err
}
} else {
@ -175,35 +164,6 @@ func Validate(policyRaw []byte, client *dclient.Client, mock bool, openAPIContro
return nil
}
// checkInvalidFields - checks invalid fields in webhook policy request
// policy supports 5 json fields in types.go i.e. "apiVersion", "kind", "metadata", "spec", "status"
// If the webhook request policy contains new fields then block creation of policy
func checkInvalidFields(policyRaw []byte) error {
// hardcoded supported fields by policy
var allowedKeys = []string{"apiVersion", "kind", "metadata", "spec", "status"}
var data interface{}
err := json.Unmarshal(policyRaw, &data)
if err != nil {
return fmt.Errorf("failed to unmarshal policy admission request err %v", err)
}
mapData := data.(map[string]interface{})
// validate any new fields in the admission request against the supported fields and block the request with any new fields
for requestField := range mapData {
ok := false
for _, allowedField := range allowedKeys {
if requestField == allowedField {
ok = true
break
}
}
if !ok {
return fmt.Errorf("unknown field \"%s\" in policy", requestField)
}
}
return nil
}
// doMatchAndExcludeConflict checks if the resultant
// of match and exclude block is not an empty set
func doMatchAndExcludeConflict(rule kyverno.Rule) bool {

View file

@ -372,8 +372,11 @@ func Test_Validate_Policy(t *testing.T) {
}`)
openAPIController, _ := openapi.NewOpenAPIController()
var policy *kyverno.ClusterPolicy
err := json.Unmarshal(rawPolicy, &policy)
assert.NilError(t, err)
err := Validate(rawPolicy, nil, true, openAPIController)
err = Validate(policy, nil, true, openAPIController)
assert.NilError(t, err)
}
@ -515,8 +518,12 @@ func Test_Validate_ErrorFormat(t *testing.T) {
}
`)
var policy *kyverno.ClusterPolicy
err := json.Unmarshal(rawPolicy, &policy)
assert.NilError(t, err)
openAPIController, _ := openapi.NewOpenAPIController()
err := Validate(rawPolicy, nil, true, openAPIController)
err = Validate(policy, nil, true, openAPIController)
assert.Assert(t, err != nil)
}

View file

@ -6,7 +6,6 @@ import (
backoff "github.com/cenkalti/backoff"
kyverno "github.com/kyverno/kyverno/pkg/api/kyverno/v1"
v1 "github.com/kyverno/kyverno/pkg/api/kyverno/v1"
client "github.com/kyverno/kyverno/pkg/dclient"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
unstructured "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -57,27 +56,3 @@ func converLabelToSelector(labelMap map[string]string) (labels.Selector, error)
return policyViolationSelector, nil
}
type violationCount struct {
policyName string
violatedRules []v1.ViolatedRule
}
func (vc violationCount) PolicyName() string {
return vc.policyName
}
func (vc violationCount) UpdateStatus(status kyverno.PolicyStatus) kyverno.PolicyStatus {
var ruleNameToViolations = make(map[string]int)
for _, rule := range vc.violatedRules {
ruleNameToViolations[rule.Name]++
}
for i := range status.Rules {
status.ViolationCount += ruleNameToViolations[status.Rules[i].Name]
status.Rules[i].ViolationCount += ruleNameToViolations[status.Rules[i].Name]
}
return status
}

View file

@ -201,8 +201,14 @@ func (gen *Generator) runWorker() {
func (gen *Generator) handleErr(err error, key interface{}) {
logger := gen.log
keyHash, ok := key.(string)
if !ok {
keyHash = ""
}
if err == nil {
gen.queue.Forget(key)
gen.dataStore.delete(keyHash)
return
}
@ -215,10 +221,8 @@ func (gen *Generator) handleErr(err error, key interface{}) {
logger.Error(err, "failed to process report request", "key", key)
gen.queue.Forget(key)
if keyHash, ok := key.(string); ok {
gen.dataStore.delete(keyHash)
}
}
func (gen *Generator) processNextWorkItem() bool {
logger := gen.log
@ -259,8 +263,6 @@ func (gen *Generator) processNextWorkItem() bool {
}
func (gen *Generator) syncHandler(info Info) error {
gen.log.V(4).Info("reconcile report change request")
builder := NewBuilder(gen.cpolLister, gen.polLister)
rcrUnstructured, err := builder.build(info)
if err != nil {
@ -271,6 +273,7 @@ func (gen *Generator) syncHandler(info Info) error {
return nil
}
gen.log.V(4).Info("reconcile report change request", "key", info.ToKey())
return gen.sync(rcrUnstructured, info)
}

View file

@ -27,7 +27,7 @@ import (
//on a channel.
//The worker then updates the current status using the methods
//exposed by the interface.
//Current implementation is designed to be thread safe with optimised
//Current implementation is designed to be thread safe with optimized
//locking for each policy.
// statusUpdater defines a type to have a method which
@ -86,18 +86,19 @@ func (s *Sync) Run(workers int, stopCh <-chan struct{}) {
go s.updateStatusCache(stopCh)
}
wait.Until(s.updatePolicyStatus, time.Second, stopCh)
// sync the status to the existing policy every minute
wait.Until(s.writePolicyStatus, time.Minute, stopCh)
<-stopCh
}
// updateStatusCache is a worker which updates the current status
//using the statusUpdater interface
// updateStatusCache is a worker which adds the current status
// to the cache, using the statusUpdater interface
func (s *Sync) updateStatusCache(stopCh <-chan struct{}) {
for {
select {
case statusUpdater := <-s.Listener:
name := statusUpdater.PolicyName()
s.log.V(3).Info("received policy status update request", "policy", name)
s.log.V(4).Info("received policy status update request", "policy", name)
s.cache.keyToMutex.Get(name).Lock()
@ -121,7 +122,7 @@ func (s *Sync) updateStatusCache(stopCh <-chan struct{}) {
oldStatus, _ := json.Marshal(status)
newStatus, _ := json.Marshal(updatedStatus)
s.log.V(4).Info("updated policy status", "policy", statusUpdater.PolicyName(),
s.log.V(5).Info("updated policy status in the cache", "policy", statusUpdater.PolicyName(),
"oldStatus", string(oldStatus), "newStatus", string(newStatus))
case <-stopCh:
@ -130,11 +131,11 @@ func (s *Sync) updateStatusCache(stopCh <-chan struct{}) {
}
}
// updatePolicyStatus updates the status in the policy resource definition
// from the status cache, syncing them
func (s *Sync) updatePolicyStatus() {
// writePolicyStatus sends the update request to the APIServer
// syncs the status (from cache) to the policy
func (s *Sync) writePolicyStatus() {
for key, status := range s.getCachedStatus() {
s.log.V(3).Info("updating policy status", "policy", key)
s.log.V(4).Info("updating policy status", "policy", key)
namespace, policyName := s.parseStatusKey(key)
if namespace == "" {
s.updateClusterPolicy(policyName, key, status)

View file

@ -105,7 +105,7 @@ func GenerateCACert() (*KeyPair, *PemPair, error) {
// GenerateCertPem takes the results of GenerateCACert and uses it to create the
// PEM-encoded public certificate and private key, respectively
func GenerateCertPem(caCert *KeyPair, props CertificateProps, fqdncn bool) (*PemPair, error) {
func GenerateCertPem(caCert *KeyPair, props CertificateProps) (*PemPair, error) {
now := time.Now()
begin := now.Add(-1 * time.Hour)
end := now.Add(certValidityDuration)
@ -119,11 +119,6 @@ func GenerateCertPem(caCert *KeyPair, props CertificateProps, fqdncn bool) (*Pem
commonName := GenerateInClusterServiceName(props)
dnsNames[2] = fmt.Sprintf("%s", commonName)
if fqdncn {
// use FQDN as CommonName as a workaournd for https://github.com/kyverno/kyverno/issues/542
csCommonName = commonName
}
var ips []net.IP
apiServerIP := net.ParseIP(props.APIServerHost)
if apiServerIP != nil {

View file

@ -5,7 +5,9 @@ import (
"fmt"
"reflect"
"strings"
"time"
logr "github.com/go-logr/logr"
kyverno "github.com/kyverno/kyverno/pkg/api/kyverno/v1"
"github.com/kyverno/kyverno/pkg/policymutation"
v1beta1 "k8s.io/api/admission/v1beta1"
@ -14,12 +16,12 @@ import (
func (ws *WebhookServer) policyMutation(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse {
logger := ws.log.WithValues("action", "policy mutation", "uid", request.UID, "kind", request.Kind, "namespace", request.Namespace, "name", request.Name, "operation", request.Operation)
var policy, oldPolicy *kyverno.ClusterPolicy
var policy *kyverno.ClusterPolicy
raw := request.Object.Raw
//TODO: can this happen? wont this be picked by OpenAPI spec schema ?
if err := json.Unmarshal(raw, &policy); err != nil {
logger.Error(err, "failed to unmarshall policy admission request")
logger.Error(err, "failed to unmarshal policy admission request")
return &v1beta1.AdmissionResponse{
Allowed: true,
Result: &metav1.Status{
@ -29,25 +31,20 @@ func (ws *WebhookServer) policyMutation(request *v1beta1.AdmissionRequest) *v1be
}
if request.Operation == v1beta1.Update {
if err := json.Unmarshal(request.OldObject.Raw, &oldPolicy); err != nil {
logger.Error(err, "failed to unmarshall old policy admission request")
return &v1beta1.AdmissionResponse{
Allowed: true,
Result: &metav1.Status{
Message: fmt.Sprintf("failed to default value, check kyverno controller logs for details: %v", err),
},
admissionResponse := hasPolicyChanged(policy, request.OldObject.Raw, logger)
if admissionResponse != nil {
logger.V(4).Info("skip policy mutation on status update")
return admissionResponse
}
}
if isStatusUpdate(oldPolicy, policy) {
logger.V(3).Info("skip policy mutation on status update")
return &v1beta1.AdmissionResponse{Allowed: true}
}
}
startTime := time.Now()
logger.V(3).Info("start policy change mutation")
defer logger.V(3).Info("finished policy change mutation", "time", time.Since(startTime).String())
// Generate JSON Patches for defaults
patches, updateMsgs := policymutation.GenerateJSONPatchesForDefaults(policy, logger)
if patches != nil {
if len(patches) != 0 {
patchType := v1beta1.PatchTypeJSONPatch
return &v1beta1.AdmissionResponse{
Allowed: true,
@ -58,11 +55,32 @@ func (ws *WebhookServer) policyMutation(request *v1beta1.AdmissionRequest) *v1be
PatchType: &patchType,
}
}
return &v1beta1.AdmissionResponse{
Allowed: true,
}
}
func hasPolicyChanged(policy *kyverno.ClusterPolicy, oldRaw []byte, logger logr.Logger) *v1beta1.AdmissionResponse {
var oldPolicy *kyverno.ClusterPolicy
if err := json.Unmarshal(oldRaw, &oldPolicy); err != nil {
logger.Error(err, "failed to unmarshal old policy admission request")
return &v1beta1.AdmissionResponse{
Allowed: true,
Result: &metav1.Status{
Message: fmt.Sprintf("failed to validate policy, check kyverno controller logs for details: %v", err),
},
}
}
if isStatusUpdate(oldPolicy, policy) {
return &v1beta1.AdmissionResponse{Allowed: true}
}
return nil
}
func isStatusUpdate(old, new *kyverno.ClusterPolicy) bool {
if !reflect.DeepEqual(old.GetAnnotations(), new.GetAnnotations()) {
return false

View file

@ -1,24 +1,44 @@
package webhooks
import (
"encoding/json"
"fmt"
"time"
kyverno "github.com/kyverno/kyverno/pkg/api/kyverno/v1"
policyvalidate "github.com/kyverno/kyverno/pkg/policy"
v1beta1 "k8s.io/api/admission/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
//HandlePolicyValidation performs the validation check on policy resource
func (ws *WebhookServer) policyValidation(request *v1beta1.AdmissionRequest) *v1beta1.AdmissionResponse {
logger := ws.log.WithValues("action", "policy validation", "uid", request.UID, "kind", request.Kind, "namespace", request.Namespace, "name", request.Name, "operation", request.Operation)
var policy *kyverno.ClusterPolicy
if err := json.Unmarshal(request.Object.Raw, &policy); err != nil {
logger.Error(err, "failed to unmarshal policy admission request")
return &v1beta1.AdmissionResponse{
Allowed: true,
Result: &metav1.Status{
Message: fmt.Sprintf("failed to validate policy, check kyverno controller logs for details: %v", err),
},
}
}
if request.Operation == v1beta1.Update {
admissionResponse := hasPolicyChanged(policy, request.OldObject.Raw, logger)
if admissionResponse != nil {
logger.V(4).Info("skip policy validation on status update")
return admissionResponse
}
}
startTime := time.Now()
logger.V(3).Info("start validating policy")
defer logger.V(3).Info("finished validating policy", "time", time.Since(startTime).String())
logger.V(3).Info("start policy change validation")
defer logger.V(3).Info("finished policy change validation", "time", time.Since(startTime).String())
if err := policyvalidate.Validate(request.Object.Raw, ws.client, false, ws.openAPIController); err != nil {
if err := policyvalidate.Validate(policy, ws.client, false, ws.openAPIController); err != nil {
logger.Error(err, "policy validation errors")
return &v1beta1.AdmissionResponse{
Allowed: false,

View file

@ -255,7 +255,7 @@ func (ws *WebhookServer) handlerFunc(handler func(request *v1beta1.AdmissionRequ
admissionReview.Response = handler(request)
writeResponse(rw, admissionReview)
logger.V(3).Info("admission review request processed", "time", time.Since(startTime).String())
logger.V(4).Info("admission review request processed", "time", time.Since(startTime).String())
return
}