1
0
Fork 0
mirror of https://github.com/kyverno/policy-reporter.git synced 2024-12-14 11:57:32 +00:00

Development (#7)

* Implement elasticsearch
* Update deployment
* Add Changelog
This commit is contained in:
Frank Jogeleit 2021-02-27 19:11:49 +01:00 committed by GitHub
parent eab3ff0d41
commit 2872a259ec
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
20 changed files with 771 additions and 129 deletions

7
CHANGELOG.md Normal file
View file

@ -0,0 +1,7 @@
# Changelog
## 0.7.0
* Implement Elasticsearch as Target for PolicyReportResults
* Replace CLI flags with a single `config.yaml` to manage target-configurations as separate `ConfigMap`
* Set `loki.skipExistingOnStartup` default value to `true`

View file

@ -3,7 +3,7 @@
## Motivation
Kyverno ships with two types of validation. You can either enforce a rule or audit it. If you don't want to block developers or if you want to try out a new rule, you can use the audit functionality. The audit configuration creates [PolicyReports](https://kyverno.io/docs/policy-reports/) which you can access with `kubectl`. Because I can't find a simple solution to get a general overview of this PolicyReports and PolicyReportResults, I created this tool to send information from PolicyReports to [Grafana Loki](https://grafana.com/oss/loki/). As additional feature this tool provides an http server with Prometheus Metrics about ReportPolicy Summaries and ReportPolicyRules.
Kyverno ships with two types of validation. You can either enforce a rule or audit it. If you don't want to block developers or if you want to try out a new rule, you can use the audit functionality. The audit configuration creates [PolicyReports](https://kyverno.io/docs/policy-reports/) which you can access with `kubectl`. Because I can't find a simple solution to get a general overview of this PolicyReports and PolicyReportResults, I created this tool to send information from PolicyReports to different targets like [Grafana Loki](https://grafana.com/oss/loki/). This tool provides by default an HTTP server with Prometheus Metrics on `http://localhost:2112/metrics` about ReportPolicy Summaries and ReportPolicyRules.
This project is in an early stage. Please let me know if anything did not work as expected or if you want to send your audits to other targets then Loki.
@ -11,21 +11,56 @@ This project is in an early stage. Please let me know if anything did not work a
Installation via Helm Repository
### Add the Helm repository
```bash
helm repo add policy-reporter https://fjogeleit.github.io/policy-reporter
helm install policy-reporter policy-reporter/policy-reporter --set loki.host=http://lokihost:3100 -n policy-reporter --create-namespace
```
### Basic Installation - Provides Prometheus Metrics
```bash
helm install policy-reporter policy-reporter/policy-reporter -n policy-reporter --create-namespace
```
### Installation with Loki
```bash
helm install policy-reporter policy-reporter/policy-reporter --set loki.host=http://loki:3100 -n policy-reporter --create-namespace
```
### Installation with Elasticsearch
```bash
helm install policy-reporter policy-reporter/policy-reporter --set elasticsearch.host=http://elasticsearch:3100 -n policy-reporter --create-namespace
```
You can also customize the `./charts/policy-reporter/values.yaml` to change the default configurations.
### Additional configurations for Loki
Configure `loki.minimumPriority` to send only results with the configured minimumPriority or above, empty means all results. (info < warning < error)
Configure `loki.skipExistingOnStartup` to skip all results who already existed before the PolicyReporter started. Can be used after the first deployment to prevent duplicated events.
* Configure `loki.minimumPriority` to send only results with the configured minimumPriority or above, empty means all results. (info < warning < error)
* Configure `loki.skipExistingOnStartup` to skip all results who already existed before the PolicyReporter started (default: `true`).
```yaml
loki:
minimumPriority: ""
skipExistingOnStartup: false
skipExistingOnStartup: true
```
### Additional configurations for Elasticsearch
* Configure `elasticsearch.index` to customize the elasticsearch index.
* Configure `elasticsearch.rotation` is added as suffix to the index. Possible values are `daily`, `monthly`, `annually` and `none`.
* Configure `elasticsearch.minimumPriority` to send only results with the configured minimumPriority or above, empty means all results. (info < warning < error)
* Configure `elasticsearch.skipExistingOnStartup` to skip all results who already existed before the PolicyReporter started (default: `true`).
```yaml
elasticsearch:
index: "policy-reporter"
rotation: "daily"
minimumPriority: ""
skipExistingOnStartup: true
```
### Configure Policy Priorities

View file

@ -3,5 +3,5 @@ name: policy-reporter
description: K8s PolicyReporter watches for wgpolicyk8s.io/v1alpha1.PolicyReport resources. It creates Prometheus Metrics and can send rule validation events to Loki
type: application
version: 0.7.2
appVersion: 0.6.0
version: 0.8.0
appVersion: 0.7.0

View file

@ -25,13 +25,7 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- --loki={{ .Values.loki.host }}
{{- if .Values.loki.minimumPriority }}
- --loki-minimum-priority={{ .Values.loki.minimumPriority }}
{{- end }}
{{- if .Values.loki.skipExistingOnStartup }}
- --loki-skip-existing-on-startup
{{- end }}
- --config=/app/config.yaml
ports:
- name: http
containerPort: 2112
@ -56,5 +50,5 @@ spec:
volumes:
- name: config-volume
configMap:
name: policy-reporter-config
name: {{ include "policyreporter.fullname" . }}-targets
optional: true

View file

@ -0,0 +1,19 @@
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "policyreporter.fullname" . }}-targets
labels:
{{- include "policyreporter.labels" . | nindent 4 }}
data:
config.yaml: |-
loki:
host: {{ .Values.loki.host | quote }}
minimumPriority: {{ .Values.loki.minimumPriority | quote }}
skipExistingOnStartup: {{ .Values.loki.skipExistingOnStartup }}
elasticsearch:
host: {{ .Values.elasticsearch.host | quote }}
index: {{ .Values.elasticsearch.index | default "policy-reporter" | quote }}
rotation: {{ .Values.elasticsearch.rotation | default "dayli" | quote }}
minimumPriority: {{ .Values.elasticsearch.minimumPriority | quote }}
skipExistingOnStartup: {{ .Values.elasticsearch.skipExistingOnStartup }}

View file

@ -1,10 +1,24 @@
loki:
# loki host address
host: http://loki.loki-stack.svc.cluster.local:3100
host: ""
# minimum priority "" < info < warning < error
minimumPriority: ""
# Skip already existing PolicyReportResults on startup
skipExistingOnStartup: false
skipExistingOnStartup: true
elasticsearch:
# elasticsearch host address
host: ""
# elasticsearch index (default: policy-reporter)
index: ""
# elasticsearch index rotation and index suffix
# possible values: dayli, monthly, annually, none (default: dayli)
rotation: ""
# minimum priority "" < info < warning < error
minimumPriority: ""
# Skip already existing PolicyReportResults on startup
skipExistingOnStartup: true
metrics:
serviceMonitor: false
@ -15,7 +29,7 @@ metrics:
image:
repository: fjogeleit/policy-reporter
pullPolicy: IfNotPresent
tag: 0.6.0
tag: 0.7.0
imagePullSecrets: []

View file

@ -1,79 +1,24 @@
package cmd
import (
"flag"
"net/http"
"log"
"github.com/fjogeleit/policy-reporter/pkg/config"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)
// NewCLI creates a new instance of the root CLI
func NewCLI() *cobra.Command {
rootCmd := &cobra.Command{
Use: "run",
Short: "Kyverno Policy API",
Long: `Kyverno Policy API and Monitoring`,
RunE: func(cmd *cobra.Command, args []string) error {
c, err := loadConfig(cmd)
if err != nil {
return err
}
resolver := config.NewResolver(c)
client, err := resolver.PolicyReportClient()
if err != nil {
return err
}
policyMetrics, err := resolver.PolicyReportMetrics()
if err != nil {
return err
}
clusterPolicyMetrics, err := resolver.ClusterPolicyReportMetrics()
if err != nil {
return err
}
loki := resolver.LokiClient()
g := new(errgroup.Group)
g.Go(policyMetrics.GenerateMetrics)
g.Go(clusterPolicyMetrics.GenerateMetrics)
if loki != nil {
g.Go(func() error {
return client.WatchRuleValidation(func(r report.Result) {
go loki.Send(r)
}, c.Loki.SkipExisting)
})
}
g.Go(func() error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(":2112", nil)
})
return g.Wait()
},
Use: "policyreporter",
Short: "Generates PolicyReport Metrics and Send Results to different targets",
Long: `Generates Prometheus Metrics from PolicyReports, ClusterPolicyReports and PolicyReportResults.
Sends notifications to different targets like Grafana's Loki.`,
}
rootCmd.PersistentFlags().StringP("kubeconfig", "k", "", "absolute path to the kubeconfig file")
rootCmd.PersistentFlags().String("loki", "", "loki host: http://loki:3100")
rootCmd.PersistentFlags().String("loki-minimum-priority", "", "Minimum Priority to send Results to Loki (info < warning < error)")
rootCmd.PersistentFlags().Bool("loki-skip-existing-on-startup", false, "Skip Results created before PolicyReporter started. Prevent duplicated sending after new deployment")
flag.Parse()
rootCmd.AddCommand(newRunCMD())
rootCmd.AddCommand(newSendCMD())
return rootCmd
}
@ -83,8 +28,26 @@ func loadConfig(cmd *cobra.Command) (*config.Config, error) {
v.SetDefault("namespace", "policy-reporter")
cfgFile := ""
configFlag := cmd.Flags().Lookup("config")
if configFlag != nil {
cfgFile = configFlag.Value.String()
}
if cfgFile != "" {
v.SetConfigFile(cfgFile)
} else {
v.AddConfigPath(".")
v.SetConfigName("config")
}
v.AutomaticEnv()
if err := v.ReadInConfig(); err != nil {
log.Println("[INFO] No target configuration file found")
}
if flag := cmd.Flags().Lookup("loki"); flag != nil {
v.BindPFlag("loki.host", flag)
}

89
cmd/run.go Normal file
View file

@ -0,0 +1,89 @@
package cmd
import (
"flag"
"net/http"
"github.com/fjogeleit/policy-reporter/pkg/config"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
)
func newRunCMD() *cobra.Command {
cmd := &cobra.Command{
Use: "run",
Short: "Run PolicyReporter Watcher & HTTP Metrics Server",
RunE: func(cmd *cobra.Command, args []string) error {
c, err := loadConfig(cmd)
if err != nil {
return err
}
resolver := config.NewResolver(c)
client, err := resolver.PolicyReportClient()
if err != nil {
return err
}
policyMetrics, err := resolver.PolicyReportMetrics()
if err != nil {
return err
}
clusterPolicyMetrics, err := resolver.ClusterPolicyReportMetrics()
if err != nil {
return err
}
g := new(errgroup.Group)
g.Go(policyMetrics.GenerateMetrics)
g.Go(clusterPolicyMetrics.GenerateMetrics)
g.Go(func() error {
targets := resolver.TargetClients()
if len(targets) == 0 {
return nil
}
return client.WatchPolicyReportResults(func(r report.Result, e bool) {
for _, t := range targets {
go func(target target.Client, result report.Result, preExisted bool) {
if preExisted && target.SkipExistingOnStartup() {
return
}
target.Send(result)
}(t, r, e)
}
}, resolver.SkipExistingOnStartup())
})
g.Go(func() error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(":2112", nil)
})
return g.Wait()
},
}
// For local usage
cmd.PersistentFlags().StringP("kubeconfig", "k", "", "absolute path to the kubeconfig file")
cmd.PersistentFlags().StringP("config", "c", "", "target configuration file")
cmd.PersistentFlags().String("loki", "", "loki host: http://loki:3100")
cmd.PersistentFlags().String("loki-minimum-priority", "", "Minimum Priority to send Results to Loki (info < warning < error)")
cmd.PersistentFlags().Bool("loki-skip-existing-on-startup", false, "Skip Results created before PolicyReporter started. Prevent duplicated sending after new deployment")
flag.Parse()
return cmd
}

69
cmd/send.go Normal file
View file

@ -0,0 +1,69 @@
package cmd
import (
"flag"
"sync"
"github.com/fjogeleit/policy-reporter/pkg/config"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/spf13/cobra"
)
func newSendCMD() *cobra.Command {
cmd := &cobra.Command{
Use: "send",
Short: "Send all current PolicyReportResults to the configured targets",
RunE: func(cmd *cobra.Command, args []string) error {
c, err := loadConfig(cmd)
if err != nil {
return err
}
resolver := config.NewResolver(c)
client, err := resolver.PolicyReportClient()
if err != nil {
return err
}
clients := resolver.TargetClients()
if len(clients) == 0 {
return nil
}
results, err := client.FetchPolicyReportResults()
if err != nil {
return err
}
wg := sync.WaitGroup{}
wg.Add(len(results) * len(clients))
for _, result := range results {
for _, client := range clients {
go func(c target.Client, r report.Result) {
c.Send(r)
wg.Done()
}(client, result)
}
}
wg.Wait()
return err
},
}
// For local usage
cmd.PersistentFlags().StringP("kubeconfig", "k", "", "absolute path to the kubeconfig file")
cmd.PersistentFlags().StringP("config", "c", "", "target configuration file")
cmd.PersistentFlags().String("loki", "", "loki host: http://loki:3100")
cmd.PersistentFlags().String("loki-minimum-priority", "", "Minimum Priority to send Results to Loki (info < warning < error)")
flag.Parse()
return cmd
}

View file

@ -7,6 +7,13 @@ type Config struct {
SkipExisting bool `mapstructure:"skipExistingOnStartup"`
MinimumPriority string `mapstructure:"minimumPriority"`
} `mapstructure:"loki"`
Elasticsearch struct {
Host string `mapstructure:"host"`
Index string `mapstructure:"index"`
Rotation string `mapstructure:"rotation"`
SkipExisting bool `mapstructure:"skipExistingOnStartup"`
MinimumPriority string `mapstructure:"minimumPriority"`
} `mapstructure:"elasticsearch"`
Kubeconfig string `mapstructure:"kubeconfig"`
Namespace string `mapstructure:"namespace"`
}

View file

@ -9,25 +9,24 @@ import (
"github.com/fjogeleit/policy-reporter/pkg/metrics"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/fjogeleit/policy-reporter/pkg/target/elasticsearch"
"github.com/fjogeleit/policy-reporter/pkg/target/loki"
)
var (
kubeClient report.Client
lokiClient target.Client
policyReportMetrics metrics.Metrics
clusterPolicyReportMetrics metrics.Metrics
)
// Resolver manages dependencies
type Resolver struct {
config *Config
config *Config
kubeClient report.Client
lokiClient target.Client
elasticsearchClient target.Client
policyReportMetrics metrics.Metrics
clusterPolicyReportMetrics metrics.Metrics
}
// PolicyReportClient resolver method
func (r *Resolver) PolicyReportClient() (report.Client, error) {
if kubeClient != nil {
return kubeClient, nil
if r.kubeClient != nil {
return r.kubeClient, nil
}
client, err := kubernetes.NewPolicyReportClient(
@ -37,34 +36,63 @@ func (r *Resolver) PolicyReportClient() (report.Client, error) {
time.Now(),
)
kubeClient = client
r.kubeClient = client
return client, err
}
// LokiClient resolver method
func (r *Resolver) LokiClient() target.Client {
if lokiClient != nil {
return lokiClient
if r.lokiClient != nil {
return r.lokiClient
}
if r.config.Loki.Host == "" {
return nil
}
lokiClient = loki.NewClient(
r.lokiClient = loki.NewClient(
r.config.Loki.Host,
r.config.Loki.MinimumPriority,
r.config.Loki.SkipExisting,
&http.Client{},
)
return lokiClient
return r.lokiClient
}
// ElasticsearchClient resolver method
func (r *Resolver) ElasticsearchClient() target.Client {
if r.elasticsearchClient != nil {
return r.elasticsearchClient
}
if r.config.Elasticsearch.Host == "" {
return nil
}
if r.config.Elasticsearch.Index == "" {
r.config.Elasticsearch.Index = "policy-reporter"
}
if r.config.Elasticsearch.Rotation == "" {
r.config.Elasticsearch.Rotation = elasticsearch.Dayli
}
r.elasticsearchClient = elasticsearch.NewClient(
r.config.Elasticsearch.Host,
r.config.Elasticsearch.Index,
r.config.Elasticsearch.Rotation,
r.config.Elasticsearch.MinimumPriority,
r.config.Elasticsearch.SkipExisting,
&http.Client{},
)
return r.elasticsearchClient
}
// PolicyReportMetrics resolver method
func (r *Resolver) PolicyReportMetrics() (metrics.Metrics, error) {
if policyReportMetrics != nil {
return policyReportMetrics, nil
if r.policyReportMetrics != nil {
return r.policyReportMetrics, nil
}
client, err := r.PolicyReportClient()
@ -72,15 +100,15 @@ func (r *Resolver) PolicyReportMetrics() (metrics.Metrics, error) {
return nil, err
}
policyReportMetrics = metrics.NewPolicyReportMetrics(client)
r.policyReportMetrics = metrics.NewPolicyReportMetrics(client)
return policyReportMetrics, nil
return r.policyReportMetrics, nil
}
// ClusterPolicyReportMetrics resolver method
func (r *Resolver) ClusterPolicyReportMetrics() (metrics.Metrics, error) {
if clusterPolicyReportMetrics != nil {
return clusterPolicyReportMetrics, nil
if r.clusterPolicyReportMetrics != nil {
return r.clusterPolicyReportMetrics, nil
}
client, err := r.PolicyReportClient()
@ -88,20 +116,47 @@ func (r *Resolver) ClusterPolicyReportMetrics() (metrics.Metrics, error) {
return nil, err
}
clusterPolicyReportMetrics = metrics.NewClusterPolicyMetrics(client)
r.clusterPolicyReportMetrics = metrics.NewClusterPolicyMetrics(client)
return clusterPolicyReportMetrics, nil
return r.clusterPolicyReportMetrics, nil
}
func (r *Resolver) TargetClients() []target.Client {
clients := make([]target.Client, 0)
if loki := r.LokiClient(); loki != nil {
clients = append(clients, loki)
}
if elasticsearch := r.ElasticsearchClient(); elasticsearch != nil {
clients = append(clients, elasticsearch)
}
return clients
}
func (r *Resolver) SkipExistingOnStartup() bool {
for _, client := range r.TargetClients() {
if !client.SkipExistingOnStartup() {
return false
}
}
return true
}
// Reset all cached dependencies
func (r *Resolver) Reset() {
kubeClient = nil
lokiClient = nil
policyReportMetrics = nil
clusterPolicyReportMetrics = nil
r.kubeClient = nil
r.lokiClient = nil
r.elasticsearchClient = nil
r.policyReportMetrics = nil
r.clusterPolicyReportMetrics = nil
}
// NewResolver constructor function
func NewResolver(config *Config) Resolver {
return Resolver{config}
return Resolver{
config: config,
}
}

View file

@ -16,6 +16,19 @@ var testConfig = &config.Config{
SkipExisting: true,
MinimumPriority: "debug",
},
Elasticsearch: struct {
Host string "mapstructure:\"host\""
Index string "mapstructure:\"index\""
Rotation string "mapstructure:\"rotation\""
SkipExisting bool "mapstructure:\"skipExistingOnStartup\""
MinimumPriority string "mapstructure:\"minimumPriority\""
}{
Host: "http://localhost:9200",
Index: "policy-reporter",
Rotation: "dayli",
SkipExisting: true,
MinimumPriority: "debug",
},
}
func Test_ResolveLokiClient(t *testing.T) {
@ -32,6 +45,76 @@ func Test_ResolveLokiClient(t *testing.T) {
}
}
func Test_ResolveElasticSearchClient(t *testing.T) {
resolver := config.NewResolver(testConfig)
client := resolver.ElasticsearchClient()
if client == nil {
t.Error("Expected Client, got nil")
}
client2 := resolver.ElasticsearchClient()
if client != client2 {
t.Error("Error: Should reuse first instance")
}
}
func Test_ResolveTargets(t *testing.T) {
resolver := config.NewResolver(testConfig)
clients := resolver.TargetClients()
if count := len(clients); count != 2 {
t.Errorf("Expected 2 Clients, got %d", count)
}
}
func Test_ResolveSkipExistingOnStartup(t *testing.T) {
var testConfig = &config.Config{
Loki: struct {
Host string "mapstructure:\"host\""
SkipExisting bool "mapstructure:\"skipExistingOnStartup\""
MinimumPriority string "mapstructure:\"minimumPriority\""
}{
Host: "http://localhost:3100",
SkipExisting: true,
MinimumPriority: "debug",
},
Elasticsearch: struct {
Host string "mapstructure:\"host\""
Index string "mapstructure:\"index\""
Rotation string "mapstructure:\"rotation\""
SkipExisting bool "mapstructure:\"skipExistingOnStartup\""
MinimumPriority string "mapstructure:\"minimumPriority\""
}{
Host: "http://localhost:9200",
Index: "policy-reporter",
Rotation: "dayli",
SkipExisting: true,
MinimumPriority: "debug",
},
}
t.Run("Resolve false", func(t *testing.T) {
testConfig.Elasticsearch.SkipExisting = false
resolver := config.NewResolver(testConfig)
if resolver.SkipExistingOnStartup() == true {
t.Error("Expected SkipExistingOnStartup to be false if one Client has SkipExistingOnStartup false configured")
}
})
t.Run("Resolve true", func(t *testing.T) {
testConfig.Elasticsearch.SkipExisting = true
resolver := config.NewResolver(testConfig)
if resolver.SkipExistingOnStartup() == false {
t.Error("Expected SkipExistingOnStartup to be true if all Client has SkipExistingOnStartup true configured")
}
})
}
func Test_ResolveLokiClientWithoutHost(t *testing.T) {
config2 := &config.Config{
Loki: struct {
@ -52,3 +135,27 @@ func Test_ResolveLokiClientWithoutHost(t *testing.T) {
t.Error("Expected Client to be nil if no host is configured")
}
}
func Test_ResolveElasticsearchClientWithoutHost(t *testing.T) {
config2 := &config.Config{
Elasticsearch: struct {
Host string "mapstructure:\"host\""
Index string "mapstructure:\"index\""
Rotation string "mapstructure:\"rotation\""
SkipExisting bool "mapstructure:\"skipExistingOnStartup\""
MinimumPriority string "mapstructure:\"minimumPriority\""
}{
Host: "",
Index: "policy-reporter",
Rotation: "dayli",
SkipExisting: true,
MinimumPriority: "debug",
},
}
resolver := config.NewResolver(config2)
if resolver.ElasticsearchClient() != nil {
t.Error("Expected Client to be nil if no host is configured")
}
}

View file

@ -3,6 +3,7 @@ package kubernetes
import (
"context"
"log"
"sync"
"time"
"github.com/fjogeleit/policy-reporter/pkg/report"
@ -51,6 +52,65 @@ func (c *policyReportClient) FetchPolicyReports() ([]report.PolicyReport, error)
return reports, nil
}
func (c *policyReportClient) FetchClusterPolicyReports() ([]report.ClusterPolicyReport, error) {
var reports []report.ClusterPolicyReport
result, err := c.client.Resource(clusterPolicyReports).List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Printf("K8s List Error: %s\n", err.Error())
return reports, err
}
for _, item := range result.Items {
reports = append(reports, c.mapper.MapClusterPolicyReport(item.Object))
}
return reports, nil
}
func (c *policyReportClient) FetchPolicyReportResults() ([]report.Result, error) {
g := new(errgroup.Group)
mx := new(sync.Mutex)
var results []report.Result
g.Go(func() error {
reports, err := c.FetchClusterPolicyReports()
if err != nil {
return err
}
for _, clusterReport := range reports {
for _, result := range clusterReport.Results {
mx.Lock()
results = append(results, result)
mx.Unlock()
}
}
return nil
})
g.Go(func() error {
reports, err := c.FetchPolicyReports()
if err != nil {
return err
}
for _, clusterReport := range reports {
for _, result := range clusterReport.Results {
mx.Lock()
results = append(results, result)
mx.Unlock()
}
}
return nil
})
return results, g.Wait()
}
func (c *policyReportClient) WatchClusterPolicyReports(cb report.WatchClusterPolicyReportCallback) error {
for {
result, err := c.client.Resource(clusterPolicyReports).Watch(context.Background(), metav1.ListOptions{})
@ -81,27 +141,29 @@ func (c *policyReportClient) WatchPolicyReports(cb report.WatchPolicyReportCallb
}
}
func (c *policyReportClient) WatchRuleValidation(cb report.WatchPolicyResultCallback, skipExisting bool) error {
func (c *policyReportClient) WatchPolicyReportResults(cb report.WatchPolicyResultCallback, skipExisting bool) error {
wg := new(errgroup.Group)
wg.Go(func() error {
return c.WatchPolicyReports(func(e watch.EventType, pr report.PolicyReport) {
switch e {
case watch.Added:
if skipExisting && pr.CreationTimestamp.Before(c.startUp) {
preExisted := pr.CreationTimestamp.Before(c.startUp)
if skipExisting && preExisted {
c.policyCache[pr.GetIdentifier()] = pr
break
}
for _, result := range pr.Results {
cb(result)
cb(result, preExisted)
}
c.policyCache[pr.GetIdentifier()] = pr
case watch.Modified:
diff := pr.GetNewResults(c.policyCache[pr.GetIdentifier()])
for _, result := range diff {
cb(result)
cb(result, false)
}
c.policyCache[pr.GetIdentifier()] = pr
@ -115,20 +177,22 @@ func (c *policyReportClient) WatchRuleValidation(cb report.WatchPolicyResultCall
return c.WatchClusterPolicyReports(func(s watch.EventType, cpr report.ClusterPolicyReport) {
switch s {
case watch.Added:
if skipExisting && cpr.CreationTimestamp.Before(c.startUp) {
preExisted := cpr.CreationTimestamp.Before(c.startUp)
if skipExisting && preExisted {
c.clusterPolicyCache[cpr.GetIdentifier()] = cpr
break
}
for _, result := range cpr.Results {
cb(result)
cb(result, preExisted)
}
c.clusterPolicyCache[cpr.GetIdentifier()] = cpr
case watch.Modified:
diff := cpr.GetNewResults(c.clusterPolicyCache[cpr.GetIdentifier()])
for _, result := range diff {
cb(result)
cb(result, false)
}
c.clusterPolicyCache[cpr.GetIdentifier()] = cpr

View file

@ -9,7 +9,7 @@ type WatchPolicyReportCallback = func(watch.EventType, PolicyReport)
type WatchClusterPolicyReportCallback = func(watch.EventType, ClusterPolicyReport)
// WatchPolicyResultCallback is called whenver a new PolicyResult comes in
type WatchPolicyResultCallback = func(Result)
type WatchPolicyResultCallback = func(Result, bool)
// Client interface for interacting with the Kubernetes API
type Client interface {
@ -17,8 +17,12 @@ type Client interface {
FetchPolicyReports() ([]PolicyReport, error)
// WatchPolicyReports blocking API to watch for PolicyReport changes
WatchPolicyReports(WatchPolicyReportCallback) error
// WatchRuleValidation blocking API to watch for PolicyResult changes from PolicyReports and ClusterPolicyReports
WatchRuleValidation(WatchPolicyResultCallback, bool) error
// WatchPolicyReportResults blocking API to watch for PolicyResult changes from PolicyReports and ClusterPolicyReports
WatchPolicyReportResults(WatchPolicyResultCallback, bool) error
// FetchPolicyReportResults from the unterlying API
FetchPolicyReportResults() ([]Result, error)
// WatchClusterPolicyReports blocking API to watch for ClusterPolicyReport changes
WatchClusterPolicyReports(WatchClusterPolicyReportCallback) error
// FetchClusterPolicyReport from the unterlying API
FetchClusterPolicyReports() ([]ClusterPolicyReport, error)
}

View file

@ -1,6 +1,7 @@
package report
import (
"bytes"
"fmt"
"time"
)
@ -58,6 +59,15 @@ func (p Priority) String() string {
}
}
// MarshalJSON marshals the enum as a quoted json string
func (p Priority) MarshalJSON() ([]byte, error) {
buffer := bytes.NewBufferString(`"`)
buffer.WriteString(p.String())
buffer.WriteString(`"`)
return buffer.Bytes(), nil
}
// PriorityFromStatus creates a Priority based on a Status
func PriorityFromStatus(p Status) Priority {
switch p {
@ -106,8 +116,8 @@ type Result struct {
Rule string
Priority Priority
Status Status
Severity Severity
Category string
Severity Severity `json:",omitempty"`
Category string `json:",omitempty"`
Scored bool
Resources []Resource
}

View file

@ -8,4 +8,6 @@ import (
type Client interface {
// Send the given Result to the configured Target
Send(result report.Result)
// SkipExistingOnStartup skips already existing PolicyReportResults on startup
SkipExistingOnStartup() bool
}

View file

@ -0,0 +1,103 @@
package elasticsearch
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
)
type Rotation = string
// Elasticsearch Index Rotation
const (
None Rotation = "none"
Dayli Rotation = "dayli"
Monthly Rotation = "monthly"
Annually Rotation = "annually"
)
type httpClient interface {
Do(req *http.Request) (*http.Response, error)
}
type client struct {
host string
index string
rotation Rotation
minimumPriority string
skipExistingOnStartup bool
client httpClient
}
func (e *client) Send(result report.Result) {
if result.Priority < report.NewPriority(e.minimumPriority) {
return
}
body := new(bytes.Buffer)
if err := json.NewEncoder(body).Encode(result); err != nil {
log.Printf("[ERROR] : %v\n", err.Error())
}
var host string
switch e.rotation {
case None:
host = e.host + "/" + e.index + "/event"
case Annually:
host = e.host + "/" + e.index + "-" + time.Now().Format("2006") + "/event"
case Monthly:
host = e.host + "/" + e.index + "-" + time.Now().Format("2006.01") + "/event"
default:
host = e.host + "/" + e.index + "-" + time.Now().Format("2006.01.02") + "/event"
}
req, err := http.NewRequest("POST", host, body)
if err != nil {
log.Printf("[ERROR] : %v\n", err.Error())
}
req.Header.Add("Content-Type", "application/json; charset=utf-8")
req.Header.Add("User-Agent", "Policy-Reporter")
resp, err := e.client.Do(req)
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if err != nil {
log.Printf("[ERROR] PUSH failed: %s\n", err.Error())
} else if resp.StatusCode > 400 {
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
log.Printf("[ERROR] PUSH failed [%d]: %s\n", resp.StatusCode, buf.String())
} else {
log.Println("[INFO] PUSH OK")
}
}
func (e *client) SkipExistingOnStartup() bool {
return e.skipExistingOnStartup
}
// NewClient creates a new loki.client to send Results to Loki
func NewClient(host, index, rotation, minimumPriority string, skipExistingOnStartup bool, httpClient httpClient) target.Client {
return &client{
host,
index,
Rotation(rotation),
minimumPriority,
skipExistingOnStartup,
httpClient,
}
}

View file

@ -0,0 +1,94 @@
package elasticsearch_test
import (
"net/http"
"testing"
"time"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target/elasticsearch"
)
var completeResult = report.Result{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Priority: report.ErrorPriority,
Status: report.Fail,
Severity: report.Heigh,
Category: "resources",
Scored: true,
Resources: []report.Resource{
{
APIVersion: "v1",
Kind: "Deployment",
Name: "nginx",
Namespace: "default",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188409",
},
},
}
type testClient struct {
callback func(req *http.Request)
statusCode int
}
func (c testClient) Do(req *http.Request) (*http.Response, error) {
c.callback(req)
return &http.Response{
StatusCode: c.statusCode,
}, nil
}
func Test_ElasticsearchTarget(t *testing.T) {
t.Run("Send with Annually Result", func(t *testing.T) {
callback := func(req *http.Request) {
if contentType := req.Header.Get("Content-Type"); contentType != "application/json; charset=utf-8" {
t.Errorf("Unexpected Content-Type: %s", contentType)
}
if agend := req.Header.Get("User-Agent"); agend != "Policy-Reporter" {
t.Errorf("Unexpected Host: %s", agend)
}
if url := req.URL.String(); url != "http://localhost:9200/policy-reporter-"+time.Now().Format("2006")+"/event" {
t.Errorf("Unexpected Host: %s", url)
}
}
loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "annually", "", false, testClient{callback, 200})
loki.Send(completeResult)
})
t.Run("Send with Monthly Result", func(t *testing.T) {
callback := func(req *http.Request) {
if url := req.URL.String(); url != "http://localhost:9200/policy-reporter-"+time.Now().Format("2006.01")+"/event" {
t.Errorf("Unexpected Host: %s", url)
}
}
loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "monthly", "", false, testClient{callback, 200})
loki.Send(completeResult)
})
t.Run("Send with Monthly Result", func(t *testing.T) {
callback := func(req *http.Request) {
if url := req.URL.String(); url != "http://localhost:9200/policy-reporter-"+time.Now().Format("2006.01.02")+"/event" {
t.Errorf("Unexpected Host: %s", url)
}
}
loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "daily", "", false, testClient{callback, 200})
loki.Send(completeResult)
})
t.Run("Send with None Result", func(t *testing.T) {
callback := func(req *http.Request) {
if url := req.URL.String(); url != "http://localhost:9200/policy-reporter/event" {
t.Errorf("Unexpected Host: %s", url)
}
}
loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "none", "", false, testClient{callback, 200})
loki.Send(completeResult)
})
}

View file

@ -70,9 +70,10 @@ func newLokiPayload(result report.Result) payload {
}
type client struct {
host string
minimumPriority string
client httpClient
host string
minimumPriority string
skipExistingOnStartup bool
client httpClient
}
func (l *client) Send(result report.Result) {
@ -93,7 +94,7 @@ func (l *client) Send(result report.Result) {
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("User-Agent", "Policy-API")
req.Header.Add("User-Agent", "Policy-Reporter")
resp, err := l.client.Do(req)
defer func() {
@ -115,11 +116,16 @@ func (l *client) Send(result report.Result) {
}
}
func (l *client) SkipExistingOnStartup() bool {
return l.skipExistingOnStartup
}
// NewClient creates a new loki.client to send Results to Loki
func NewClient(host, minimumPriority string, httpClient httpClient) target.Client {
func NewClient(host, minimumPriority string, skipExistingOnStartup bool, httpClient httpClient) target.Client {
return &client{
host + "/api/prom/push",
minimumPriority,
skipExistingOnStartup,
httpClient,
}
}

View file

@ -59,7 +59,7 @@ func Test_LokiTarget(t *testing.T) {
t.Errorf("Unexpected Content-Type: %s", contentType)
}
if agend := req.Header.Get("User-Agent"); agend != "Policy-API" {
if agend := req.Header.Get("User-Agent"); agend != "Policy-Reporter" {
t.Errorf("Unexpected Host: %s", agend)
}
@ -109,7 +109,7 @@ func Test_LokiTarget(t *testing.T) {
}
}
loki := loki.NewClient("http://localhost:3100", "", testClient{callback, 200})
loki := loki.NewClient("http://localhost:3100", "", false, testClient{callback, 200})
loki.Send(completeResult)
})
@ -119,7 +119,7 @@ func Test_LokiTarget(t *testing.T) {
t.Errorf("Unexpected Content-Type: %s", contentType)
}
if agend := req.Header.Get("User-Agent"); agend != "Policy-API" {
if agend := req.Header.Get("User-Agent"); agend != "Policy-Reporter" {
t.Errorf("Unexpected Host: %s", agend)
}
@ -167,7 +167,7 @@ func Test_LokiTarget(t *testing.T) {
}
}
loki := loki.NewClient("http://localhost:3100", "", testClient{callback, 200})
loki := loki.NewClient("http://localhost:3100", "", false, testClient{callback, 200})
loki.Send(minimalResult)
})
}