From 2872a259ec28e1b13b56f5d4a3516db1f31876b3 Mon Sep 17 00:00:00 2001 From: Frank Jogeleit Date: Sat, 27 Feb 2021 19:11:49 +0100 Subject: [PATCH] Development (#7) * Implement elasticsearch * Update deployment * Add Changelog --- CHANGELOG.md | 7 ++ README.md | 45 ++++++- charts/policy-reporter/Chart.yaml | 4 +- .../policy-reporter/templates/deployment.yaml | 10 +- .../templates/targetsconfigmap.yaml | 19 +++ charts/policy-reporter/values.yaml | 20 +++- cmd/root.go | 87 ++++---------- cmd/run.go | 89 ++++++++++++++ cmd/send.go | 69 +++++++++++ pkg/config/config.go | 7 ++ pkg/config/resolver.go | 111 +++++++++++++----- pkg/config/resolver_test.go | 107 +++++++++++++++++ pkg/kubernetes/report_client.go | 78 ++++++++++-- pkg/report/client.go | 10 +- pkg/report/model.go | 14 ++- pkg/target/client.go | 2 + pkg/target/elasticsearch/elasticsearch.go | 103 ++++++++++++++++ .../elasticsearch/elasticsearch_test.go | 94 +++++++++++++++ pkg/target/loki/loki.go | 16 ++- pkg/target/loki/loki_test.go | 8 +- 20 files changed, 771 insertions(+), 129 deletions(-) create mode 100644 CHANGELOG.md create mode 100644 charts/policy-reporter/templates/targetsconfigmap.yaml create mode 100644 cmd/run.go create mode 100644 cmd/send.go create mode 100644 pkg/target/elasticsearch/elasticsearch.go create mode 100644 pkg/target/elasticsearch/elasticsearch_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md new file mode 100644 index 00000000..e8f4b3df --- /dev/null +++ b/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +## 0.7.0 + +* Implement Elasticsearch as Target for PolicyReportResults +* Replace CLI flags with a single `config.yaml` to manage target-configurations as separate `ConfigMap` +* Set `loki.skipExistingOnStartup` default value to `true` \ No newline at end of file diff --git a/README.md b/README.md index 5107135e..ed650bff 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ ## Motivation -Kyverno ships with two types of validation. You can either enforce a rule or audit it. If you don't want to block developers or if you want to try out a new rule, you can use the audit functionality. The audit configuration creates [PolicyReports](https://kyverno.io/docs/policy-reports/) which you can access with `kubectl`. Because I can't find a simple solution to get a general overview of this PolicyReports and PolicyReportResults, I created this tool to send information from PolicyReports to [Grafana Loki](https://grafana.com/oss/loki/). As additional feature this tool provides an http server with Prometheus Metrics about ReportPolicy Summaries and ReportPolicyRules. +Kyverno ships with two types of validation. You can either enforce a rule or audit it. If you don't want to block developers or if you want to try out a new rule, you can use the audit functionality. The audit configuration creates [PolicyReports](https://kyverno.io/docs/policy-reports/) which you can access with `kubectl`. Because I can't find a simple solution to get a general overview of this PolicyReports and PolicyReportResults, I created this tool to send information from PolicyReports to different targets like [Grafana Loki](https://grafana.com/oss/loki/). This tool provides by default an HTTP server with Prometheus Metrics on `http://localhost:2112/metrics` about ReportPolicy Summaries and ReportPolicyRules. This project is in an early stage. Please let me know if anything did not work as expected or if you want to send your audits to other targets then Loki. @@ -11,21 +11,56 @@ This project is in an early stage. Please let me know if anything did not work a Installation via Helm Repository +### Add the Helm repository + ```bash helm repo add policy-reporter https://fjogeleit.github.io/policy-reporter -helm install policy-reporter policy-reporter/policy-reporter --set loki.host=http://lokihost:3100 -n policy-reporter --create-namespace ``` + +### Basic Installation - Provides Prometheus Metrics + +```bash +helm install policy-reporter policy-reporter/policy-reporter -n policy-reporter --create-namespace +``` + +### Installation with Loki + +```bash +helm install policy-reporter policy-reporter/policy-reporter --set loki.host=http://loki:3100 -n policy-reporter --create-namespace +``` + +### Installation with Elasticsearch + +```bash +helm install policy-reporter policy-reporter/policy-reporter --set elasticsearch.host=http://elasticsearch:3100 -n policy-reporter --create-namespace +``` + You can also customize the `./charts/policy-reporter/values.yaml` to change the default configurations. ### Additional configurations for Loki -Configure `loki.minimumPriority` to send only results with the configured minimumPriority or above, empty means all results. (info < warning < error) -Configure `loki.skipExistingOnStartup` to skip all results who already existed before the PolicyReporter started. Can be used after the first deployment to prevent duplicated events. +* Configure `loki.minimumPriority` to send only results with the configured minimumPriority or above, empty means all results. (info < warning < error) +* Configure `loki.skipExistingOnStartup` to skip all results who already existed before the PolicyReporter started (default: `true`). ```yaml loki: minimumPriority: "" - skipExistingOnStartup: false + skipExistingOnStartup: true +``` + +### Additional configurations for Elasticsearch + +* Configure `elasticsearch.index` to customize the elasticsearch index. +* Configure `elasticsearch.rotation` is added as suffix to the index. Possible values are `daily`, `monthly`, `annually` and `none`. +* Configure `elasticsearch.minimumPriority` to send only results with the configured minimumPriority or above, empty means all results. (info < warning < error) +* Configure `elasticsearch.skipExistingOnStartup` to skip all results who already existed before the PolicyReporter started (default: `true`). + +```yaml +elasticsearch: + index: "policy-reporter" + rotation: "daily" + minimumPriority: "" + skipExistingOnStartup: true ``` ### Configure Policy Priorities diff --git a/charts/policy-reporter/Chart.yaml b/charts/policy-reporter/Chart.yaml index 3c8cfa77..ec924385 100644 --- a/charts/policy-reporter/Chart.yaml +++ b/charts/policy-reporter/Chart.yaml @@ -3,5 +3,5 @@ name: policy-reporter description: K8s PolicyReporter watches for wgpolicyk8s.io/v1alpha1.PolicyReport resources. It creates Prometheus Metrics and can send rule validation events to Loki type: application -version: 0.7.2 -appVersion: 0.6.0 +version: 0.8.0 +appVersion: 0.7.0 diff --git a/charts/policy-reporter/templates/deployment.yaml b/charts/policy-reporter/templates/deployment.yaml index a0deb928..86d5cb64 100644 --- a/charts/policy-reporter/templates/deployment.yaml +++ b/charts/policy-reporter/templates/deployment.yaml @@ -25,13 +25,7 @@ spec: image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" imagePullPolicy: {{ .Values.image.pullPolicy }} args: - - --loki={{ .Values.loki.host }} - {{- if .Values.loki.minimumPriority }} - - --loki-minimum-priority={{ .Values.loki.minimumPriority }} - {{- end }} - {{- if .Values.loki.skipExistingOnStartup }} - - --loki-skip-existing-on-startup - {{- end }} + - --config=/app/config.yaml ports: - name: http containerPort: 2112 @@ -56,5 +50,5 @@ spec: volumes: - name: config-volume configMap: - name: policy-reporter-config + name: {{ include "policyreporter.fullname" . }}-targets optional: true diff --git a/charts/policy-reporter/templates/targetsconfigmap.yaml b/charts/policy-reporter/templates/targetsconfigmap.yaml new file mode 100644 index 00000000..de967327 --- /dev/null +++ b/charts/policy-reporter/templates/targetsconfigmap.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ include "policyreporter.fullname" . }}-targets + labels: + {{- include "policyreporter.labels" . | nindent 4 }} +data: + config.yaml: |- + loki: + host: {{ .Values.loki.host | quote }} + minimumPriority: {{ .Values.loki.minimumPriority | quote }} + skipExistingOnStartup: {{ .Values.loki.skipExistingOnStartup }} + + elasticsearch: + host: {{ .Values.elasticsearch.host | quote }} + index: {{ .Values.elasticsearch.index | default "policy-reporter" | quote }} + rotation: {{ .Values.elasticsearch.rotation | default "dayli" | quote }} + minimumPriority: {{ .Values.elasticsearch.minimumPriority | quote }} + skipExistingOnStartup: {{ .Values.elasticsearch.skipExistingOnStartup }} diff --git a/charts/policy-reporter/values.yaml b/charts/policy-reporter/values.yaml index 82ecb5b3..d1a2cf90 100644 --- a/charts/policy-reporter/values.yaml +++ b/charts/policy-reporter/values.yaml @@ -1,10 +1,24 @@ loki: # loki host address - host: http://loki.loki-stack.svc.cluster.local:3100 + host: "" # minimum priority "" < info < warning < error minimumPriority: "" # Skip already existing PolicyReportResults on startup - skipExistingOnStartup: false + skipExistingOnStartup: true + +elasticsearch: + # elasticsearch host address + host: "" + # elasticsearch index (default: policy-reporter) + index: "" + # elasticsearch index rotation and index suffix + # possible values: dayli, monthly, annually, none (default: dayli) + rotation: "" + # minimum priority "" < info < warning < error + minimumPriority: "" + # Skip already existing PolicyReportResults on startup + skipExistingOnStartup: true + metrics: serviceMonitor: false @@ -15,7 +29,7 @@ metrics: image: repository: fjogeleit/policy-reporter pullPolicy: IfNotPresent - tag: 0.6.0 + tag: 0.7.0 imagePullSecrets: [] diff --git a/cmd/root.go b/cmd/root.go index 53483079..f14f1ebe 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,79 +1,24 @@ package cmd import ( - "flag" - "net/http" + "log" "github.com/fjogeleit/policy-reporter/pkg/config" - "github.com/fjogeleit/policy-reporter/pkg/report" - "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/spf13/cobra" "github.com/spf13/viper" - "golang.org/x/sync/errgroup" ) // NewCLI creates a new instance of the root CLI func NewCLI() *cobra.Command { rootCmd := &cobra.Command{ - Use: "run", - Short: "Kyverno Policy API", - Long: `Kyverno Policy API and Monitoring`, - RunE: func(cmd *cobra.Command, args []string) error { - c, err := loadConfig(cmd) - if err != nil { - return err - } - - resolver := config.NewResolver(c) - - client, err := resolver.PolicyReportClient() - if err != nil { - return err - } - - policyMetrics, err := resolver.PolicyReportMetrics() - if err != nil { - return err - } - - clusterPolicyMetrics, err := resolver.ClusterPolicyReportMetrics() - if err != nil { - return err - } - - loki := resolver.LokiClient() - - g := new(errgroup.Group) - - g.Go(policyMetrics.GenerateMetrics) - - g.Go(clusterPolicyMetrics.GenerateMetrics) - - if loki != nil { - g.Go(func() error { - return client.WatchRuleValidation(func(r report.Result) { - go loki.Send(r) - }, c.Loki.SkipExisting) - }) - } - - g.Go(func() error { - http.Handle("/metrics", promhttp.Handler()) - - return http.ListenAndServe(":2112", nil) - }) - - return g.Wait() - }, + Use: "policyreporter", + Short: "Generates PolicyReport Metrics and Send Results to different targets", + Long: `Generates Prometheus Metrics from PolicyReports, ClusterPolicyReports and PolicyReportResults. + Sends notifications to different targets like Grafana's Loki.`, } - rootCmd.PersistentFlags().StringP("kubeconfig", "k", "", "absolute path to the kubeconfig file") - - rootCmd.PersistentFlags().String("loki", "", "loki host: http://loki:3100") - rootCmd.PersistentFlags().String("loki-minimum-priority", "", "Minimum Priority to send Results to Loki (info < warning < error)") - rootCmd.PersistentFlags().Bool("loki-skip-existing-on-startup", false, "Skip Results created before PolicyReporter started. Prevent duplicated sending after new deployment") - - flag.Parse() + rootCmd.AddCommand(newRunCMD()) + rootCmd.AddCommand(newSendCMD()) return rootCmd } @@ -83,8 +28,26 @@ func loadConfig(cmd *cobra.Command) (*config.Config, error) { v.SetDefault("namespace", "policy-reporter") + cfgFile := "" + + configFlag := cmd.Flags().Lookup("config") + if configFlag != nil { + cfgFile = configFlag.Value.String() + } + + if cfgFile != "" { + v.SetConfigFile(cfgFile) + } else { + v.AddConfigPath(".") + v.SetConfigName("config") + } + v.AutomaticEnv() + if err := v.ReadInConfig(); err != nil { + log.Println("[INFO] No target configuration file found") + } + if flag := cmd.Flags().Lookup("loki"); flag != nil { v.BindPFlag("loki.host", flag) } diff --git a/cmd/run.go b/cmd/run.go new file mode 100644 index 00000000..5c11a80e --- /dev/null +++ b/cmd/run.go @@ -0,0 +1,89 @@ +package cmd + +import ( + "flag" + "net/http" + + "github.com/fjogeleit/policy-reporter/pkg/config" + "github.com/fjogeleit/policy-reporter/pkg/report" + "github.com/fjogeleit/policy-reporter/pkg/target" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/spf13/cobra" + "golang.org/x/sync/errgroup" +) + +func newRunCMD() *cobra.Command { + cmd := &cobra.Command{ + Use: "run", + Short: "Run PolicyReporter Watcher & HTTP Metrics Server", + RunE: func(cmd *cobra.Command, args []string) error { + c, err := loadConfig(cmd) + if err != nil { + return err + } + + resolver := config.NewResolver(c) + + client, err := resolver.PolicyReportClient() + if err != nil { + return err + } + + policyMetrics, err := resolver.PolicyReportMetrics() + if err != nil { + return err + } + + clusterPolicyMetrics, err := resolver.ClusterPolicyReportMetrics() + if err != nil { + return err + } + + g := new(errgroup.Group) + + g.Go(policyMetrics.GenerateMetrics) + + g.Go(clusterPolicyMetrics.GenerateMetrics) + + g.Go(func() error { + targets := resolver.TargetClients() + + if len(targets) == 0 { + return nil + } + + return client.WatchPolicyReportResults(func(r report.Result, e bool) { + for _, t := range targets { + go func(target target.Client, result report.Result, preExisted bool) { + if preExisted && target.SkipExistingOnStartup() { + return + } + + target.Send(result) + }(t, r, e) + } + }, resolver.SkipExistingOnStartup()) + }) + + g.Go(func() error { + http.Handle("/metrics", promhttp.Handler()) + + return http.ListenAndServe(":2112", nil) + }) + + return g.Wait() + }, + } + + // For local usage + cmd.PersistentFlags().StringP("kubeconfig", "k", "", "absolute path to the kubeconfig file") + cmd.PersistentFlags().StringP("config", "c", "", "target configuration file") + + cmd.PersistentFlags().String("loki", "", "loki host: http://loki:3100") + cmd.PersistentFlags().String("loki-minimum-priority", "", "Minimum Priority to send Results to Loki (info < warning < error)") + cmd.PersistentFlags().Bool("loki-skip-existing-on-startup", false, "Skip Results created before PolicyReporter started. Prevent duplicated sending after new deployment") + + flag.Parse() + + return cmd +} diff --git a/cmd/send.go b/cmd/send.go new file mode 100644 index 00000000..d1c3b016 --- /dev/null +++ b/cmd/send.go @@ -0,0 +1,69 @@ +package cmd + +import ( + "flag" + "sync" + + "github.com/fjogeleit/policy-reporter/pkg/config" + "github.com/fjogeleit/policy-reporter/pkg/report" + "github.com/fjogeleit/policy-reporter/pkg/target" + "github.com/spf13/cobra" +) + +func newSendCMD() *cobra.Command { + cmd := &cobra.Command{ + Use: "send", + Short: "Send all current PolicyReportResults to the configured targets", + RunE: func(cmd *cobra.Command, args []string) error { + c, err := loadConfig(cmd) + if err != nil { + return err + } + + resolver := config.NewResolver(c) + + client, err := resolver.PolicyReportClient() + if err != nil { + return err + } + + clients := resolver.TargetClients() + + if len(clients) == 0 { + return nil + } + + results, err := client.FetchPolicyReportResults() + if err != nil { + return err + } + + wg := sync.WaitGroup{} + wg.Add(len(results) * len(clients)) + + for _, result := range results { + for _, client := range clients { + go func(c target.Client, r report.Result) { + c.Send(r) + wg.Done() + }(client, result) + } + } + + wg.Wait() + + return err + }, + } + + // For local usage + cmd.PersistentFlags().StringP("kubeconfig", "k", "", "absolute path to the kubeconfig file") + cmd.PersistentFlags().StringP("config", "c", "", "target configuration file") + + cmd.PersistentFlags().String("loki", "", "loki host: http://loki:3100") + cmd.PersistentFlags().String("loki-minimum-priority", "", "Minimum Priority to send Results to Loki (info < warning < error)") + + flag.Parse() + + return cmd +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 08ed70a1..7e379fe3 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -7,6 +7,13 @@ type Config struct { SkipExisting bool `mapstructure:"skipExistingOnStartup"` MinimumPriority string `mapstructure:"minimumPriority"` } `mapstructure:"loki"` + Elasticsearch struct { + Host string `mapstructure:"host"` + Index string `mapstructure:"index"` + Rotation string `mapstructure:"rotation"` + SkipExisting bool `mapstructure:"skipExistingOnStartup"` + MinimumPriority string `mapstructure:"minimumPriority"` + } `mapstructure:"elasticsearch"` Kubeconfig string `mapstructure:"kubeconfig"` Namespace string `mapstructure:"namespace"` } diff --git a/pkg/config/resolver.go b/pkg/config/resolver.go index 2220d3e1..6921e071 100644 --- a/pkg/config/resolver.go +++ b/pkg/config/resolver.go @@ -9,25 +9,24 @@ import ( "github.com/fjogeleit/policy-reporter/pkg/metrics" "github.com/fjogeleit/policy-reporter/pkg/report" "github.com/fjogeleit/policy-reporter/pkg/target" + "github.com/fjogeleit/policy-reporter/pkg/target/elasticsearch" "github.com/fjogeleit/policy-reporter/pkg/target/loki" ) -var ( - kubeClient report.Client - lokiClient target.Client - policyReportMetrics metrics.Metrics - clusterPolicyReportMetrics metrics.Metrics -) - // Resolver manages dependencies type Resolver struct { - config *Config + config *Config + kubeClient report.Client + lokiClient target.Client + elasticsearchClient target.Client + policyReportMetrics metrics.Metrics + clusterPolicyReportMetrics metrics.Metrics } // PolicyReportClient resolver method func (r *Resolver) PolicyReportClient() (report.Client, error) { - if kubeClient != nil { - return kubeClient, nil + if r.kubeClient != nil { + return r.kubeClient, nil } client, err := kubernetes.NewPolicyReportClient( @@ -37,34 +36,63 @@ func (r *Resolver) PolicyReportClient() (report.Client, error) { time.Now(), ) - kubeClient = client + r.kubeClient = client return client, err } // LokiClient resolver method func (r *Resolver) LokiClient() target.Client { - if lokiClient != nil { - return lokiClient + if r.lokiClient != nil { + return r.lokiClient } if r.config.Loki.Host == "" { return nil } - lokiClient = loki.NewClient( + r.lokiClient = loki.NewClient( r.config.Loki.Host, r.config.Loki.MinimumPriority, + r.config.Loki.SkipExisting, &http.Client{}, ) - return lokiClient + return r.lokiClient +} + +// ElasticsearchClient resolver method +func (r *Resolver) ElasticsearchClient() target.Client { + if r.elasticsearchClient != nil { + return r.elasticsearchClient + } + + if r.config.Elasticsearch.Host == "" { + return nil + } + if r.config.Elasticsearch.Index == "" { + r.config.Elasticsearch.Index = "policy-reporter" + } + if r.config.Elasticsearch.Rotation == "" { + r.config.Elasticsearch.Rotation = elasticsearch.Dayli + } + + r.elasticsearchClient = elasticsearch.NewClient( + r.config.Elasticsearch.Host, + r.config.Elasticsearch.Index, + r.config.Elasticsearch.Rotation, + r.config.Elasticsearch.MinimumPriority, + r.config.Elasticsearch.SkipExisting, + &http.Client{}, + ) + + return r.elasticsearchClient } // PolicyReportMetrics resolver method func (r *Resolver) PolicyReportMetrics() (metrics.Metrics, error) { - if policyReportMetrics != nil { - return policyReportMetrics, nil + if r.policyReportMetrics != nil { + return r.policyReportMetrics, nil } client, err := r.PolicyReportClient() @@ -72,15 +100,15 @@ func (r *Resolver) PolicyReportMetrics() (metrics.Metrics, error) { return nil, err } - policyReportMetrics = metrics.NewPolicyReportMetrics(client) + r.policyReportMetrics = metrics.NewPolicyReportMetrics(client) - return policyReportMetrics, nil + return r.policyReportMetrics, nil } // ClusterPolicyReportMetrics resolver method func (r *Resolver) ClusterPolicyReportMetrics() (metrics.Metrics, error) { - if clusterPolicyReportMetrics != nil { - return clusterPolicyReportMetrics, nil + if r.clusterPolicyReportMetrics != nil { + return r.clusterPolicyReportMetrics, nil } client, err := r.PolicyReportClient() @@ -88,20 +116,47 @@ func (r *Resolver) ClusterPolicyReportMetrics() (metrics.Metrics, error) { return nil, err } - clusterPolicyReportMetrics = metrics.NewClusterPolicyMetrics(client) + r.clusterPolicyReportMetrics = metrics.NewClusterPolicyMetrics(client) - return clusterPolicyReportMetrics, nil + return r.clusterPolicyReportMetrics, nil +} + +func (r *Resolver) TargetClients() []target.Client { + clients := make([]target.Client, 0) + + if loki := r.LokiClient(); loki != nil { + clients = append(clients, loki) + } + + if elasticsearch := r.ElasticsearchClient(); elasticsearch != nil { + clients = append(clients, elasticsearch) + } + + return clients +} + +func (r *Resolver) SkipExistingOnStartup() bool { + for _, client := range r.TargetClients() { + if !client.SkipExistingOnStartup() { + return false + } + } + + return true } // Reset all cached dependencies func (r *Resolver) Reset() { - kubeClient = nil - lokiClient = nil - policyReportMetrics = nil - clusterPolicyReportMetrics = nil + r.kubeClient = nil + r.lokiClient = nil + r.elasticsearchClient = nil + r.policyReportMetrics = nil + r.clusterPolicyReportMetrics = nil } // NewResolver constructor function func NewResolver(config *Config) Resolver { - return Resolver{config} + return Resolver{ + config: config, + } } diff --git a/pkg/config/resolver_test.go b/pkg/config/resolver_test.go index e16bb5ef..c547aa5b 100644 --- a/pkg/config/resolver_test.go +++ b/pkg/config/resolver_test.go @@ -16,6 +16,19 @@ var testConfig = &config.Config{ SkipExisting: true, MinimumPriority: "debug", }, + Elasticsearch: struct { + Host string "mapstructure:\"host\"" + Index string "mapstructure:\"index\"" + Rotation string "mapstructure:\"rotation\"" + SkipExisting bool "mapstructure:\"skipExistingOnStartup\"" + MinimumPriority string "mapstructure:\"minimumPriority\"" + }{ + Host: "http://localhost:9200", + Index: "policy-reporter", + Rotation: "dayli", + SkipExisting: true, + MinimumPriority: "debug", + }, } func Test_ResolveLokiClient(t *testing.T) { @@ -32,6 +45,76 @@ func Test_ResolveLokiClient(t *testing.T) { } } +func Test_ResolveElasticSearchClient(t *testing.T) { + resolver := config.NewResolver(testConfig) + + client := resolver.ElasticsearchClient() + if client == nil { + t.Error("Expected Client, got nil") + } + + client2 := resolver.ElasticsearchClient() + if client != client2 { + t.Error("Error: Should reuse first instance") + } +} + +func Test_ResolveTargets(t *testing.T) { + resolver := config.NewResolver(testConfig) + + clients := resolver.TargetClients() + if count := len(clients); count != 2 { + t.Errorf("Expected 2 Clients, got %d", count) + } +} + +func Test_ResolveSkipExistingOnStartup(t *testing.T) { + var testConfig = &config.Config{ + Loki: struct { + Host string "mapstructure:\"host\"" + SkipExisting bool "mapstructure:\"skipExistingOnStartup\"" + MinimumPriority string "mapstructure:\"minimumPriority\"" + }{ + Host: "http://localhost:3100", + SkipExisting: true, + MinimumPriority: "debug", + }, + Elasticsearch: struct { + Host string "mapstructure:\"host\"" + Index string "mapstructure:\"index\"" + Rotation string "mapstructure:\"rotation\"" + SkipExisting bool "mapstructure:\"skipExistingOnStartup\"" + MinimumPriority string "mapstructure:\"minimumPriority\"" + }{ + Host: "http://localhost:9200", + Index: "policy-reporter", + Rotation: "dayli", + SkipExisting: true, + MinimumPriority: "debug", + }, + } + + t.Run("Resolve false", func(t *testing.T) { + testConfig.Elasticsearch.SkipExisting = false + + resolver := config.NewResolver(testConfig) + + if resolver.SkipExistingOnStartup() == true { + t.Error("Expected SkipExistingOnStartup to be false if one Client has SkipExistingOnStartup false configured") + } + }) + + t.Run("Resolve true", func(t *testing.T) { + testConfig.Elasticsearch.SkipExisting = true + + resolver := config.NewResolver(testConfig) + + if resolver.SkipExistingOnStartup() == false { + t.Error("Expected SkipExistingOnStartup to be true if all Client has SkipExistingOnStartup true configured") + } + }) +} + func Test_ResolveLokiClientWithoutHost(t *testing.T) { config2 := &config.Config{ Loki: struct { @@ -52,3 +135,27 @@ func Test_ResolveLokiClientWithoutHost(t *testing.T) { t.Error("Expected Client to be nil if no host is configured") } } + +func Test_ResolveElasticsearchClientWithoutHost(t *testing.T) { + config2 := &config.Config{ + Elasticsearch: struct { + Host string "mapstructure:\"host\"" + Index string "mapstructure:\"index\"" + Rotation string "mapstructure:\"rotation\"" + SkipExisting bool "mapstructure:\"skipExistingOnStartup\"" + MinimumPriority string "mapstructure:\"minimumPriority\"" + }{ + Host: "", + Index: "policy-reporter", + Rotation: "dayli", + SkipExisting: true, + MinimumPriority: "debug", + }, + } + + resolver := config.NewResolver(config2) + + if resolver.ElasticsearchClient() != nil { + t.Error("Expected Client to be nil if no host is configured") + } +} diff --git a/pkg/kubernetes/report_client.go b/pkg/kubernetes/report_client.go index 4f29f3b5..50a09be2 100644 --- a/pkg/kubernetes/report_client.go +++ b/pkg/kubernetes/report_client.go @@ -3,6 +3,7 @@ package kubernetes import ( "context" "log" + "sync" "time" "github.com/fjogeleit/policy-reporter/pkg/report" @@ -51,6 +52,65 @@ func (c *policyReportClient) FetchPolicyReports() ([]report.PolicyReport, error) return reports, nil } +func (c *policyReportClient) FetchClusterPolicyReports() ([]report.ClusterPolicyReport, error) { + var reports []report.ClusterPolicyReport + + result, err := c.client.Resource(clusterPolicyReports).List(context.Background(), metav1.ListOptions{}) + if err != nil { + log.Printf("K8s List Error: %s\n", err.Error()) + return reports, err + } + + for _, item := range result.Items { + reports = append(reports, c.mapper.MapClusterPolicyReport(item.Object)) + } + + return reports, nil +} + +func (c *policyReportClient) FetchPolicyReportResults() ([]report.Result, error) { + g := new(errgroup.Group) + mx := new(sync.Mutex) + + var results []report.Result + + g.Go(func() error { + reports, err := c.FetchClusterPolicyReports() + if err != nil { + return err + } + + for _, clusterReport := range reports { + for _, result := range clusterReport.Results { + mx.Lock() + results = append(results, result) + mx.Unlock() + } + } + + return nil + }) + + g.Go(func() error { + reports, err := c.FetchPolicyReports() + if err != nil { + return err + } + + for _, clusterReport := range reports { + for _, result := range clusterReport.Results { + mx.Lock() + results = append(results, result) + mx.Unlock() + } + } + + return nil + }) + + return results, g.Wait() +} + func (c *policyReportClient) WatchClusterPolicyReports(cb report.WatchClusterPolicyReportCallback) error { for { result, err := c.client.Resource(clusterPolicyReports).Watch(context.Background(), metav1.ListOptions{}) @@ -81,27 +141,29 @@ func (c *policyReportClient) WatchPolicyReports(cb report.WatchPolicyReportCallb } } -func (c *policyReportClient) WatchRuleValidation(cb report.WatchPolicyResultCallback, skipExisting bool) error { +func (c *policyReportClient) WatchPolicyReportResults(cb report.WatchPolicyResultCallback, skipExisting bool) error { wg := new(errgroup.Group) wg.Go(func() error { return c.WatchPolicyReports(func(e watch.EventType, pr report.PolicyReport) { switch e { case watch.Added: - if skipExisting && pr.CreationTimestamp.Before(c.startUp) { + preExisted := pr.CreationTimestamp.Before(c.startUp) + + if skipExisting && preExisted { c.policyCache[pr.GetIdentifier()] = pr break } for _, result := range pr.Results { - cb(result) + cb(result, preExisted) } c.policyCache[pr.GetIdentifier()] = pr case watch.Modified: diff := pr.GetNewResults(c.policyCache[pr.GetIdentifier()]) for _, result := range diff { - cb(result) + cb(result, false) } c.policyCache[pr.GetIdentifier()] = pr @@ -115,20 +177,22 @@ func (c *policyReportClient) WatchRuleValidation(cb report.WatchPolicyResultCall return c.WatchClusterPolicyReports(func(s watch.EventType, cpr report.ClusterPolicyReport) { switch s { case watch.Added: - if skipExisting && cpr.CreationTimestamp.Before(c.startUp) { + preExisted := cpr.CreationTimestamp.Before(c.startUp) + + if skipExisting && preExisted { c.clusterPolicyCache[cpr.GetIdentifier()] = cpr break } for _, result := range cpr.Results { - cb(result) + cb(result, preExisted) } c.clusterPolicyCache[cpr.GetIdentifier()] = cpr case watch.Modified: diff := cpr.GetNewResults(c.clusterPolicyCache[cpr.GetIdentifier()]) for _, result := range diff { - cb(result) + cb(result, false) } c.clusterPolicyCache[cpr.GetIdentifier()] = cpr diff --git a/pkg/report/client.go b/pkg/report/client.go index 970986be..562bec0b 100644 --- a/pkg/report/client.go +++ b/pkg/report/client.go @@ -9,7 +9,7 @@ type WatchPolicyReportCallback = func(watch.EventType, PolicyReport) type WatchClusterPolicyReportCallback = func(watch.EventType, ClusterPolicyReport) // WatchPolicyResultCallback is called whenver a new PolicyResult comes in -type WatchPolicyResultCallback = func(Result) +type WatchPolicyResultCallback = func(Result, bool) // Client interface for interacting with the Kubernetes API type Client interface { @@ -17,8 +17,12 @@ type Client interface { FetchPolicyReports() ([]PolicyReport, error) // WatchPolicyReports blocking API to watch for PolicyReport changes WatchPolicyReports(WatchPolicyReportCallback) error - // WatchRuleValidation blocking API to watch for PolicyResult changes from PolicyReports and ClusterPolicyReports - WatchRuleValidation(WatchPolicyResultCallback, bool) error + // WatchPolicyReportResults blocking API to watch for PolicyResult changes from PolicyReports and ClusterPolicyReports + WatchPolicyReportResults(WatchPolicyResultCallback, bool) error + // FetchPolicyReportResults from the unterlying API + FetchPolicyReportResults() ([]Result, error) // WatchClusterPolicyReports blocking API to watch for ClusterPolicyReport changes WatchClusterPolicyReports(WatchClusterPolicyReportCallback) error + // FetchClusterPolicyReport from the unterlying API + FetchClusterPolicyReports() ([]ClusterPolicyReport, error) } diff --git a/pkg/report/model.go b/pkg/report/model.go index fc470d6f..1f3c2305 100644 --- a/pkg/report/model.go +++ b/pkg/report/model.go @@ -1,6 +1,7 @@ package report import ( + "bytes" "fmt" "time" ) @@ -58,6 +59,15 @@ func (p Priority) String() string { } } +// MarshalJSON marshals the enum as a quoted json string +func (p Priority) MarshalJSON() ([]byte, error) { + buffer := bytes.NewBufferString(`"`) + buffer.WriteString(p.String()) + buffer.WriteString(`"`) + + return buffer.Bytes(), nil +} + // PriorityFromStatus creates a Priority based on a Status func PriorityFromStatus(p Status) Priority { switch p { @@ -106,8 +116,8 @@ type Result struct { Rule string Priority Priority Status Status - Severity Severity - Category string + Severity Severity `json:",omitempty"` + Category string `json:",omitempty"` Scored bool Resources []Resource } diff --git a/pkg/target/client.go b/pkg/target/client.go index 803c71c8..7451f1cd 100644 --- a/pkg/target/client.go +++ b/pkg/target/client.go @@ -8,4 +8,6 @@ import ( type Client interface { // Send the given Result to the configured Target Send(result report.Result) + // SkipExistingOnStartup skips already existing PolicyReportResults on startup + SkipExistingOnStartup() bool } diff --git a/pkg/target/elasticsearch/elasticsearch.go b/pkg/target/elasticsearch/elasticsearch.go new file mode 100644 index 00000000..a492160b --- /dev/null +++ b/pkg/target/elasticsearch/elasticsearch.go @@ -0,0 +1,103 @@ +package elasticsearch + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "net/http" + "time" + + "github.com/fjogeleit/policy-reporter/pkg/report" + "github.com/fjogeleit/policy-reporter/pkg/target" +) + +type Rotation = string + +// Elasticsearch Index Rotation +const ( + None Rotation = "none" + Dayli Rotation = "dayli" + Monthly Rotation = "monthly" + Annually Rotation = "annually" +) + +type httpClient interface { + Do(req *http.Request) (*http.Response, error) +} + +type client struct { + host string + index string + rotation Rotation + minimumPriority string + skipExistingOnStartup bool + client httpClient +} + +func (e *client) Send(result report.Result) { + if result.Priority < report.NewPriority(e.minimumPriority) { + return + } + + body := new(bytes.Buffer) + + if err := json.NewEncoder(body).Encode(result); err != nil { + log.Printf("[ERROR] : %v\n", err.Error()) + } + + var host string + switch e.rotation { + case None: + host = e.host + "/" + e.index + "/event" + case Annually: + host = e.host + "/" + e.index + "-" + time.Now().Format("2006") + "/event" + case Monthly: + host = e.host + "/" + e.index + "-" + time.Now().Format("2006.01") + "/event" + default: + host = e.host + "/" + e.index + "-" + time.Now().Format("2006.01.02") + "/event" + } + + req, err := http.NewRequest("POST", host, body) + if err != nil { + log.Printf("[ERROR] : %v\n", err.Error()) + } + + req.Header.Add("Content-Type", "application/json; charset=utf-8") + req.Header.Add("User-Agent", "Policy-Reporter") + + resp, err := e.client.Do(req) + defer func() { + if resp != nil && resp.Body != nil { + resp.Body.Close() + } + }() + + if err != nil { + log.Printf("[ERROR] PUSH failed: %s\n", err.Error()) + } else if resp.StatusCode > 400 { + fmt.Printf("StatusCode: %d\n", resp.StatusCode) + buf := new(bytes.Buffer) + buf.ReadFrom(resp.Body) + + log.Printf("[ERROR] PUSH failed [%d]: %s\n", resp.StatusCode, buf.String()) + } else { + log.Println("[INFO] PUSH OK") + } +} + +func (e *client) SkipExistingOnStartup() bool { + return e.skipExistingOnStartup +} + +// NewClient creates a new loki.client to send Results to Loki +func NewClient(host, index, rotation, minimumPriority string, skipExistingOnStartup bool, httpClient httpClient) target.Client { + return &client{ + host, + index, + Rotation(rotation), + minimumPriority, + skipExistingOnStartup, + httpClient, + } +} diff --git a/pkg/target/elasticsearch/elasticsearch_test.go b/pkg/target/elasticsearch/elasticsearch_test.go new file mode 100644 index 00000000..7ed795f8 --- /dev/null +++ b/pkg/target/elasticsearch/elasticsearch_test.go @@ -0,0 +1,94 @@ +package elasticsearch_test + +import ( + "net/http" + "testing" + "time" + + "github.com/fjogeleit/policy-reporter/pkg/report" + "github.com/fjogeleit/policy-reporter/pkg/target/elasticsearch" +) + +var completeResult = report.Result{ + Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/", + Policy: "require-requests-and-limits-required", + Rule: "autogen-check-for-requests-and-limits", + Priority: report.ErrorPriority, + Status: report.Fail, + Severity: report.Heigh, + Category: "resources", + Scored: true, + Resources: []report.Resource{ + { + APIVersion: "v1", + Kind: "Deployment", + Name: "nginx", + Namespace: "default", + UID: "536ab69f-1b3c-4bd9-9ba4-274a56188409", + }, + }, +} + +type testClient struct { + callback func(req *http.Request) + statusCode int +} + +func (c testClient) Do(req *http.Request) (*http.Response, error) { + c.callback(req) + + return &http.Response{ + StatusCode: c.statusCode, + }, nil +} + +func Test_ElasticsearchTarget(t *testing.T) { + t.Run("Send with Annually Result", func(t *testing.T) { + callback := func(req *http.Request) { + if contentType := req.Header.Get("Content-Type"); contentType != "application/json; charset=utf-8" { + t.Errorf("Unexpected Content-Type: %s", contentType) + } + + if agend := req.Header.Get("User-Agent"); agend != "Policy-Reporter" { + t.Errorf("Unexpected Host: %s", agend) + } + + if url := req.URL.String(); url != "http://localhost:9200/policy-reporter-"+time.Now().Format("2006")+"/event" { + t.Errorf("Unexpected Host: %s", url) + } + } + + loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "annually", "", false, testClient{callback, 200}) + loki.Send(completeResult) + }) + t.Run("Send with Monthly Result", func(t *testing.T) { + callback := func(req *http.Request) { + if url := req.URL.String(); url != "http://localhost:9200/policy-reporter-"+time.Now().Format("2006.01")+"/event" { + t.Errorf("Unexpected Host: %s", url) + } + } + + loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "monthly", "", false, testClient{callback, 200}) + loki.Send(completeResult) + }) + t.Run("Send with Monthly Result", func(t *testing.T) { + callback := func(req *http.Request) { + if url := req.URL.String(); url != "http://localhost:9200/policy-reporter-"+time.Now().Format("2006.01.02")+"/event" { + t.Errorf("Unexpected Host: %s", url) + } + } + + loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "daily", "", false, testClient{callback, 200}) + loki.Send(completeResult) + }) + t.Run("Send with None Result", func(t *testing.T) { + callback := func(req *http.Request) { + if url := req.URL.String(); url != "http://localhost:9200/policy-reporter/event" { + t.Errorf("Unexpected Host: %s", url) + } + } + + loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "none", "", false, testClient{callback, 200}) + loki.Send(completeResult) + }) +} diff --git a/pkg/target/loki/loki.go b/pkg/target/loki/loki.go index f9df5c59..bd0e8112 100644 --- a/pkg/target/loki/loki.go +++ b/pkg/target/loki/loki.go @@ -70,9 +70,10 @@ func newLokiPayload(result report.Result) payload { } type client struct { - host string - minimumPriority string - client httpClient + host string + minimumPriority string + skipExistingOnStartup bool + client httpClient } func (l *client) Send(result report.Result) { @@ -93,7 +94,7 @@ func (l *client) Send(result report.Result) { } req.Header.Add("Content-Type", "application/json") - req.Header.Add("User-Agent", "Policy-API") + req.Header.Add("User-Agent", "Policy-Reporter") resp, err := l.client.Do(req) defer func() { @@ -115,11 +116,16 @@ func (l *client) Send(result report.Result) { } } +func (l *client) SkipExistingOnStartup() bool { + return l.skipExistingOnStartup +} + // NewClient creates a new loki.client to send Results to Loki -func NewClient(host, minimumPriority string, httpClient httpClient) target.Client { +func NewClient(host, minimumPriority string, skipExistingOnStartup bool, httpClient httpClient) target.Client { return &client{ host + "/api/prom/push", minimumPriority, + skipExistingOnStartup, httpClient, } } diff --git a/pkg/target/loki/loki_test.go b/pkg/target/loki/loki_test.go index c21e80b9..886e24bc 100644 --- a/pkg/target/loki/loki_test.go +++ b/pkg/target/loki/loki_test.go @@ -59,7 +59,7 @@ func Test_LokiTarget(t *testing.T) { t.Errorf("Unexpected Content-Type: %s", contentType) } - if agend := req.Header.Get("User-Agent"); agend != "Policy-API" { + if agend := req.Header.Get("User-Agent"); agend != "Policy-Reporter" { t.Errorf("Unexpected Host: %s", agend) } @@ -109,7 +109,7 @@ func Test_LokiTarget(t *testing.T) { } } - loki := loki.NewClient("http://localhost:3100", "", testClient{callback, 200}) + loki := loki.NewClient("http://localhost:3100", "", false, testClient{callback, 200}) loki.Send(completeResult) }) @@ -119,7 +119,7 @@ func Test_LokiTarget(t *testing.T) { t.Errorf("Unexpected Content-Type: %s", contentType) } - if agend := req.Header.Get("User-Agent"); agend != "Policy-API" { + if agend := req.Header.Get("User-Agent"); agend != "Policy-Reporter" { t.Errorf("Unexpected Host: %s", agend) } @@ -167,7 +167,7 @@ func Test_LokiTarget(t *testing.T) { } } - loki := loki.NewClient("http://localhost:3100", "", testClient{callback, 200}) + loki := loki.NewClient("http://localhost:3100", "", false, testClient{callback, 200}) loki.Send(minimalResult) }) }