
Development (#11)

* Update WatchStructure to work with one Watch
* Implement new Tests
* Split ReportClient into Multiple Objects
* Helm Chart updates
Frank Jogeleit, 2021-03-05 14:26:47 +01:00, committed by GitHub
parent 4cb9fadbb4
commit fce86e7a60
42 changed files with 2753 additions and 762 deletions
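The headline change is splitting the former report.Client into per-kind report clients plus a shared result client. The following is a condensed sketch of the resulting surface, inferred from the diffs below; the placeholder types only make the sketch self-contained, the real definitions live in pkg/report and may differ in detail.

package example

type Result struct{ Policy string }
type PolicyReport struct{ Results map[string]Result }
type ClusterPolicyReport struct{ Results map[string]Result }

// PolicyResultCallback receives each new result plus a flag telling
// whether the result already existed when the watcher started.
type PolicyResultCallback func(Result, bool)

// ResultClient is the report-kind-agnostic view shared by both report clients.
type ResultClient interface {
	FetchPolicyResults() ([]Result, error)
	RegisterPolicyResultCallback(PolicyResultCallback)
	RegisterPolicyResultWatcher(skipExisting bool)
}

// PolicyClient adds fetching and watching of namespaced PolicyReports.
type PolicyClient interface {
	ResultClient
	FetchPolicyReports() ([]PolicyReport, error)
	StartWatching() error
}

// ClusterPolicyClient mirrors PolicyClient for cluster-scoped reports.
type ClusterPolicyClient interface {
	ResultClient
	FetchClusterPolicyReports() ([]ClusterPolicyReport, error)
	StartWatching() error
}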


@@ -5,6 +5,7 @@ on:
# Publish `master` as Docker `latest` image.
branches:
- main
- development
# Publish `v1.2.3` tags as releases.
tags:


@@ -1,5 +1,13 @@
# Changelog
## 0.14.0
* Internal refactoring
* Improved test coverage
* Removed duplicated caching
* Updated Dashboard
* Filter zero values from Policy Report Detail after Policies / Resources are deleted
## 0.13.0
* Split the Monitoring out in a Sub Helm chart


@@ -14,11 +14,11 @@ prepare:
.PHONY: test
test:
go test -v ./...
go test -v ./... -timeout=120s
.PHONY: coverage
coverage:
go test -v ./... -covermode=count -coverprofile=coverage.out
go test -v ./... -covermode=count -coverprofile=coverage.out -timeout=120s
.PHONY: build
build: prepare


@@ -3,8 +3,8 @@ name: policy-reporter
description: K8s PolicyReporter watches for wgpolicyk8s.io/v1alpha1.PolicyReport resources. It creates Prometheus Metrics and can send rule validation events to different targets like Loki, Elasticsearch, Slack or Discord
type: application
version: 0.13.0
appVersion: 0.10.0
version: 0.14.0
appVersion: 0.11.0
dependencies:
- name: monitoring


@@ -3,5 +3,5 @@ name: monitoring
description: Policy Reporter Monitoring with predefined ServiceMonitor and Grafana Dashboards
type: application
version: 0.1.0
version: 0.2.0
appVersion: 0.0.0


@@ -98,7 +98,7 @@ data:
"targets": [
{
"expr": "sum(cluster_policy_report_result{policy=~\"$policy\", status=\"pass\"})",
"instant": false,
"instant": true,
"interval": "",
"legendFormat": "",
"refId": "A"
@@ -155,7 +155,7 @@ data:
"targets": [
{
"expr": "sum(policy_report_result{policy=~\"$policy\", status=\"warn\"})",
"instant": false,
"instant": true,
"interval": "",
"legendFormat": "",
"refId": "A"
@@ -212,7 +212,7 @@ data:
"targets": [
{
"expr": "sum(cluster_policy_report_result{policy=~\"$policy\", status=\"fail\"})",
"instant": false,
"instant": true,
"interval": "",
"legendFormat": "",
"refId": "A"
@@ -269,7 +269,7 @@ data:
"targets": [
{
"expr": "sum(cluster_policy_report_result{policy=~\"$policy\", status=\"error\"})",
"instant": false,
"instant": true,
"interval": "",
"legendFormat": "",
"refId": "A"


@@ -102,7 +102,7 @@ data:
"targets": [
{
"expr": "sum(policy_report_summary{status=\"Fail\"} > 0) by (exported_namespace)",
"instant": false,
"instant": true,
"interval": "",
"legendFormat": "{{`{{ exported_namespace }}`}}",
"refId": "A"


@@ -95,8 +95,8 @@ data:
"pluginVersion": "7.1.5",
"targets": [
{
"expr": "sum(policy_report_result{policy=~\"$policy\", status=\"pass\"}) by (exported_namespace)",
"instant": false,
"expr": "sum(policy_report_result{policy=~\"$policy\", status=\"pass\"} > 0) by (exported_namespace)",
"instant": true,
"interval": "",
"legendFormat": "{{`{{ exported_namespace }}`}}",
"refId": "A"
@@ -150,8 +150,8 @@ data:
"pluginVersion": "7.1.5",
"targets": [
{
"expr": "sum(policy_report_result{policy=~\"$policy\", status=\"fail\"}) by (exported_namespace)",
"instant": false,
"expr": "sum(policy_report_result{policy=~\"$policy\", status=\"fail\"} > 0) by (exported_namespace)",
"instant": true,
"interval": "",
"legendFormat": "{{`{{ exported_namespace }}`}}",
"refId": "A"
@@ -205,8 +205,8 @@ data:
"pluginVersion": "7.1.5",
"targets": [
{
"expr": "sum(policy_report_result{policy=~\"$policy\", status=\"warn\"}) by (exported_namespace)",
"instant": false,
"expr": "sum(policy_report_result{policy=~\"$policy\", status=\"warn\"} > 0) by (exported_namespace)",
"instant": true,
"interval": "",
"legendFormat": "{{`{{ exported_namespace }}`}}",
"refId": "A"
@@ -260,8 +260,8 @@ data:
"pluginVersion": "7.1.5",
"targets": [
{
"expr": "sum(policy_report_result{policy=~\"$policy\", status=\"error\"}) by (exported_namespace)",
"instant": false,
"expr": "sum(policy_report_result{policy=~\"$policy\", status=\"error\"} > 0) by (exported_namespace)",
"instant": true,
"interval": "",
"legendFormat": "{{`{{ exported_namespace }}`}}",
"refId": "A"


@@ -1,7 +1,7 @@
image:
repository: fjogeleit/policy-reporter
pullPolicy: IfNotPresent
tag: 0.10.0
tag: 0.11.0
imagePullSecrets: []


@@ -1,15 +1,19 @@
package cmd
import (
"context"
"flag"
"net/http"
"github.com/fjogeleit/policy-reporter/pkg/config"
"github.com/fjogeleit/policy-reporter/pkg/metrics"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func newRunCMD() *cobra.Command {
@@ -22,37 +26,40 @@ func newRunCMD() *cobra.Command {
return err
}
resolver := config.NewResolver(c)
client, err := resolver.PolicyReportClient()
var k8sConfig *rest.Config
if c.Kubeconfig != "" {
k8sConfig, err = clientcmd.BuildConfigFromFlags("", c.Kubeconfig)
} else {
k8sConfig, err = rest.InClusterConfig()
}
if err != nil {
return err
}
policyMetrics, err := resolver.PolicyReportMetrics()
ctx := context.Background()
resolver := config.NewResolver(c, k8sConfig)
pClient, err := resolver.PolicyReportClient(ctx)
if err != nil {
return err
}
cpClient, err := resolver.ClusterPolicyReportClient(ctx)
if err != nil {
return err
}
rClient, err := resolver.PolicyResultClient(ctx)
if err != nil {
return err
}
clusterPolicyMetrics, err := resolver.ClusterPolicyReportMetrics()
if err != nil {
return err
}
cpClient.RegisterCallback(metrics.CreateClusterPolicyReportMetricsCallback())
pClient.RegisterCallback(metrics.CreatePolicyReportMetricsCallback())
g := new(errgroup.Group)
targets := resolver.TargetClients()
g.Go(policyMetrics.GenerateMetrics)
g.Go(clusterPolicyMetrics.GenerateMetrics)
g.Go(func() error {
targets := resolver.TargetClients()
if len(targets) == 0 {
return nil
}
return client.WatchPolicyReportResults(func(r report.Result, e bool) {
if len(targets) > 0 {
rClient.RegisterPolicyResultCallback(func(r report.Result, e bool) {
for _, t := range targets {
go func(target target.Client, result report.Result, preExisted bool) {
if preExisted && target.SkipExistingOnStartup() {
@ -62,9 +69,14 @@ func newRunCMD() *cobra.Command {
target.Send(result)
}(t, r, e)
}
}, resolver.SkipExistingOnStartup())
})
})
rClient.RegisterPolicyResultWatcher(resolver.SkipExistingOnStartup())
}
g := new(errgroup.Group)
g.Go(cpClient.StartWatching)
g.Go(pClient.StartWatching)
g.Go(func() error {
http.Handle("/metrics", promhttp.Handler())
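Both commands now build the rest.Config themselves and hand it to the resolver instead of letting the report client construct it. The fallback is the standard client-go pattern; a minimal runnable sketch, with the helper name being hypothetical:

package main

import (
	"fmt"

	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
)

// loadK8sConfig mirrors the fallback in cmd/run.go and cmd/send.go: an
// explicit kubeconfig path wins, otherwise the in-cluster service account
// configuration is used.
func loadK8sConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig != "" {
		return clientcmd.BuildConfigFromFlags("", kubeconfig)
	}
	return rest.InClusterConfig()
}

func main() {
	cfg, err := loadK8sConfig("") // outside a cluster, pass a kubeconfig path
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Host)
}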


@@ -1,6 +1,7 @@
package cmd
import (
"context"
"flag"
"sync"
@@ -8,6 +9,8 @@ import (
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/spf13/cobra"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
func newSendCMD() *cobra.Command {
@@ -20,9 +23,19 @@ func newSendCMD() *cobra.Command {
return err
}
resolver := config.NewResolver(c)
var k8sConfig *rest.Config
if c.Kubeconfig != "" {
k8sConfig, err = clientcmd.BuildConfigFromFlags("", c.Kubeconfig)
} else {
k8sConfig, err = rest.InClusterConfig()
}
if err != nil {
return err
}
client, err := resolver.PolicyReportClient()
resolver := config.NewResolver(c, k8sConfig)
client, err := resolver.PolicyResultClient(context.Background())
if err != nil {
return err
}
@@ -33,7 +46,7 @@
return nil
}
results, err := client.FetchPolicyReportResults()
results, err := client.FetchPolicyResults()
if err != nil {
return err
}
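After the rename, the one-shot send flow reduces to: resolve the combined result client, fetch everything once, and push each result to every configured target. A condensed sketch; sendOnce is a hypothetical helper, and target.Client.Send is used as in cmd/run.go.

package example

import (
	"github.com/fjogeleit/policy-reporter/pkg/report"
	"github.com/fjogeleit/policy-reporter/pkg/target"
)

// sendOnce condenses the send command: fetch all current results once and
// forward each of them to every configured target.
func sendOnce(client report.ResultClient, targets []target.Client) error {
	results, err := client.FetchPolicyResults()
	if err != nil {
		return err
	}
	for _, result := range results {
		for _, t := range targets {
			t.Send(result)
		}
	}
	return nil
}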

go.mod

@@ -10,6 +10,7 @@ require (
github.com/mitchellh/mapstructure v1.4.1 // indirect
github.com/pelletier/go-toml v1.8.1 // indirect
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/client_model v0.2.0
github.com/spf13/afero v1.5.1 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/cobra v1.1.3
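go.mod promotes github.com/prometheus/client_model from an indirect to a direct dependency. A plausible reason (an assumption; the consuming code is not part of this diff) is that tests now inspect gathered metrics, since prometheus.Gatherer.Gather returns the dto types from client_model. A self-contained sketch of that pattern:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	dto "github.com/prometheus/client_model/go"
)

// findFamily returns the gathered metric family with the given name, if any.
func findFamily(families []*dto.MetricFamily, name string) *dto.MetricFamily {
	for _, mf := range families {
		if mf.GetName() == name {
			return mf
		}
	}
	return nil
}

func main() {
	counter := prometheus.NewCounter(prometheus.CounterOpts{
		Name: "example_total",
		Help: "Example counter.",
	})
	prometheus.MustRegister(counter)
	counter.Inc()

	families, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		panic(err)
	}
	if mf := findFamily(families, "example_total"); mf != nil {
		fmt.Println(mf.GetMetric()[0].GetCounter().GetValue()) // 1
	}
}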

go.sum

@@ -105,6 +105,7 @@ github.com/emicklei/go-restful v0.0.0-20170410110728-ff4f55a20633/go.mod h1:otzb
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/evanphx/json-patch v4.9.0+incompatible h1:kLcOMZeuLAJvL2BPWLMIj5oaZQobrkAqrL+WFZwQses=
github.com/evanphx/json-patch v4.9.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
@@ -198,6 +199,7 @@ github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
@@ -346,6 +348,7 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/pkg/sftp v1.10.1/go.mod h1:lYOWFsE0bwd1+KfKJaKeuokY15vzFx25BLbzYYoAxZI=
@@ -760,6 +763,7 @@ k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ=
k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.5.0 h1:8mOnjf1RmUPW6KRqQCfYSZq/K20Unmp3IhuZUhxl8KI=
k8s.io/klog/v2 v2.5.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd h1:sOHNzJIkytDF6qadMNKhhDRpc6ODik8lVC6nOur7B2c=
k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAGcJo0Tvi+dK12EcqSLqcWsryKMpfM=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=


@@ -7,43 +7,126 @@ import (
"time"
"github.com/fjogeleit/policy-reporter/pkg/kubernetes"
"github.com/fjogeleit/policy-reporter/pkg/metrics"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/fjogeleit/policy-reporter/pkg/target/discord"
"github.com/fjogeleit/policy-reporter/pkg/target/elasticsearch"
"github.com/fjogeleit/policy-reporter/pkg/target/loki"
"github.com/fjogeleit/policy-reporter/pkg/target/slack"
"k8s.io/client-go/dynamic"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
)
// Resolver manages dependencies
type Resolver struct {
config *Config
kubeClient report.Client
lokiClient target.Client
elasticsearchClient target.Client
slackClient target.Client
discordClient target.Client
policyReportMetrics metrics.Metrics
clusterPolicyReportMetrics metrics.Metrics
config *Config
k8sConfig *rest.Config
mapper kubernetes.Mapper
resultClient report.ResultClient
policyClient report.PolicyClient
clusterPolicyClient report.ClusterPolicyClient
lokiClient target.Client
elasticsearchClient target.Client
slackClient target.Client
discordClient target.Client
}
// PolicyResultClient resolver method
func (r *Resolver) PolicyResultClient(ctx context.Context) (report.ResultClient, error) {
if r.resultClient != nil {
return r.resultClient, nil
}
pClient, err := r.PolicyReportClient(ctx)
if err != nil {
return nil, err
}
cpClient, err := r.ClusterPolicyReportClient(ctx)
if err != nil {
return nil, err
}
client := kubernetes.NewPolicyResultClient(pClient, cpClient)
r.resultClient = client
return client, nil
}
// PolicyReportClient resolver method
func (r *Resolver) PolicyReportClient() (report.Client, error) {
if r.kubeClient != nil {
return r.kubeClient, nil
func (r *Resolver) PolicyReportClient(ctx context.Context) (report.PolicyClient, error) {
if r.policyClient != nil {
return r.policyClient, nil
}
client, err := kubernetes.NewPolicyReportClient(
context.Background(),
r.config.Kubeconfig,
r.config.Namespace,
mapper, err := r.Mapper(ctx)
if err != nil {
return nil, err
}
policyAPI, err := r.policyReportAPI()
if err != nil {
return nil, err
}
client := kubernetes.NewPolicyReportClient(
policyAPI,
mapper,
time.Now(),
)
r.kubeClient = client
r.policyClient = client
return client, err
return client, nil
}
// ClusterPolicyReportClient resolver method
func (r *Resolver) ClusterPolicyReportClient(ctx context.Context) (report.ClusterPolicyClient, error) {
if r.clusterPolicyClient != nil {
return r.clusterPolicyClient, nil
}
mapper, err := r.Mapper(ctx)
if err != nil {
return nil, err
}
policyAPI, err := r.policyReportAPI()
if err != nil {
return nil, err
}
client := kubernetes.NewClusterPolicyReportClient(
policyAPI,
mapper,
time.Now(),
)
r.clusterPolicyClient = client
return client, nil
}
// Mapper resolver method
func (r *Resolver) Mapper(ctx context.Context) (kubernetes.Mapper, error) {
if r.mapper != nil {
return r.mapper, nil
}
cmAPI, err := r.configMapAPI()
if err != nil {
return nil, err
}
mapper := kubernetes.NewMapper(make(map[string]string), cmAPI)
mapper.FetchPriorities(ctx)
go mapper.SyncPriorities(ctx)
r.mapper = mapper
return mapper, err
}
// LokiClient resolver method
@@ -142,38 +225,6 @@ func (r *Resolver) DiscordClient() target.Client {
return r.discordClient
}
// PolicyReportMetrics resolver method
func (r *Resolver) PolicyReportMetrics() (metrics.Metrics, error) {
if r.policyReportMetrics != nil {
return r.policyReportMetrics, nil
}
client, err := r.PolicyReportClient()
if err != nil {
return nil, err
}
r.policyReportMetrics = metrics.NewPolicyReportMetrics(client)
return r.policyReportMetrics, nil
}
// ClusterPolicyReportMetrics resolver method
func (r *Resolver) ClusterPolicyReportMetrics() (metrics.Metrics, error) {
if r.clusterPolicyReportMetrics != nil {
return r.clusterPolicyReportMetrics, nil
}
client, err := r.PolicyReportClient()
if err != nil {
return nil, err
}
r.clusterPolicyReportMetrics = metrics.NewClusterPolicyMetrics(client)
return r.clusterPolicyReportMetrics, nil
}
func (r *Resolver) TargetClients() []target.Client {
clients := make([]target.Client, 0)
@@ -206,18 +257,38 @@ func (r *Resolver) SkipExistingOnStartup() bool {
return true
}
// Reset all cached dependencies
func (r *Resolver) Reset() {
r.kubeClient = nil
r.lokiClient = nil
r.elasticsearchClient = nil
r.policyReportMetrics = nil
r.clusterPolicyReportMetrics = nil
func (r *Resolver) ConfigMapClient() (v1.ConfigMapInterface, error) {
var err error
client, err := v1.NewForConfig(r.k8sConfig)
if err != nil {
return nil, err
}
return client.ConfigMaps(r.config.Namespace), nil
}
func (r *Resolver) configMapAPI() (kubernetes.ConfigMapAdapter, error) {
client, err := r.ConfigMapClient()
if err != nil {
return nil, err
}
return kubernetes.NewConfigMapAdapter(client), nil
}
func (r *Resolver) policyReportAPI() (kubernetes.PolicyReportAdapter, error) {
client, err := dynamic.NewForConfig(r.k8sConfig)
if err != nil {
return nil, err
}
return kubernetes.NewPolicyReportAdapter(client), nil
}
// NewResolver constructor function
func NewResolver(config *Config) Resolver {
func NewResolver(config *Config, k8sConfig *rest.Config) Resolver {
return Resolver{
config: config,
config: config,
k8sConfig: k8sConfig,
}
}
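All resolver methods above share the same memoization shape: check the cached field, build on first use, store, return. The resolver tests below (Test_ResolveResultClient and friends) pin exactly this behaviour. A stripped-down sketch of the pattern; the names and the build field are hypothetical stand-ins for the real wiring.

package example

import (
	"context"

	"github.com/fjogeleit/policy-reporter/pkg/report"
)

// lazyResolver distills the memoization shape of the Resolver methods.
type lazyResolver struct {
	build  func(ctx context.Context) (report.ResultClient, error)
	client report.ResultClient
}

// ResultClient builds on first use and caches the instance; a second call
// returns the same client, which is what Test_ResolveResultClient asserts.
func (r *lazyResolver) ResultClient(ctx context.Context) (report.ResultClient, error) {
	if r.client != nil {
		return r.client, nil
	}
	client, err := r.build(ctx)
	if err != nil {
		return nil, err
	}
	r.client = client
	return client, nil
}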


@@ -1,9 +1,11 @@
package config_test
import (
"context"
"testing"
"github.com/fjogeleit/policy-reporter/pkg/config"
"k8s.io/client-go/rest"
)
var testConfig = &config.Config{
@@ -31,8 +33,8 @@ var testConfig = &config.Config{
},
}
func Test_ResolveClient(t *testing.T) {
resolver := config.NewResolver(testConfig)
func Test_ResolveTarget(t *testing.T) {
resolver := config.NewResolver(testConfig, nil)
t.Run("Loki", func(t *testing.T) {
client := resolver.LokiClient()
@@ -81,7 +83,7 @@
}
func Test_ResolveTargets(t *testing.T) {
resolver := config.NewResolver(testConfig)
resolver := config.NewResolver(testConfig, nil)
clients := resolver.TargetClients()
if count := len(clients); count != 4 {
@@ -98,8 +100,6 @@ func Test_ResolveSkipExistingOnStartup(t *testing.T) {
},
Elasticsearch: config.Elasticsearch{
Host: "http://localhost:9200",
Index: "policy-reporter",
Rotation: "dayli",
SkipExisting: true,
MinimumPriority: "debug",
},
@@ -108,7 +108,7 @@
t.Run("Resolve false", func(t *testing.T) {
testConfig.Elasticsearch.SkipExisting = false
resolver := config.NewResolver(testConfig)
resolver := config.NewResolver(testConfig, nil)
if resolver.SkipExistingOnStartup() == true {
t.Error("Expected SkipExistingOnStartup to be false if one Client has SkipExistingOnStartup false configured")
@@ -118,7 +118,7 @@
t.Run("Resolve true", func(t *testing.T) {
testConfig.Elasticsearch.SkipExisting = true
resolver := config.NewResolver(testConfig)
resolver := config.NewResolver(testConfig, nil)
if resolver.SkipExistingOnStartup() == false {
t.Error("Expected SkipExistingOnStartup to be true if all Client has SkipExistingOnStartup true configured")
@@ -126,7 +126,7 @@
})
}
func Test_ResolveClientWithoutHost(t *testing.T) {
func Test_ResolveTargetWithoutHost(t *testing.T) {
config2 := &config.Config{
Loki: config.Loki{
Host: "",
@@ -153,35 +153,85 @@
}
t.Run("Loki", func(t *testing.T) {
resolver := config.NewResolver(config2)
resolver.Reset()
resolver := config.NewResolver(config2, nil)
if resolver.LokiClient() != nil {
t.Error("Expected Client to be nil if no host is configured")
}
})
t.Run("Elasticsearch", func(t *testing.T) {
resolver := config.NewResolver(config2)
resolver.Reset()
resolver := config.NewResolver(config2, nil)
if resolver.ElasticsearchClient() != nil {
t.Error("Expected Client to be nil if no host is configured")
}
})
t.Run("Slack", func(t *testing.T) {
resolver := config.NewResolver(config2)
resolver.Reset()
resolver := config.NewResolver(config2, nil)
if resolver.SlackClient() != nil {
t.Error("Expected Client to be nil if no host is configured")
}
})
t.Run("Discord", func(t *testing.T) {
resolver := config.NewResolver(config2)
resolver.Reset()
resolver := config.NewResolver(config2, nil)
if resolver.DiscordClient() != nil {
t.Error("Expected Client to be nil if no host is configured")
}
})
}
func Test_ResolveResultClient(t *testing.T) {
resolver := config.NewResolver(&config.Config{}, &rest.Config{})
client1, err := resolver.PolicyResultClient(context.Background())
if err != nil {
t.Errorf("Unexpected Error: %s", err)
}
client2, err := resolver.PolicyResultClient(context.Background())
if client1 != client2 {
t.Error("A second call resolver.PolicyResultClient() should return the cached first client")
}
}
func Test_ResolvePolicyClient(t *testing.T) {
resolver := config.NewResolver(&config.Config{}, &rest.Config{})
client1, err := resolver.PolicyReportClient(context.Background())
if err != nil {
t.Errorf("Unexpected Error: %s", err)
}
client2, err := resolver.PolicyReportClient(context.Background())
if client1 != client2 {
t.Error("A second call resolver.PolicyReportClient() should return the cached first client")
}
}
func Test_ResolveClusterPolicyClient(t *testing.T) {
resolver := config.NewResolver(&config.Config{}, &rest.Config{})
client1, err := resolver.ClusterPolicyReportClient(context.Background())
if err != nil {
t.Errorf("Unexpected Error: %s", err)
}
client2, err := resolver.ClusterPolicyReportClient(context.Background())
if client1 != client2 {
t.Error("A second call resolver.ClusterPolicyReportClient() should return the cached first client")
}
}
func Test_ResolveClientWithInvalidK8sConfig(t *testing.T) {
k8sConfig := &rest.Config{}
k8sConfig.Host = "invalid/url"
resolver := config.NewResolver(&config.Config{}, k8sConfig)
_, err := resolver.PolicyReportClient(context.Background())
if err == nil {
t.Error("Error: 'host must be a URL or a host:port pair' was expected")
}
}


@@ -0,0 +1,176 @@
package kubernetes
import (
"errors"
"log"
"sync"
"time"
"github.com/fjogeleit/policy-reporter/pkg/report"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)
type clusterPolicyReportClient struct {
policyAPI PolicyReportAdapter
cache map[string]report.ClusterPolicyReport
callbacks []report.ClusterPolicyReportCallback
resultCallbacks []report.PolicyResultCallback
mapper Mapper
startUp time.Time
skipExisting bool
started bool
}
func (c *clusterPolicyReportClient) RegisterCallback(cb report.ClusterPolicyReportCallback) {
c.callbacks = append(c.callbacks, cb)
}
func (c *clusterPolicyReportClient) RegisterPolicyResultCallback(cb report.PolicyResultCallback) {
c.resultCallbacks = append(c.resultCallbacks, cb)
}
func (c *clusterPolicyReportClient) FetchClusterPolicyReports() ([]report.ClusterPolicyReport, error) {
var reports []report.ClusterPolicyReport
result, err := c.policyAPI.ListClusterPolicyReports()
if err != nil {
log.Printf("K8s List Error: %s\n", err.Error())
return reports, err
}
for _, item := range result.Items {
reports = append(reports, c.mapper.MapClusterPolicyReport(item.Object))
}
return reports, nil
}
func (c *clusterPolicyReportClient) FetchPolicyResults() ([]report.Result, error) {
var results []report.Result
reports, err := c.FetchClusterPolicyReports()
if err != nil {
return results, err
}
for _, clusterReport := range reports {
for _, result := range clusterReport.Results {
results = append(results, result)
}
}
return results, nil
}
func (c *clusterPolicyReportClient) StartWatching() error {
if c.started {
return errors.New("ClusterPolicyClient.StartWatching was already started")
}
c.started = true
for {
result, err := c.policyAPI.WatchClusterPolicyReports()
if err != nil {
c.started = false
return err
}
for result := range result.ResultChan() {
if item, ok := result.Object.(*unstructured.Unstructured); ok {
c.executeClusterPolicyReportHandler(result.Type, c.mapper.MapClusterPolicyReport(item.Object))
}
}
// skip existing results when the watcher restarts
c.skipExisting = true
}
}
func (c *clusterPolicyReportClient) executeClusterPolicyReportHandler(e watch.EventType, cpr report.ClusterPolicyReport) {
opr := report.ClusterPolicyReport{}
if e != watch.Added {
opr = c.cache[cpr.GetIdentifier()]
}
if e != watch.Deleted {
wg := sync.WaitGroup{}
wg.Add(len(c.callbacks))
for _, cb := range c.callbacks {
go func(
callback report.ClusterPolicyReportCallback,
event watch.EventType,
creport report.ClusterPolicyReport,
oreport report.ClusterPolicyReport,
) {
callback(event, creport, oreport)
wg.Done()
}(cb, e, cpr, opr)
}
wg.Wait()
c.cache[cpr.GetIdentifier()] = cpr
return
}
delete(c.cache, cpr.GetIdentifier())
}
func (c *clusterPolicyReportClient) RegisterPolicyResultWatcher(skipExisting bool) {
c.skipExisting = skipExisting
c.RegisterCallback(func(s watch.EventType, cpr report.ClusterPolicyReport, opr report.ClusterPolicyReport) {
switch s {
case watch.Added:
preExisted := cpr.CreationTimestamp.Before(c.startUp)
if c.skipExisting && preExisted {
break
}
wg := sync.WaitGroup{}
wg.Add(len(cpr.Results) * len(c.resultCallbacks))
for _, r := range cpr.Results {
for _, cb := range c.resultCallbacks {
go func(callback report.PolicyResultCallback, result report.Result) {
callback(result, preExisted)
wg.Done()
}(cb, r)
}
}
wg.Wait()
case watch.Modified:
diff := cpr.GetNewResults(c.cache[cpr.GetIdentifier()])
wg := sync.WaitGroup{}
wg.Add(len(diff) * len(c.resultCallbacks))
for _, r := range diff {
for _, cb := range c.resultCallbacks {
go func(callback report.PolicyResultCallback, result report.Result) {
callback(result, false)
wg.Done()
}(cb, r)
}
}
wg.Wait()
}
})
}
// NewClusterPolicyReportClient creates a new ClusterPolicyClient based on the kubernetes go-client
func NewClusterPolicyReportClient(client PolicyReportAdapter, mapper Mapper, startUp time.Time) report.ClusterPolicyClient {
return &clusterPolicyReportClient{
policyAPI: client,
cache: make(map[string]report.ClusterPolicyReport),
mapper: mapper,
startUp: startUp,
}
}
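Putting the new client to work looks like the wiring in cmd/run.go: register result callbacks, opt in or out of pre-existing results, then block on the watch loop. A hypothetical usage sketch, assuming report.ClusterPolicyClient exposes the methods implemented above and report.Result carries a Policy field (both visible in the diffs):

package example

import (
	"fmt"

	"github.com/fjogeleit/policy-reporter/pkg/report"
)

// watchClusterResults mirrors the wiring in cmd/run.go.
func watchClusterResults(client report.ClusterPolicyClient) error {
	// One callback per new policy result.
	client.RegisterPolicyResultCallback(func(r report.Result, preExisted bool) {
		fmt.Println(r.Policy, preExisted)
	})
	// Translate report events into result callbacks; true skips results
	// that already existed at startup.
	client.RegisterPolicyResultWatcher(true)
	// Blocks and re-establishes the underlying watch whenever it drops.
	return client.StartWatching()
}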


@@ -0,0 +1,376 @@
package kubernetes_test
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/fjogeleit/policy-reporter/pkg/kubernetes"
"github.com/fjogeleit/policy-reporter/pkg/report"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
testcore "k8s.io/client-go/testing"
)
func Test_FetchClusterPolicyReports(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
fakeAdapter.clusterPolicies = append(fakeAdapter.clusterPolicies, unstructured.Unstructured{Object: clusterPolicyMap})
policies, err := client.FetchClusterPolicyReports()
if err != nil {
t.Fatalf("Unexpected Error: %s", err)
}
if len(policies) != 1 {
t.Fatal("Expected one Policy")
}
expected := kubernetes.NewMapper(configMap.Data, nil).MapClusterPolicyReport(clusterPolicyMap)
policy := policies[0]
if policy.Name != expected.Name {
t.Errorf("Expected Policy Name %s", expected.Name)
}
}
func Test_FetchClusterPolicyReportsError(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
fakeAdapter.clusterPolicyError = errors.New("")
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
_, err := client.FetchClusterPolicyReports()
if err == nil {
t.Error("Configured Error should be returned")
}
}
func Test_FetchClusterPolicyResults(t *testing.T) {
fakeClient, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
watcher := watch.NewFake()
fakeClient.PrependWatchReactor("configmaps", testcore.DefaultWatchReactor(watcher, nil))
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
fakeAdapter.clusterPolicies = append(fakeAdapter.clusterPolicies, unstructured.Unstructured{Object: clusterPolicyMap})
results, err := client.FetchPolicyResults()
if err != nil {
t.Fatalf("Unexpected Error: %s", err)
}
if len(results) != 1 {
t.Fatalf("Expected 1 Results, got %d", len(results))
}
}
func Test_FetchClusterPolicyResultsError(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
fakeAdapter.clusterPolicyError = errors.New("")
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
_, err := client.FetchPolicyResults()
if err == nil {
t.Error("ClusterPolicyFetch Error should be returned by FetchPolicyResults")
}
}
func Test_ClusterPolicyWatcher(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(false)
wg := sync.WaitGroup{}
wg.Add(1)
results := make([]report.Result, 0, 1)
client.RegisterPolicyResultCallback(func(r report.Result, b bool) {
results = append(results, r)
wg.Done()
})
go client.StartWatching()
fakeAdapter.clusterPolicyWatcher.Add(&unstructured.Unstructured{Object: clusterPolicyMap})
wg.Wait()
if len(results) != 1 {
t.Error("Should receive 1 Result from the ClusterPolicy")
}
}
func Test_ClusterPolicyWatcherTwice(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
go client.StartWatching()
time.Sleep(10 * time.Millisecond)
err := client.StartWatching()
if err == nil {
t.Error("Second StartWatching call should return immediately with error")
}
}
var notSkippedClusterPolicyMap = map[string]interface{}{
"metadata": map[string]interface{}{
"name": "clusterpolicy-report",
"creationTimestamp": time.Now().Add(10 * time.Minute).Format("2006-01-02T15:04:05Z"),
},
"summary": map[string]interface{}{
"pass": int64(1),
"skip": int64(2),
"warn": int64(3),
"fail": int64(4),
"error": int64(5),
},
"results": []interface{}{
map[string]interface{}{
"message": "message",
"status": "fail",
"scored": true,
"policy": "not-skiped-cluster-policy-result",
"rule": "app-label-required",
"category": "test",
"severity": "low",
"resources": []interface{}{
map[string]interface{}{
"apiVersion": "v1",
"kind": "Namespace",
"name": "policy-reporter",
"uid": "dfd57c50-f30c-4729-b63f-b1954d8988d1",
},
},
},
},
}
func Test_SkipExisting(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(true)
wg := sync.WaitGroup{}
wg.Add(1)
results := make([]report.Result, 0, 1)
client.RegisterPolicyResultCallback(func(r report.Result, b bool) {
results = append(results, r)
wg.Done()
})
go client.StartWatching()
fakeAdapter.clusterPolicyWatcher.Add(&unstructured.Unstructured{Object: notSkippedClusterPolicyMap})
wg.Wait()
if len(results) != 1 {
t.Error("Should receive one not skipped Result form notSkippedClusterPolicyMap")
}
if results[0].Policy != "not-skiped-cluster-policy-result" {
t.Error("Should be 'not-skiped-cluster-policy-result'")
}
}
func Test_WatcherError(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
fakeAdapter.clusterPolicyError = errors.New("")
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(false)
err := client.StartWatching()
if err == nil {
t.Error("Shoud stop execution when error is returned")
}
}
func Test_WatchDeleteEvent(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(false)
wg := sync.WaitGroup{}
wg.Add(1)
results := make([]report.Result, 0, 1)
client.RegisterPolicyResultCallback(func(r report.Result, b bool) {
results = append(results, r)
wg.Done()
})
go client.StartWatching()
fakeAdapter.clusterPolicyWatcher.Add(&unstructured.Unstructured{Object: clusterPolicyMap})
fakeAdapter.clusterPolicyWatcher.Delete(&unstructured.Unstructured{Object: clusterPolicyMap})
wg.Wait()
if len(results) != 1 {
t.Error("Should receive initial 1 and no result from deletion")
}
}
func Test_WatchModifiedEvent(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewClusterPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(false)
wg := sync.WaitGroup{}
wg.Add(2)
results := make([]report.Result, 0, 2)
client.RegisterPolicyResultCallback(func(r report.Result, b bool) {
results = append(results, r)
wg.Done()
})
go client.StartWatching()
fakeAdapter.clusterPolicyWatcher.Add(&unstructured.Unstructured{Object: clusterPolicyMap})
clusterPolicyMap2 := map[string]interface{}{
"metadata": map[string]interface{}{
"name": "clusterpolicy-report",
"creationTimestamp": "2021-02-23T15:00:00Z",
},
"summary": map[string]interface{}{
"pass": int64(1),
"skip": int64(2),
"warn": int64(3),
"fail": int64(4),
"error": int64(5),
},
"results": []interface{}{
map[string]interface{}{
"message": "message",
"status": "fail",
"scored": true,
"policy": "required-label",
"rule": "app-label-required",
"category": "test",
"severity": "low",
"resources": []interface{}{
map[string]interface{}{
"apiVersion": "v1",
"kind": "Namespace",
"name": "policy-reporter",
"uid": "dfd57c50-f30c-4729-b63f-b1954d8988d1",
},
},
},
map[string]interface{}{
"message": "message",
"status": "fail",
"scored": true,
"policy": "required-label",
"rule": "app-label-required",
"category": "test",
"severity": "low",
"resources": []interface{}{
map[string]interface{}{
"apiVersion": "v1",
"kind": "Namespace",
"name": "policy-reporter",
"uid": "dfd57c50-f30c-4729-b63f-b1754d7988d1",
},
},
},
},
}
fakeAdapter.clusterPolicyWatcher.Modify(&unstructured.Unstructured{Object: clusterPolicyMap2})
wg.Wait()
if len(results) != 2 {
t.Error("Should receive initial 1 and 1 modification")
}
}


@@ -1,70 +0,0 @@
package kubernetes
import (
"context"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// CoreClient provides simplified APIs for ConfigMap Resources
type CoreClient interface {
// GetConfig return a single ConfigMap by name if exist
GetConfig(ctx context.Context, name string) (*apiv1.ConfigMap, error)
// WatchConfigs calls its ConfigMapCallback whenever a ConfigMap was added, modified or deleted
WatchConfigs(ctx context.Context, cb ConfigMapCallback) error
}
// ConfigMapCallback is used by WatchConfigs
type ConfigMapCallback = func(watch.EventType, *apiv1.ConfigMap)
type coreClient struct {
cmClient v1.ConfigMapInterface
}
func (c coreClient) GetConfig(ctx context.Context, name string) (*apiv1.ConfigMap, error) {
return c.cmClient.Get(ctx, name, metav1.GetOptions{})
}
func (c coreClient) WatchConfigs(ctx context.Context, cb ConfigMapCallback) error {
for {
watch, err := c.cmClient.Watch(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for event := range watch.ResultChan() {
if cm, ok := event.Object.(*apiv1.ConfigMap); ok {
cb(event.Type, cm)
}
}
}
}
// NewCoreClient creates a new CoreClient with the provided kubeconfig or InCluster configuration if kubeconfig is empty
func NewCoreClient(kubeconfig, namespace string) (CoreClient, error) {
var config *rest.Config
var err error
if kubeconfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
} else {
config, err = rest.InClusterConfig()
}
if err != nil {
return nil, err
}
client, err := v1.NewForConfig(config)
if err != nil {
return nil, err
}
return &coreClient{
cmClient: client.ConfigMaps(namespace),
}, nil
}


@@ -0,0 +1,53 @@
package kubernetes
import (
"context"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
const (
prioriyConfig = "policy-reporter-priorities"
)
// ConfigMapAdapter provides simplified APIs for ConfigMap Resources
type ConfigMapAdapter interface {
// GetConfig return a single ConfigMap by name if exist
GetConfig(ctx context.Context, name string) (*apiv1.ConfigMap, error)
// WatchConfigs calls its ConfigMapCallback whenever a ConfigMap was added, modified or deleted
WatchConfigs(ctx context.Context, cb ConfigMapCallback) error
}
// ConfigMapCallback is used by WatchConfigs
type ConfigMapCallback = func(watch.EventType, *apiv1.ConfigMap)
type cmAdapter struct {
api v1.ConfigMapInterface
}
func (c cmAdapter) GetConfig(ctx context.Context, name string) (*apiv1.ConfigMap, error) {
return c.api.Get(ctx, name, metav1.GetOptions{})
}
func (c cmAdapter) WatchConfigs(ctx context.Context, cb ConfigMapCallback) error {
for {
watch, err := c.api.Watch(ctx, metav1.ListOptions{})
if err != nil {
return err
}
for event := range watch.ResultChan() {
if cm, ok := event.Object.(*apiv1.ConfigMap); ok {
cb(event.Type, cm)
}
}
}
}
// NewConfigMapAdapter creates a new ConfigMapAdapter
func NewConfigMapAdapter(api v1.ConfigMapInterface) ConfigMapAdapter {
return &cmAdapter{api}
}
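Wiring the adapter against a real cluster mirrors Resolver.ConfigMapClient and configMapAPI above: build the typed CoreV1 ConfigMap client for one namespace from a rest.Config and wrap it. A sketch; the function name is hypothetical.

package example

import (
	"github.com/fjogeleit/policy-reporter/pkg/kubernetes"
	v1 "k8s.io/client-go/kubernetes/typed/core/v1"
	"k8s.io/client-go/rest"
)

// newAdapter builds the namespaced typed ConfigMap client and wraps it.
func newAdapter(cfg *rest.Config, namespace string) (kubernetes.ConfigMapAdapter, error) {
	client, err := v1.NewForConfig(cfg)
	if err != nil {
		return nil, err
	}
	return kubernetes.NewConfigMapAdapter(client.ConfigMaps(namespace)), nil
}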


@@ -0,0 +1,92 @@
package kubernetes_test
import (
"context"
"errors"
"sync"
"testing"
"github.com/fjogeleit/policy-reporter/pkg/kubernetes"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/fake"
clientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
testcore "k8s.io/client-go/testing"
)
var configMap = &v1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "policy-reporter-priorities",
},
Data: map[string]string{
"default": "warning",
},
}
func Test_GetConfigMap(t *testing.T) {
_, cmAPI := newFakeAPI()
cmAPI.Create(context.Background(), configMap, metav1.CreateOptions{})
cmClient := kubernetes.NewConfigMapAdapter(cmAPI)
cm, err := cmClient.GetConfig(context.Background(), "policy-reporter-priorities")
if err != nil {
t.Fatalf("Unexpected Error: %s", err)
}
if cm.Name != "policy-reporter-priorities" {
t.Error("Unexpted ConfigMapReturned")
}
if priority, ok := cm.Data["default"]; !ok || priority != "warning" {
t.Error("Unexpted default priority")
}
}
func Test_WatchConfigMap(t *testing.T) {
client, cmAPI := newFakeAPI()
watcher := watch.NewFake()
client.PrependWatchReactor("configmaps", testcore.DefaultWatchReactor(watcher, nil))
cmClient := kubernetes.NewConfigMapAdapter(cmAPI)
wg := sync.WaitGroup{}
wg.Add(1)
go cmClient.WatchConfigs(context.Background(), func(et watch.EventType, cm *v1.ConfigMap) {
defer wg.Done()
if cm.Name != "policy-reporter-priorities" {
t.Error("Unexpted ConfigMapReturned")
}
if priority, ok := cm.Data["default"]; !ok || priority != "warning" {
t.Error("Unexpted default priority")
}
})
watcher.Add(configMap)
wg.Wait()
}
func Test_WatchConfigMapError(t *testing.T) {
client, cmAPI := newFakeAPI()
client.PrependWatchReactor("configmaps", testcore.DefaultWatchReactor(watch.NewFake(), errors.New("")))
cmClient := kubernetes.NewConfigMapAdapter(cmAPI)
err := cmClient.WatchConfigs(context.Background(), func(et watch.EventType, cm *v1.ConfigMap) {})
if err == nil {
t.Error("Watch Error should stop execution")
}
}
func newFakeAPI() (*fake.Clientset, clientv1.ConfigMapInterface) {
client := fake.NewSimpleClientset()
return client, client.CoreV1().ConfigMaps("policy-reporter")
}


@@ -1,10 +1,14 @@
package kubernetes
import (
"context"
"errors"
"log"
"time"
"github.com/fjogeleit/policy-reporter/pkg/report"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
)
// Mapper converts maps into report structs
@@ -15,10 +19,15 @@ type Mapper interface {
MapClusterPolicyReport(reportMap map[string]interface{}) report.ClusterPolicyReport
// SetPriorityMap updates the policy/status to priority mapping
SetPriorityMap(map[string]string)
// SyncPriorities when ConfigMap has changed
SyncPriorities(ctx context.Context) error
// FetchPriorities from ConfigMap
FetchPriorities(ctx context.Context) error
}
type mapper struct {
priorityMap map[string]string
cmAdapter ConfigMapAdapter
}
func (m *mapper) MapPolicyReport(reportMap map[string]interface{}) report.PolicyReport {
@@ -170,9 +179,48 @@ func (m *mapper) resolvePriority(policy string) report.Priority {
return report.Priority(report.ErrorPriority)
}
func (m *mapper) FetchPriorities(ctx context.Context) error {
cm, err := m.cmAdapter.GetConfig(ctx, prioriyConfig)
if err != nil {
return err
}
if cm != nil {
m.SetPriorityMap(cm.Data)
log.Println("[INFO] Priorities loaded")
}
return nil
}
func (m *mapper) SyncPriorities(ctx context.Context) error {
err := m.cmAdapter.WatchConfigs(ctx, func(e watch.EventType, cm *v1.ConfigMap) {
if cm.Name != prioriyConfig {
return
}
switch e {
case watch.Added:
m.SetPriorityMap(cm.Data)
case watch.Modified:
m.SetPriorityMap(cm.Data)
case watch.Deleted:
m.SetPriorityMap(map[string]string{})
}
log.Println("[INFO] Priorities synchronized")
})
if err != nil {
log.Printf("[INFO] Unable to sync Priorities: %s", err.Error())
}
return err
}
// NewMapper creates a new Mapper instance
func NewMapper(priorities map[string]string) Mapper {
m := &mapper{}
func NewMapper(priorities map[string]string, cmAdapter ConfigMapAdapter) Mapper {
m := &mapper{cmAdapter: cmAdapter}
m.SetPriorityMap(priorities)
return m
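The intended startup sequence mirrors Resolver.Mapper above: fetch the priorities once, then keep them synchronized with the policy-reporter-priorities ConfigMap in the background. A short sketch; the helper name is hypothetical.

package example

import (
	"context"

	"github.com/fjogeleit/policy-reporter/pkg/kubernetes"
)

// newSyncedMapper mirrors Resolver.Mapper: load priorities once, then keep
// them in sync with later ConfigMap changes.
func newSyncedMapper(ctx context.Context, cm kubernetes.ConfigMapAdapter) kubernetes.Mapper {
	mapper := kubernetes.NewMapper(map[string]string{}, cm)
	// Best effort: if the ConfigMap is missing, the built-in defaults stay.
	_ = mapper.FetchPriorities(ctx)
	// React to later Added/Modified/Deleted events in the background.
	go mapper.SyncPriorities(ctx)
	return mapper
}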


@@ -1,10 +1,16 @@
package kubernetes_test
import (
"context"
"testing"
"time"
"github.com/fjogeleit/policy-reporter/pkg/kubernetes"
"github.com/fjogeleit/policy-reporter/pkg/report"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
testcore "k8s.io/client-go/testing"
)
var policyMap = map[string]interface{}{
@@ -101,7 +107,7 @@ var priorityMap = map[string]string{
"priority-test": "warning",
}
var mapper = kubernetes.NewMapper(priorityMap)
var mapper = kubernetes.NewMapper(priorityMap, nil)
func Test_MapPolicyReport(t *testing.T) {
preport := mapper.MapPolicyReport(policyMap)
@@ -280,7 +286,7 @@ func Test_MapMinClusterPolicyReport(t *testing.T) {
func Test_PriorityMap(t *testing.T) {
t.Run("Test exact match, without default", func(t *testing.T) {
mapper := kubernetes.NewMapper(map[string]string{"required-label": "debug"})
mapper := kubernetes.NewMapper(map[string]string{"required-label": "debug"}, nil)
preport := mapper.MapPolicyReport(policyMap)
@@ -292,7 +298,7 @@
})
t.Run("Test exact match handled over default", func(t *testing.T) {
mapper := kubernetes.NewMapper(map[string]string{"required-label": "debug", "default": "warning"})
mapper := kubernetes.NewMapper(map[string]string{"required-label": "debug", "default": "warning"}, nil)
preport := mapper.MapPolicyReport(policyMap)
@@ -304,8 +310,7 @@
})
t.Run("Test default expressions", func(t *testing.T) {
mapper := kubernetes.NewMapper(make(map[string]string))
mapper.SetPriorityMap(map[string]string{"default": "warning"})
mapper := kubernetes.NewMapper(map[string]string{"default": "warning"}, nil)
preport := mapper.MapPolicyReport(policyMap)
@@ -316,3 +321,93 @@
}
})
}
func Test_PriorityFetch(t *testing.T) {
_, cmAPI := newFakeAPI()
cmAPI.Create(context.Background(), configMap, metav1.CreateOptions{})
mapper := kubernetes.NewMapper(make(map[string]string), kubernetes.NewConfigMapAdapter(cmAPI))
preport1 := mapper.MapPolicyReport(policyMap)
result1 := preport1.Results["required-label__app-label-required__fail__dfd57c50-f30c-4729-b63f-b1954d8988d1"]
if result1.Priority != report.ErrorPriority {
t.Errorf("Default Priority should be Error")
}
mapper.FetchPriorities(context.Background())
preport2 := mapper.MapPolicyReport(policyMap)
result2 := preport2.Results["required-label__app-label-required__fail__dfd57c50-f30c-4729-b63f-b1954d8988d1"]
if result2.Priority != report.WarningPriority {
t.Errorf("Default Priority should be Warning after ConigMap fetch")
}
}
func Test_PriorityFetchError(t *testing.T) {
_, cmAPI := newFakeAPI()
mapper := kubernetes.NewMapper(make(map[string]string), kubernetes.NewConfigMapAdapter(cmAPI))
mapper.FetchPriorities(context.Background())
preport := mapper.MapPolicyReport(policyMap)
result := preport.Results["required-label__app-label-required__fail__dfd57c50-f30c-4729-b63f-b1954d8988d1"]
if result.Priority != report.ErrorPriority {
t.Errorf("Fetch Error should not effect the functionality and continue using Error as default")
}
}
func Test_PrioritySync(t *testing.T) {
client, cmAPI := newFakeAPI()
watcher := watch.NewFake()
client.PrependWatchReactor("configmaps", testcore.DefaultWatchReactor(watcher, nil))
mapper := kubernetes.NewMapper(make(map[string]string), kubernetes.NewConfigMapAdapter(cmAPI))
preport1 := mapper.MapPolicyReport(policyMap)
result1 := preport1.Results["required-label__app-label-required__fail__dfd57c50-f30c-4729-b63f-b1954d8988d1"]
if result1.Priority != report.ErrorPriority {
t.Errorf("Default Priority should be Error")
}
go mapper.SyncPriorities(context.Background())
watcher.Add(configMap)
preport2 := mapper.MapPolicyReport(policyMap)
result2 := preport2.Results["required-label__app-label-required__fail__dfd57c50-f30c-4729-b63f-b1954d8988d1"]
if result2.Priority != report.WarningPriority {
t.Errorf("Default Priority should be Warning after ConigMap add sync")
}
configMap2 := &v1.ConfigMap{
TypeMeta: metav1.TypeMeta{
Kind: "ConfigMap",
APIVersion: "v1",
},
ObjectMeta: metav1.ObjectMeta{
Name: "policy-reporter-priorities",
},
Data: map[string]string{
"default": "debug",
},
}
watcher.Modify(configMap2)
time.Sleep(100 * time.Millisecond)
preport3 := mapper.MapPolicyReport(policyMap)
result3 := preport3.Results["required-label__app-label-required__fail__dfd57c50-f30c-4729-b63f-b1954d8988d1"]
if result3.Priority != report.DebugPriority {
t.Errorf("Default Priority should be Debug after ConigMap modify sync")
}
watcher.Delete(configMap2)
time.Sleep(100 * time.Millisecond)
preport4 := mapper.MapPolicyReport(policyMap)
result4 := preport4.Results["required-label__app-label-required__fail__dfd57c50-f30c-4729-b63f-b1954d8988d1"]
if result4.Priority != report.ErrorPriority {
t.Errorf("Default Priority should be fallback to Error after ConigMap delete sync")
}
}


@@ -0,0 +1,160 @@
package kubernetes
import (
"errors"
"log"
"sync"
"time"
"github.com/fjogeleit/policy-reporter/pkg/report"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
)
type policyReportClient struct {
policyAPI PolicyReportAdapter
cache map[string]report.PolicyReport
callbacks []report.PolicyReportCallback
resultCallbacks []report.PolicyResultCallback
mapper Mapper
startUp time.Time
skipExisting bool
started bool
}
func (c *policyReportClient) RegisterCallback(cb report.PolicyReportCallback) {
c.callbacks = append(c.callbacks, cb)
}
func (c *policyReportClient) RegisterPolicyResultCallback(cb report.PolicyResultCallback) {
c.resultCallbacks = append(c.resultCallbacks, cb)
}
func (c *policyReportClient) FetchPolicyReports() ([]report.PolicyReport, error) {
var reports []report.PolicyReport
result, err := c.policyAPI.ListPolicyReports()
if err != nil {
log.Printf("K8s List Error: %s\n", err.Error())
return reports, err
}
for _, item := range result.Items {
reports = append(reports, c.mapper.MapPolicyReport(item.Object))
}
return reports, nil
}
func (c *policyReportClient) FetchPolicyResults() ([]report.Result, error) {
var results []report.Result
reports, err := c.FetchPolicyReports()
if err != nil {
return results, err
}
for _, clusterReport := range reports {
for _, result := range clusterReport.Results {
results = append(results, result)
}
}
return results, nil
}
func (c *policyReportClient) StartWatching() error {
if c.started {
return errors.New("PolicyClient.StartWatching was already started")
}
c.started = true
for {
result, err := c.policyAPI.WatchPolicyReports()
if err != nil {
c.started = false
return err
}
for result := range result.ResultChan() {
if item, ok := result.Object.(*unstructured.Unstructured); ok {
c.executePolicyReportHandler(result.Type, c.mapper.MapPolicyReport(item.Object))
}
}
// skip existing results when the watcher restarts
c.skipExisting = true
}
}
func (c *policyReportClient) executePolicyReportHandler(e watch.EventType, pr report.PolicyReport) {
opr := report.PolicyReport{}
if e != watch.Added {
opr = c.cache[pr.GetIdentifier()]
}
if e != watch.Deleted {
wg := sync.WaitGroup{}
wg.Add(len(c.callbacks))
for _, cb := range c.callbacks {
go func(
callback report.PolicyReportCallback,
event watch.EventType,
creport report.PolicyReport,
oreport report.PolicyReport,
) {
callback(event, creport, oreport)
wg.Done()
}(cb, e, pr, opr)
}
wg.Wait()
c.cache[pr.GetIdentifier()] = pr
return
}
delete(c.cache, pr.GetIdentifier())
}
func (c *policyReportClient) RegisterPolicyResultWatcher(skipExisting bool) {
c.skipExisting = skipExisting
c.RegisterCallback(
func(e watch.EventType, pr report.PolicyReport, or report.PolicyReport) {
switch e {
case watch.Added:
preExisted := pr.CreationTimestamp.Before(c.startUp)
if c.skipExisting && preExisted {
break
}
for _, result := range pr.Results {
for _, cb := range c.resultCallbacks {
cb(result, preExisted)
}
}
case watch.Modified:
diff := pr.GetNewResults(or)
for _, result := range diff {
for _, cb := range c.resultCallbacks {
cb(result, false)
}
}
}
})
}
// NewPolicyReportClient creates a new PolicyReportClient based on the kubernetes go-client
func NewPolicyReportClient(client PolicyReportAdapter, mapper Mapper, startUp time.Time) report.PolicyClient {
return &policyReportClient{
policyAPI: client,
cache: make(map[string]report.PolicyReport),
mapper: mapper,
startUp: startUp,
}
}


@@ -0,0 +1,374 @@
package kubernetes_test
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/fjogeleit/policy-reporter/pkg/kubernetes"
"github.com/fjogeleit/policy-reporter/pkg/report"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
func Test_FetchPolicyReports(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
fakeAdapter.policies = append(fakeAdapter.policies, unstructured.Unstructured{Object: policyMap})
policies, err := client.FetchPolicyReports()
if err != nil {
t.Fatalf("Unexpected Error: %s", err)
}
if len(policies) != 1 {
t.Fatal("Expected one Policy")
}
expected := kubernetes.NewMapper(configMap.Data, nil).MapPolicyReport(policyMap)
policy := policies[0]
if policy.Name != expected.Name {
t.Errorf("Expected Policy Name %s", expected.Name)
}
}
func Test_FetchPolicyReportsError(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
fakeAdapter.policyError = errors.New("")
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
_, err := client.FetchPolicyReports()
if err == nil {
t.Error("Configured Error should be returned")
}
}
func Test_FetchPolicyResults(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
fakeAdapter.policies = append(fakeAdapter.policies, unstructured.Unstructured{Object: policyMap})
results, err := client.FetchPolicyResults()
if err != nil {
t.Fatalf("Unexpected Error: %s", err)
}
if len(results) != 2 {
t.Fatalf("Expected 2 Results, got %d", len(results))
}
}
func Test_FetchPolicyResultsError(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
fakeAdapter.policyError = errors.New("")
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
_, err := client.FetchPolicyResults()
if err == nil {
t.Error("PolicyFetch Error should be returned by FetchPolicyResults")
}
}
func Test_PolicyWatcher(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(false)
wg := sync.WaitGroup{}
wg.Add(2)
results := make([]report.Result, 0, 3)
client.RegisterPolicyResultCallback(func(r report.Result, b bool) {
results = append(results, r)
wg.Done()
})
go client.StartWatching()
fakeAdapter.policyWatcher.Add(&unstructured.Unstructured{Object: policyMap})
wg.Wait()
if len(results) != 2 {
t.Error("Should receive 2 Results from the Policy")
}
}
func Test_PolicyWatcherTwice(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
go client.StartWatching()
time.Sleep(10 * time.Millisecond)
err := client.StartWatching()
if err == nil {
t.Error("Second StartWatching call should return immediately with error")
}
}
var notSkippedPolicyMap = map[string]interface{}{
"metadata": map[string]interface{}{
"name": "policy-report",
"namespace": "test",
"creationTimestamp": time.Now().Add(10 * time.Minute).Format("2006-01-02T15:04:05Z"),
},
"summary": map[string]interface{}{
"pass": int64(1),
"skip": int64(2),
"warn": int64(3),
"fail": int64(4),
"error": int64(5),
},
"results": []interface{}{
map[string]interface{}{
"message": "message",
"status": "fail",
"scored": true,
"policy": "not-skiped-policy-result",
"rule": "app-label-required",
"category": "test",
"severity": "low",
"resources": []interface{}{
map[string]interface{}{
"apiVersion": "v1",
"kind": "Deployment",
"name": "nginx",
"namespace": "test",
"uid": "dfd57c50-f30c-4729-b63f-b1954d8988d1",
},
},
},
},
}
func Test_PolicySkipExisting(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(true)
wg := sync.WaitGroup{}
wg.Add(1)
results := make([]report.Result, 0, 1)
client.RegisterPolicyResultCallback(func(r report.Result, b bool) {
results = append(results, r)
wg.Done()
})
go client.StartWatching()
fakeAdapter.policyWatcher.Add(&unstructured.Unstructured{Object: policyMap})
fakeAdapter.policyWatcher.Add(&unstructured.Unstructured{Object: notSkippedPolicyMap})
wg.Wait()
if len(results) != 1 {
t.Error("Should receive one not skipped Result form notSkippedPolicyMap")
}
if results[0].Policy != "not-skiped-policy-result" {
t.Error("Should be 'not-skiped-policy-result'")
}
}
func Test_PolicyWatcherError(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
fakeAdapter.policyError = errors.New("")
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(false)
err := client.StartWatching()
if err == nil {
t.Error("Shoud stop execution when error is returned")
}
}
func Test_PolicyWatchDeleteEvent(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(false)
wg := sync.WaitGroup{}
wg.Add(2)
results := make([]report.Result, 0, 2)
client.RegisterPolicyResultCallback(func(r report.Result, b bool) {
results = append(results, r)
wg.Done()
})
go client.StartWatching()
fakeAdapter.policyWatcher.Add(&unstructured.Unstructured{Object: policyMap})
fakeAdapter.policyWatcher.Delete(&unstructured.Unstructured{Object: policyMap})
wg.Wait()
if len(results) != 2 {
t.Error("Should receive initial 2 and no result from deletion")
}
}
func Test_PolicyWatchModifiedEvent(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
client := kubernetes.NewPolicyReportClient(
fakeAdapter,
NewMapper(k8sCMClient),
time.Now(),
)
client.RegisterPolicyResultWatcher(false)
wg := sync.WaitGroup{}
wg.Add(3)
results := make([]report.Result, 0, 3)
client.RegisterPolicyResultCallback(func(r report.Result, b bool) {
results = append(results, r)
wg.Done()
})
go client.StartWatching()
fakeAdapter.policyWatcher.Add(&unstructured.Unstructured{Object: policyMap})
var policyMap2 = map[string]interface{}{
"metadata": map[string]interface{}{
"name": "policy-report",
"namespace": "test",
"creationTimestamp": "2021-02-23T15:00:00Z",
},
"summary": map[string]interface{}{
"pass": int64(1),
"skip": int64(2),
"warn": int64(3),
"fail": int64(4),
"error": int64(5),
},
"results": []interface{}{
map[string]interface{}{
"message": "message",
"status": "fail",
"scored": true,
"policy": "required-label",
"rule": "app-label-required",
"category": "test",
"severity": "low",
"resources": []interface{}{
map[string]interface{}{
"apiVersion": "v1",
"kind": "Deployment",
"name": "nginx",
"namespace": "test",
"uid": "dfd57c50-f30c-4729-b63f-b1954d8988d1",
},
},
},
map[string]interface{}{
"message": "message 2",
"status": "fail",
"scored": true,
"policy": "priority-test",
"resources": []interface{}{},
},
map[string]interface{}{
"message": "message 3",
"status": "pass",
"scored": true,
"policy": "priority-test",
"resources": []interface{}{},
},
},
}
fakeAdapter.policyWatcher.Modify(&unstructured.Unstructured{Object: policyMap2})
wg.Wait()
if len(results) != 3 {
t.Error("Should receive initial 2 and 1 modification")
}
}

View file

@ -0,0 +1,47 @@
package kubernetes
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
)
var (
PolicyReports = schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha1", Resource: "policyreports"}
ClusterPolicyReports = schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha1", Resource: "clusterpolicyreports"}
)
type PolicyReportAdapter interface {
ListClusterPolicyReports() (*unstructured.UnstructuredList, error)
ListPolicyReports() (*unstructured.UnstructuredList, error)
WatchClusterPolicyReports() (watch.Interface, error)
WatchPolicyReports() (watch.Interface, error)
}
type k8sPolicyReportAdapter struct {
client dynamic.Interface
}
func (k *k8sPolicyReportAdapter) ListClusterPolicyReports() (*unstructured.UnstructuredList, error) {
return k.client.Resource(ClusterPolicyReports).List(context.Background(), metav1.ListOptions{})
}
func (k *k8sPolicyReportAdapter) ListPolicyReports() (*unstructured.UnstructuredList, error) {
return k.client.Resource(PolicyReports).List(context.Background(), metav1.ListOptions{})
}
func (k *k8sPolicyReportAdapter) WatchClusterPolicyReports() (watch.Interface, error) {
return k.client.Resource(ClusterPolicyReports).Watch(context.Background(), metav1.ListOptions{})
}
func (k *k8sPolicyReportAdapter) WatchPolicyReports() (watch.Interface, error) {
return k.client.Resource(PolicyReports).Watch(context.Background(), metav1.ListOptions{})
}
func NewPolicyReportAdapter(dynamic dynamic.Interface) PolicyReportAdapter {
return &k8sPolicyReportAdapter{dynamic}
}
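The adapter only needs a dynamic.Interface; a minimal sketch of constructing one outside the cluster (the kubeconfig path and log handling are illustrative, not part of this change):

package main

import (
    "log"

    "github.com/fjogeleit/policy-reporter/pkg/kubernetes"
    "k8s.io/client-go/dynamic"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Placeholder kubeconfig path; in-cluster, rest.InClusterConfig() is used instead.
    config, err := clientcmd.BuildConfigFromFlags("", "./kubeconfig")
    if err != nil {
        log.Fatal(err)
    }
    dynamicClient, err := dynamic.NewForConfig(config)
    if err != nil {
        log.Fatal(err)
    }
    adapter := kubernetes.NewPolicyReportAdapter(dynamicClient)
    // List namespaced PolicyReports through the adapter.
    list, err := adapter.ListPolicyReports()
    if err != nil {
        log.Fatal(err)
    }
    log.Printf("found %d PolicyReports", len(list.Items))
}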

View file

@ -1,109 +1,45 @@
package kubernetes
import (
"context"
"log"
"sync"
"time"
"github.com/fjogeleit/policy-reporter/pkg/report"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
var (
policyReports = schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha1", Resource: "policyreports"}
clusterPolicyReports = schema.GroupVersionResource{Group: "wgpolicyk8s.io", Version: "v1alpha1", Resource: "clusterpolicyreports"}
)
const (
priorityConfig = "policy-reporter-priorities"
)
type policyReportClient struct {
client dynamic.Interface
coreClient CoreClient
policyCache map[string]report.PolicyReport
clusterPolicyCache map[string]report.ClusterPolicyReport
mapper Mapper
startUp time.Time
type resultClient struct {
policyClient report.PolicyClient
clusterPolicyClient report.ClusterPolicyClient
}
func (c *policyReportClient) FetchPolicyReports() ([]report.PolicyReport, error) {
var reports []report.PolicyReport
result, err := c.client.Resource(policyReports).List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Printf("K8s List Error: %s\n", err.Error())
return reports, err
}
for _, item := range result.Items {
reports = append(reports, c.mapper.MapPolicyReport(item.Object))
}
return reports, nil
}
func (c *policyReportClient) FetchClusterPolicyReports() ([]report.ClusterPolicyReport, error) {
var reports []report.ClusterPolicyReport
result, err := c.client.Resource(clusterPolicyReports).List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Printf("K8s List Error: %s\n", err.Error())
return reports, err
}
for _, item := range result.Items {
reports = append(reports, c.mapper.MapClusterPolicyReport(item.Object))
}
return reports, nil
}
func (c *policyReportClient) FetchPolicyReportResults() ([]report.Result, error) {
func (c *resultClient) FetchPolicyResults() ([]report.Result, error) {
g := new(errgroup.Group)
mx := new(sync.Mutex)
var results []report.Result
g.Go(func() error {
reports, err := c.FetchClusterPolicyReports()
rs, err := c.policyClient.FetchPolicyResults()
if err != nil {
return err
}
for _, clusterReport := range reports {
for _, result := range clusterReport.Results {
mx.Lock()
results = append(results, result)
mx.Unlock()
}
}
mx.Lock()
results = append(results, rs...)
mx.Unlock()
return nil
})
g.Go(func() error {
reports, err := c.FetchPolicyReports()
rs, err := c.clusterPolicyClient.FetchPolicyResults()
if err != nil {
return err
}
for _, clusterReport := range reports {
for _, result := range clusterReport.Results {
mx.Lock()
results = append(results, result)
mx.Unlock()
}
}
mx.Lock()
results = append(results, rs...)
mx.Unlock()
return nil
})
@ -111,178 +47,20 @@ func (c *policyReportClient) FetchPolicyReportResults() ([]report.Result, error)
return results, g.Wait()
}
func (c *policyReportClient) WatchClusterPolicyReports(cb report.WatchClusterPolicyReportCallback) error {
for {
result, err := c.client.Resource(clusterPolicyReports).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
return err
}
for result := range result.ResultChan() {
if item, ok := result.Object.(*unstructured.Unstructured); ok {
cb(result.Type, c.mapper.MapClusterPolicyReport(item.Object))
}
}
}
func (c *resultClient) RegisterPolicyResultWatcher(skipExisting bool) {
c.policyClient.RegisterPolicyResultWatcher(skipExisting)
c.clusterPolicyClient.RegisterPolicyResultWatcher(skipExisting)
}
func (c *policyReportClient) WatchPolicyReports(cb report.WatchPolicyReportCallback) error {
for {
result, err := c.client.Resource(policyReports).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
return err
}
for result := range result.ResultChan() {
if item, ok := result.Object.(*unstructured.Unstructured); ok {
cb(result.Type, c.mapper.MapPolicyReport(item.Object))
}
}
}
func (c *resultClient) RegisterPolicyResultCallback(cb report.PolicyResultCallback) {
c.policyClient.RegisterPolicyResultCallback(cb)
c.clusterPolicyClient.RegisterPolicyResultCallback(cb)
}
func (c *policyReportClient) WatchPolicyReportResults(cb report.WatchPolicyResultCallback, skipExisting bool) error {
wg := new(errgroup.Group)
wg.Go(func() error {
return c.WatchPolicyReports(func(e watch.EventType, pr report.PolicyReport) {
switch e {
case watch.Added:
preExisted := pr.CreationTimestamp.Before(c.startUp)
if skipExisting && preExisted {
c.policyCache[pr.GetIdentifier()] = pr
break
}
for _, result := range pr.Results {
cb(result, preExisted)
}
c.policyCache[pr.GetIdentifier()] = pr
case watch.Modified:
diff := pr.GetNewResults(c.policyCache[pr.GetIdentifier()])
for _, result := range diff {
cb(result, false)
}
c.policyCache[pr.GetIdentifier()] = pr
case watch.Deleted:
delete(c.policyCache, pr.GetIdentifier())
}
})
})
wg.Go(func() error {
return c.WatchClusterPolicyReports(func(s watch.EventType, cpr report.ClusterPolicyReport) {
switch s {
case watch.Added:
preExisted := cpr.CreationTimestamp.Before(c.startUp)
if skipExisting && preExisted {
c.clusterPolicyCache[cpr.GetIdentifier()] = cpr
break
}
for _, result := range cpr.Results {
cb(result, preExisted)
}
c.clusterPolicyCache[cpr.GetIdentifier()] = cpr
case watch.Modified:
diff := cpr.GetNewResults(c.clusterPolicyCache[cpr.GetIdentifier()])
for _, result := range diff {
cb(result, false)
}
c.clusterPolicyCache[cpr.GetIdentifier()] = cpr
case watch.Deleted:
delete(c.clusterPolicyCache, cpr.GetIdentifier())
}
})
})
return wg.Wait()
}
func (c *policyReportClient) fetchPriorities(ctx context.Context) error {
cm, err := c.coreClient.GetConfig(ctx, priorityConfig)
if err != nil {
return err
}
if cm != nil {
c.mapper.SetPriorityMap(cm.Data)
log.Println("[INFO] Priorities loaded")
}
return nil
}
func (c *policyReportClient) syncPriorities(ctx context.Context) error {
err := c.coreClient.WatchConfigs(ctx, func(e watch.EventType, cm *v1.ConfigMap) {
if cm.Name != priorityConfig {
return
}
switch e {
case watch.Added:
c.mapper.SetPriorityMap(cm.Data)
case watch.Modified:
c.mapper.SetPriorityMap(cm.Data)
case watch.Deleted:
c.mapper.SetPriorityMap(map[string]string{})
}
log.Println("[INFO] Priorities synchronized")
})
if err != nil {
log.Printf("[INFO] Unable to sync Priorities: %s", err.Error())
}
return err
}
// NewPolicyReportClient creates a new ReportClient based on the kubernetes go-client
func NewPolicyReportClient(ctx context.Context, kubeconfig, namespace string, startUp time.Time) (report.Client, error) {
var config *rest.Config
var err error
if kubeconfig != "" {
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
} else {
config, err = rest.InClusterConfig()
}
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
coreClient, err := NewCoreClient(kubeconfig, namespace)
if err != nil {
return nil, err
}
reportClient := &policyReportClient{
client: dynamicClient,
coreClient: coreClient,
policyCache: make(map[string]report.PolicyReport),
clusterPolicyCache: make(map[string]report.ClusterPolicyReport),
mapper: NewMapper(make(map[string]string)),
startUp: startUp,
}
err = reportClient.fetchPriorities(ctx)
if err != nil {
log.Printf("[INFO] No PriorityConfig found: %s", err.Error())
}
go reportClient.syncPriorities(ctx)
return reportClient, nil
// NewPolicyResultClient creates a new ReportClient based on the kubernetes go-client
func NewPolicyResultClient(policyClient report.PolicyClient, clusterPolicyClient report.ClusterPolicyClient) report.ResultClient {
return &resultClient{
policyClient,
clusterPolicyClient,
}
}
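Taken together with the adapter above, a hedged wiring sketch; `adapter` and `cmClient` are assumed to already exist, and the constructors mirror the ones exercised in the tests below:

// Illustrative wiring only, not the application's actual bootstrap.
mapper := kubernetes.NewMapper(make(map[string]string), kubernetes.NewConfigMapAdapter(cmClient))
policyClient := kubernetes.NewPolicyReportClient(adapter, mapper, time.Now())
clusterPolicyClient := kubernetes.NewClusterPolicyReportClient(adapter, mapper, time.Now())

// The combined client fans registrations out to both underlying clients.
client := kubernetes.NewPolicyResultClient(policyClient, clusterPolicyClient)
client.RegisterPolicyResultWatcher(false)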

View file

@ -0,0 +1,160 @@
package kubernetes_test
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/fjogeleit/policy-reporter/pkg/kubernetes"
"github.com/fjogeleit/policy-reporter/pkg/report"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/watch"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
type fakeClient struct {
policies []unstructured.Unstructured
clusterPolicies []unstructured.Unstructured
policyWatcher *watch.FakeWatcher
clusterPolicyWatcher *watch.FakeWatcher
policyError error
clusterPolicyError error
}
func (f *fakeClient) ListClusterPolicyReports() (*unstructured.UnstructuredList, error) {
return &unstructured.UnstructuredList{
Items: f.clusterPolicies,
}, f.clusterPolicyError
}
func (f *fakeClient) ListPolicyReports() (*unstructured.UnstructuredList, error) {
return &unstructured.UnstructuredList{
Items: f.policies,
}, f.policyError
}
func (f *fakeClient) WatchClusterPolicyReports() (watch.Interface, error) {
return f.clusterPolicyWatcher, f.clusterPolicyError
}
func (f *fakeClient) WatchPolicyReports() (watch.Interface, error) {
return f.policyWatcher, f.policyError
}
func NewPolicyReportAdapter() *fakeClient {
return &fakeClient{
policies: make([]unstructured.Unstructured, 0),
clusterPolicies: make([]unstructured.Unstructured, 0),
policyWatcher: watch.NewFake(),
clusterPolicyWatcher: watch.NewFake(),
}
}
func NewMapper(k8sCMClient v1.ConfigMapInterface) kubernetes.Mapper {
return kubernetes.NewMapper(make(map[string]string), kubernetes.NewConfigMapAdapter(k8sCMClient))
}
func Test_ResultClient_FetchPolicyResults(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
mapper := NewMapper(k8sCMClient)
client := kubernetes.NewPolicyResultClient(
kubernetes.NewPolicyReportClient(fakeAdapter, mapper, time.Now()),
kubernetes.NewClusterPolicyReportClient(fakeAdapter, mapper, time.Now()),
)
fakeAdapter.policies = append(fakeAdapter.policies, unstructured.Unstructured{Object: policyMap})
fakeAdapter.clusterPolicies = append(fakeAdapter.clusterPolicies, unstructured.Unstructured{Object: clusterPolicyMap})
results, err := client.FetchPolicyResults()
if err != nil {
t.Fatalf("Unexpected Error: %s", err)
}
if len(results) != 3 {
t.Fatalf("Expected 3 Results, got %d", len(results))
}
}
func Test_ResultClient_FetchPolicyResultsPolicyReportError(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
fakeAdapter.policyError = errors.New("")
mapper := NewMapper(k8sCMClient)
client := kubernetes.NewPolicyResultClient(
kubernetes.NewPolicyReportClient(fakeAdapter, mapper, time.Now()),
kubernetes.NewClusterPolicyReportClient(fakeAdapter, mapper, time.Now()),
)
_, err := client.FetchPolicyResults()
if err == nil {
t.Error("PolicyFetch Error should be returned by FetchPolicyResults")
}
}
func Test_ResultClient_FetchPolicyResultsClusterPolicyReportError(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
fakeAdapter.clusterPolicyError = errors.New("")
mapper := NewMapper(k8sCMClient)
client := kubernetes.NewPolicyResultClient(
kubernetes.NewPolicyReportClient(fakeAdapter, mapper, time.Now()),
kubernetes.NewClusterPolicyReportClient(fakeAdapter, mapper, time.Now()),
)
_, err := client.FetchPolicyResults()
if err == nil {
t.Error("ClusterPolicyFetch Error should be returned by FetchPolicyResults")
}
}
func Test_ResultClient_RegisterPolicyResultWatcher(t *testing.T) {
_, k8sCMClient := newFakeAPI()
k8sCMClient.Create(context.Background(), configMap, metav1.CreateOptions{})
fakeAdapter := NewPolicyReportAdapter()
mapper := NewMapper(k8sCMClient)
pClient := kubernetes.NewPolicyReportClient(fakeAdapter, mapper, time.Now())
cpClient := kubernetes.NewClusterPolicyReportClient(fakeAdapter, mapper, time.Now())
client := kubernetes.NewPolicyResultClient(pClient, cpClient)
client.RegisterPolicyResultWatcher(false)
wg := sync.WaitGroup{}
wg.Add(3)
results := make([]report.Result, 0, 3)
client.RegisterPolicyResultCallback(func(r report.Result, b bool) {
results = append(results, r)
wg.Done()
})
go pClient.StartWatching()
go cpClient.StartWatching()
fakeAdapter.clusterPolicyWatcher.Add(&unstructured.Unstructured{Object: clusterPolicyMap})
fakeAdapter.policyWatcher.Add(&unstructured.Unstructured{Object: policyMap})
wg.Wait()
if len(results) != 3 {
t.Error("Should receive 3 Result from all PolicyReports")
}
}

View file

@ -1,41 +1,14 @@
package metrics
import (
"sync"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/watch"
)
// ClusterPolicyReportMetrics creates ClusterPolicy Metrics
type ClusterPolicyReportMetrics struct {
client report.Client
cache map[string]report.ClusterPolicyReport
rwmutex *sync.RWMutex
}
func (m ClusterPolicyReportMetrics) getCachedReport(i string) report.ClusterPolicyReport {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.cache[i]
}
func (m ClusterPolicyReportMetrics) cachedReport(r report.ClusterPolicyReport) {
m.rwmutex.Lock()
m.cache[r.GetIdentifier()] = r
m.rwmutex.Unlock()
}
func (m ClusterPolicyReportMetrics) removeCachedReport(i string) {
m.rwmutex.Lock()
delete(m.cache, i)
m.rwmutex.Unlock()
}
// GenerateMetrics for ClusterPolicyReport Summaries and PolicyResults
func (m ClusterPolicyReportMetrics) GenerateMetrics() error {
// CreateClusterPolicyReportMetricsCallback for ClusterPolicyReport watch.Events
func CreateClusterPolicyReportMetricsCallback() report.ClusterPolicyReportCallback {
policyGauge := promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_policy_report_summary",
Help: "Summary of all ClusterPolicyReports",
@ -49,71 +22,63 @@ func (m ClusterPolicyReportMetrics) GenerateMetrics() error {
prometheus.Register(policyGauge)
prometheus.Register(ruleGauge)
return m.client.WatchClusterPolicyReports(func(e watch.EventType, r report.ClusterPolicyReport) {
go func(event watch.EventType, report report.ClusterPolicyReport) {
switch event {
case watch.Added:
updateClusterPolicyGauge(policyGauge, report)
return func(event watch.EventType, report report.ClusterPolicyReport, oldReport report.ClusterPolicyReport) {
switch event {
case watch.Added:
updateClusterPolicyGauge(policyGauge, report)
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.WithLabelValues(rule.Rule, rule.Policy, report.Name, res.Kind, res.Name, rule.Status).Set(1)
}
m.cachedReport(report)
case watch.Modified:
updateClusterPolicyGauge(policyGauge, report)
for _, rule := range m.getCachedReport(report.GetIdentifier()).Results {
res := rule.Resources[0]
ruleGauge.DeleteLabelValues(
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
)
}
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.
WithLabelValues(
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
).
Set(1)
}
m.cachedReport(report)
case watch.Deleted:
policyGauge.DeleteLabelValues(report.Name, "Pass")
policyGauge.DeleteLabelValues(report.Name, "Fail")
policyGauge.DeleteLabelValues(report.Name, "Warn")
policyGauge.DeleteLabelValues(report.Name, "Error")
policyGauge.DeleteLabelValues(report.Name, "Skip")
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.DeleteLabelValues(
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
)
}
m.removeCachedReport(report.GetIdentifier())
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.WithLabelValues(rule.Rule, rule.Policy, report.Name, res.Kind, res.Name, rule.Status).Set(1)
}
}(e, r)
})
case watch.Modified:
updateClusterPolicyGauge(policyGauge, report)
for _, rule := range oldReport.Results {
res := rule.Resources[0]
ruleGauge.DeleteLabelValues(
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
)
}
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.
WithLabelValues(
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
).
Set(1)
}
case watch.Deleted:
policyGauge.DeleteLabelValues(report.Name, "Pass")
policyGauge.DeleteLabelValues(report.Name, "Fail")
policyGauge.DeleteLabelValues(report.Name, "Warn")
policyGauge.DeleteLabelValues(report.Name, "Error")
policyGauge.DeleteLabelValues(report.Name, "Skip")
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.DeleteLabelValues(
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
)
}
}
}
}
func updateClusterPolicyGauge(policyGauge *prometheus.GaugeVec, report report.ClusterPolicyReport) {
@ -133,12 +98,3 @@ func updateClusterPolicyGauge(policyGauge *prometheus.GaugeVec, report report.Cl
WithLabelValues(report.Name, "Skip").
Set(float64(report.Summary.Skip))
}
// NewClusterPolicyMetrics creates a new ClusterPolicyReportMetrics pointer
func NewClusterPolicyMetrics(client report.Client) *ClusterPolicyReportMetrics {
return &ClusterPolicyReportMetrics{
client: client,
cache: make(map[string]report.ClusterPolicyReport),
rwmutex: new(sync.RWMutex),
}
}
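A sketch of how the returned callback is intended to be consumed, assuming a constructed report.ClusterPolicyClient (see the interfaces in pkg/report below):

// Register the metrics handler for ClusterPolicyReport watch.Events.
clusterPolicyClient.RegisterCallback(metrics.CreateClusterPolicyReportMetricsCallback())

// StartWatching blocks, feeding every watch.Event into the registered callbacks.
if err := clusterPolicyClient.StartWatching(); err != nil {
    log.Fatal(err)
}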

View file

@ -0,0 +1,261 @@
package metrics_test
import (
"fmt"
"testing"
"time"
"github.com/fjogeleit/policy-reporter/pkg/metrics"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"k8s.io/apimachinery/pkg/watch"
)
var cresult1 = report.Result{
Message: "validation error: Namespace label missing",
Policy: "ns-label-env-required",
Rule: "ns-label-required",
Priority: report.ErrorPriority,
Status: report.Fail,
Category: "resources",
Scored: true,
Resources: []report.Resource{
{
APIVersion: "v1",
Kind: "Namespace",
Name: "dev",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188409",
},
},
}
var cresult2 = report.Result{
Message: "validation error: Namespace label missing",
Policy: "ns-label-env-required",
Rule: "ns-label-required",
Priority: report.ErrorPriority,
Status: report.Pass,
Category: "resources",
Scored: true,
Resources: []report.Resource{
{
APIVersion: "v1",
Kind: "Namespace",
Name: "stage",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188419",
},
},
}
var creport = report.ClusterPolicyReport{
Name: "cpolr-test",
Results: make(map[string]report.Result, 0),
Summary: report.Summary{},
CreationTimestamp: time.Now(),
}
func Test_ClusterPolicyReportMetricGeneration(t *testing.T) {
report1 := creport
report1.Summary = report.Summary{Pass: 1, Fail: 1}
report1.Results = map[string]report.Result{
result1.GetIdentifier(): result1,
result2.GetIdentifier(): result2,
}
report2 := creport
report2.Summary = report.Summary{Pass: 0, Fail: 1}
report2.Results = map[string]report.Result{
result1.GetIdentifier(): result1,
}
handler := metrics.CreateClusterPolicyReportMetricsCallback()
t.Run("Added Metric", func(t *testing.T) {
handler(watch.Added, report1, report.ClusterPolicyReport{})
metricFam, err := prometheus.DefaultGatherer.Gather()
if err != nil {
t.Errorf("Unexpected Error: %s", err)
}
summary := findMetric(metricFam, "cluster_policy_report_summary")
if summary == nil {
t.Fatalf("Metric not found: cluster_policy_report_summary")
}
metrics := summary.GetMetric()
if err = testClusterSummaryMetricLabels(metrics[0], creport, "Error", 0); err != nil {
t.Error(err)
}
if err = testClusterSummaryMetricLabels(metrics[1], creport, "Fail", 1); err != nil {
t.Error(err)
}
if err = testClusterSummaryMetricLabels(metrics[2], creport, "Pass", 1); err != nil {
t.Error(err)
}
if err = testClusterSummaryMetricLabels(metrics[3], creport, "Skip", 0); err != nil {
t.Error(err)
}
if err = testClusterSummaryMetricLabels(metrics[4], creport, "Warn", 0); err != nil {
t.Error(err)
}
results := findMetric(metricFam, "cluster_policy_report_result")
if results == nil {
t.Fatalf("Metric not found: cluster_policy_report_result")
}
metrics = results.GetMetric()
if err = testClusterResultMetricLabels(metrics[0], result1); err != nil {
t.Error(err)
}
if err = testClusterResultMetricLabels(metrics[1], result2); err != nil {
t.Error(err)
}
})
t.Run("Modified Metric", func(t *testing.T) {
handler(watch.Added, report1, report.ClusterPolicyReport{})
handler(watch.Modified, report2, report1)
metricFam, err := prometheus.DefaultGatherer.Gather()
if err != nil {
t.Errorf("Unexpected Error: %s", err)
}
summary := findMetric(metricFam, "cluster_policy_report_summary")
if summary == nil {
t.Fatalf("Metric not found: cluster_policy_report_summary")
}
metrics := summary.GetMetric()
if err = testClusterSummaryMetricLabels(metrics[0], creport, "Error", 0); err != nil {
t.Error(err)
}
if err = testClusterSummaryMetricLabels(metrics[1], creport, "Fail", 1); err != nil {
t.Error(err)
}
if err = testClusterSummaryMetricLabels(metrics[2], creport, "Pass", 0); err != nil {
t.Error(err)
}
if err = testClusterSummaryMetricLabels(metrics[3], creport, "Skip", 0); err != nil {
t.Error(err)
}
if err = testClusterSummaryMetricLabels(metrics[4], creport, "Warn", 0); err != nil {
t.Error(err)
}
results := findMetric(metricFam, "cluster_policy_report_result")
if results == nil {
t.Fatalf("Metric not found: cluster_policy_report_result")
}
metrics = results.GetMetric()
if len(metrics) != 1 {
t.Error("Expected one metric, the second metric should be deleted")
}
if err = testClusterResultMetricLabels(metrics[0], result1); err != nil {
t.Error(err)
}
})
t.Run("Deleted Metric", func(t *testing.T) {
handler(watch.Added, report1, report.ClusterPolicyReport{})
handler(watch.Modified, report2, report1)
handler(watch.Deleted, report2, report2)
metricFam, err := prometheus.DefaultGatherer.Gather()
if err != nil {
t.Errorf("Unexpected Error: %s", err)
}
summary := findMetric(metricFam, "cluster_policy_report_summary")
if summary != nil {
t.Error("cluster_policy_report_summary should no longer exist", *summary.Name)
}
results := findMetric(metricFam, "cluster_policy_report_result")
if results != nil {
t.Error("cluster_policy_report_result should no longer exist", *results.Name)
}
})
}
func testClusterSummaryMetricLabels(
metric *io_prometheus_client.Metric,
preport report.ClusterPolicyReport,
status string,
gauge float64,
) error {
if name := *metric.Label[0].Name; name != "name" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[0].Value; value != preport.Name {
return fmt.Errorf("Unexpected Name Label Value: %s", value)
}
if name := *metric.Label[1].Name; name != "status" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[1].Value; value != status {
return fmt.Errorf("Unexpected Status Label Value: %s", value)
}
if value := metric.Gauge.GetValue(); value != gauge {
return fmt.Errorf("Unexpected Metric Value: %v", value)
}
return nil
}
func testClusterResultMetricLabels(metric *io_prometheus_client.Metric, result report.Result) error {
if name := *metric.Label[0].Name; name != "kind" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[0].Value; value != result.Resources[0].Kind {
return fmt.Errorf("Unexpected Kind Label Value: %s", value)
}
if name := *metric.Label[1].Name; name != "name" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[1].Value; value != result.Resources[0].Name {
return fmt.Errorf("Unexpected Name Label Value: %s", value)
}
if name := *metric.Label[2].Name; name != "policy" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[2].Value; value != result.Policy {
return fmt.Errorf("Unexpected Policy Label Value: %s", value)
}
if name := *metric.Label[3].Name; name != "report" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if name := *metric.Label[4].Name; name != "rule" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[4].Value; value != result.Rule {
return fmt.Errorf("Unexpected Rule Label Value: %s", value)
}
if name := *metric.Label[5].Name; name != "status" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[5].Value; value != result.Status {
return fmt.Errorf("Unexpected Status Label Value: %s", value)
}
if value := metric.Gauge.GetValue(); value != 1 {
return fmt.Errorf("Unexpected Metric Value: %v", value)
}
return nil
}

View file

@ -1,41 +1,14 @@
package metrics
import (
"sync"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/watch"
)
// PolicyReportMetrics creates PolicyReport Metrics
type PolicyReportMetrics struct {
client report.Client
cache map[string]report.PolicyReport
rwmutex *sync.RWMutex
}
func (m PolicyReportMetrics) getCachedReport(i string) report.PolicyReport {
m.rwmutex.RLock()
defer m.rwmutex.RUnlock()
return m.cache[i]
}
func (m PolicyReportMetrics) cachedReport(r report.PolicyReport) {
m.rwmutex.Lock()
m.cache[r.GetIdentifier()] = r
m.rwmutex.Unlock()
}
func (m PolicyReportMetrics) removeCachedReport(i string) {
m.rwmutex.Lock()
delete(m.cache, i)
m.rwmutex.Unlock()
}
// GenerateMetrics for PolicyReport Summaries and PolicyResults
func (m PolicyReportMetrics) GenerateMetrics() error {
// CreatePolicyReportMetricsCallback for PolicyReport watch.Events
func CreatePolicyReportMetricsCallback() report.PolicyReportCallback {
policyGauge := promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "policy_report_summary",
Help: "Summary of all PolicyReports",
@ -49,34 +22,15 @@ func (m PolicyReportMetrics) GenerateMetrics() error {
prometheus.Register(policyGauge)
prometheus.Register(ruleGauge)
return m.client.WatchPolicyReports(func(e watch.EventType, r report.PolicyReport) {
go func(event watch.EventType, report report.PolicyReport) {
switch event {
case watch.Added:
updatePolicyGauge(policyGauge, report)
return func(event watch.EventType, report report.PolicyReport, oldReport report.PolicyReport) {
switch event {
case watch.Added:
updatePolicyGauge(policyGauge, report)
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.
WithLabelValues(
report.Namespace,
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
).
Set(1)
}
m.cachedReport(report)
case watch.Modified:
updatePolicyGauge(policyGauge, report)
for _, rule := range m.getCachedReport(report.GetIdentifier()).Results {
res := rule.Resources[0]
ruleGauge.DeleteLabelValues(
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.
WithLabelValues(
report.Namespace,
rule.Rule,
rule.Policy,
@ -84,50 +38,61 @@ func (m PolicyReportMetrics) GenerateMetrics() error {
res.Kind,
res.Name,
rule.Status,
)
}
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.
WithLabelValues(
report.Namespace,
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
).
Set(1)
}
m.cachedReport(report)
case watch.Deleted:
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Pass")
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Fail")
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Warn")
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Error")
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Skip")
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.DeleteLabelValues(
report.Namespace,
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
)
}
m.removeCachedReport(report.GetIdentifier())
).
Set(1)
}
}(e, r)
})
case watch.Modified:
updatePolicyGauge(policyGauge, report)
for _, rule := range oldReport.Results {
res := rule.Resources[0]
ruleGauge.DeleteLabelValues(
report.Namespace,
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
)
}
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.
WithLabelValues(
report.Namespace,
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
).
Set(1)
}
case watch.Deleted:
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Pass")
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Fail")
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Warn")
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Error")
policyGauge.DeleteLabelValues(report.Namespace, report.Name, "Skip")
for _, rule := range report.Results {
res := rule.Resources[0]
ruleGauge.DeleteLabelValues(
report.Namespace,
rule.Rule,
rule.Policy,
report.Name,
res.Kind,
res.Name,
rule.Status,
)
}
}
}
}
func updatePolicyGauge(policyGauge *prometheus.GaugeVec, report report.PolicyReport) {
@ -147,12 +112,3 @@ func updatePolicyGauge(policyGauge *prometheus.GaugeVec, report report.PolicyRep
WithLabelValues(report.Namespace, report.Name, "Skip").
Set(float64(report.Summary.Skip))
}
// NewPolicyReportMetrics creates a new PolicyReportMetrics pointer
func NewPolicyReportMetrics(client report.Client) *PolicyReportMetrics {
return &PolicyReportMetrics{
client: client,
cache: make(map[string]report.PolicyReport),
rwmutex: new(sync.RWMutex),
}
}
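Because the previous report is now passed explicitly instead of being read from an internal cache, the callback can be exercised directly; `newReport` and `updatedReport` are placeholder values, as in the tests below:

handler := metrics.CreatePolicyReportMetricsCallback()

// On modification, the old report's results are used to delete stale
// label sets before the new results are written.
handler(watch.Added, newReport, report.PolicyReport{})
handler(watch.Modified, updatedReport, newReport)
handler(watch.Deleted, updatedReport, updatedReport)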

View file

@ -0,0 +1,287 @@
package metrics_test
import (
"fmt"
"testing"
"time"
"github.com/fjogeleit/policy-reporter/pkg/metrics"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"k8s.io/apimachinery/pkg/watch"
)
var result1 = report.Result{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Priority: report.ErrorPriority,
Status: report.Fail,
Category: "resources",
Scored: true,
Resources: []report.Resource{
{
APIVersion: "v1",
Kind: "Deployment",
Name: "nginx",
Namespace: "test",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188409",
},
},
}
var result2 = report.Result{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Priority: report.ErrorPriority,
Status: report.Pass,
Category: "resources",
Scored: true,
Resources: []report.Resource{
{
APIVersion: "v1",
Kind: "Deployment",
Name: "nginx",
Namespace: "test",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188419",
},
},
}
var preport = report.PolicyReport{
Name: "polr-test",
Namespace: "test",
Results: make(map[string]report.Result, 0),
Summary: report.Summary{},
CreationTimestamp: time.Now(),
}
func Test_PolicyReportMetricGeneration(t *testing.T) {
report1 := preport
report1.Summary = report.Summary{Pass: 1, Fail: 1}
report1.Results = map[string]report.Result{
result1.GetIdentifier(): result1,
result2.GetIdentifier(): result2,
}
report2 := preport
report2.Summary = report.Summary{Pass: 0, Fail: 1}
report2.Results = map[string]report.Result{
result1.GetIdentifier(): result1,
}
handler := metrics.CreatePolicyReportMetricsCallback()
t.Run("Added Metric", func(t *testing.T) {
handler(watch.Added, report1, report.PolicyReport{})
metricFam, err := prometheus.DefaultGatherer.Gather()
if err != nil {
t.Errorf("Unexpected Error: %s", err)
}
summary := findMetric(metricFam, "policy_report_summary")
if summary == nil {
t.Fatalf("Metric not found: policy_report_summary")
}
metrics := summary.GetMetric()
if err = testSummaryMetricLabels(metrics[0], preport, "Error", 0); err != nil {
t.Error(err)
}
if err = testSummaryMetricLabels(metrics[1], preport, "Fail", 1); err != nil {
t.Error(err)
}
if err = testSummaryMetricLabels(metrics[2], preport, "Pass", 1); err != nil {
t.Error(err)
}
if err = testSummaryMetricLabels(metrics[3], preport, "Skip", 0); err != nil {
t.Error(err)
}
if err = testSummaryMetricLabels(metrics[4], preport, "Warn", 0); err != nil {
t.Error(err)
}
results := findMetric(metricFam, "policy_report_result")
if results == nil {
t.Fatalf("Metric not found: policy_report_result")
}
metrics = results.GetMetric()
if err = testResultMetricLabels(metrics[0], result1); err != nil {
t.Error(err)
}
if err = testResultMetricLabels(metrics[1], result2); err != nil {
t.Error(err)
}
})
t.Run("Modified Metric", func(t *testing.T) {
handler(watch.Added, report1, report.PolicyReport{})
handler(watch.Modified, report2, report1)
metricFam, err := prometheus.DefaultGatherer.Gather()
if err != nil {
t.Errorf("Unexpected Error: %s", err)
}
summary := findMetric(metricFam, "policy_report_summary")
if summary == nil {
t.Fatalf("Metric not found: policy_report_summary")
}
metrics := summary.GetMetric()
if err = testSummaryMetricLabels(metrics[0], preport, "Error", 0); err != nil {
t.Error(err)
}
if err = testSummaryMetricLabels(metrics[1], preport, "Fail", 1); err != nil {
t.Error(err)
}
if err = testSummaryMetricLabels(metrics[2], preport, "Pass", 0); err != nil {
t.Error(err)
}
if err = testSummaryMetricLabels(metrics[3], preport, "Skip", 0); err != nil {
t.Error(err)
}
if err = testSummaryMetricLabels(metrics[4], preport, "Warn", 0); err != nil {
t.Error(err)
}
results := findMetric(metricFam, "policy_report_result")
if results == nil {
t.Fatalf("Metric not found: policy_report_result")
}
metrics = results.GetMetric()
if len(metrics) != 1 {
t.Error("Expected one metric, the second metric should be deleted")
}
if err = testResultMetricLabels(metrics[0], result1); err != nil {
t.Error(err)
}
})
t.Run("Deleted Metric", func(t *testing.T) {
handler(watch.Added, report1, report.PolicyReport{})
handler(watch.Modified, report2, report1)
handler(watch.Deleted, report2, report2)
metricFam, err := prometheus.DefaultGatherer.Gather()
if err != nil {
t.Errorf("Unexpected Error: %s", err)
}
summary := findMetric(metricFam, "policy_report_summary")
if summary != nil {
t.Error("policy_report_summary should no longer exist", *summary.Name)
}
results := findMetric(metricFam, "policy_report_result")
if results != nil {
t.Error("policy_report_result shoud no longer exist", *results.Name)
}
})
}
func testSummaryMetricLabels(
metric *io_prometheus_client.Metric,
preport report.PolicyReport,
status string,
gauge float64,
) error {
if name := *metric.Label[0].Name; name != "name" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[0].Value; value != preport.Name {
return fmt.Errorf("Unexpected Name Label Value: %s", value)
}
if name := *metric.Label[1].Name; name != "namespace" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[1].Value; value != preport.Namespace {
return fmt.Errorf("Unexpected Namespace Label Value: %s", value)
}
if name := *metric.Label[2].Name; name != "status" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[2].Value; value != status {
return fmt.Errorf("Unexpected Status Label Value: %s", value)
}
if value := metric.Gauge.GetValue(); value != gauge {
return fmt.Errorf("Unexpected Metric Value: %v", value)
}
return nil
}
func testResultMetricLabels(metric *io_prometheus_client.Metric, result report.Result) error {
if name := *metric.Label[0].Name; name != "kind" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[0].Value; value != result.Resources[0].Kind {
return fmt.Errorf("Unexpected Kind Label Value: %s", value)
}
if name := *metric.Label[1].Name; name != "name" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[1].Value; value != result.Resources[0].Name {
return fmt.Errorf("Unexpected Name Label Value: %s", value)
}
if name := *metric.Label[2].Name; name != "namespace" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[2].Value; value != result.Resources[0].Namespace {
return fmt.Errorf("Unexpected Namespace Label Value: %s", value)
}
if name := *metric.Label[3].Name; name != "policy" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[3].Value; value != result.Policy {
return fmt.Errorf("Unexpected Policy Label Value: %s", value)
}
if name := *metric.Label[4].Name; name != "report" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if name := *metric.Label[5].Name; name != "rule" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[5].Value; value != result.Rule {
return fmt.Errorf("Unexpected Rule Label Value: %s", value)
}
if name := *metric.Label[6].Name; name != "status" {
return fmt.Errorf("Unexpected Name Label: %s", name)
}
if value := *metric.Label[6].Value; value != result.Status {
return fmt.Errorf("Unexpected Status Label Value: %s", value)
}
if value := metric.Gauge.GetValue(); value != 1 {
return fmt.Errorf("Unexpected Metric Value: %v", value)
}
return nil
}
func findMetric(metrics []*io_prometheus_client.MetricFamily, name string) *io_prometheus_client.MetricFamily {
for _, metric := range metrics {
if *metric.Name == name {
return metric
}
}
return nil
}

View file

@ -1,28 +1,54 @@
package report
import "k8s.io/apimachinery/pkg/watch"
import (
"k8s.io/apimachinery/pkg/watch"
)
// WatchPolicyReportCallback is called whenever a new PolicyReport comes in
type WatchPolicyReportCallback = func(watch.EventType, PolicyReport)
// PolicyReportCallback is called whenever a new PolicyReport comes in
type PolicyReportCallback = func(watch.EventType, PolicyReport, PolicyReport)
// WatchClusterPolicyReportCallback is called whenever a new ClusterPolicyReport comes in
type WatchClusterPolicyReportCallback = func(watch.EventType, ClusterPolicyReport)
// ClusterPolicyReportCallback is called whenever a new ClusterPolicyReport comes in
type ClusterPolicyReportCallback = func(watch.EventType, ClusterPolicyReport, ClusterPolicyReport)
// WatchPolicyResultCallback is called whenever a new PolicyResult comes in
type WatchPolicyResultCallback = func(Result, bool)
// PolicyResultCallback is called whenever a new PolicyResult comes in
type PolicyResultCallback = func(Result, bool)
// Client interface for interacting with the Kubernetes API
type Client interface {
type ResultClient interface {
// FetchPolicyResults from the underlying API
FetchPolicyResults() ([]Result, error)
// RegisterPolicyResultWatcher registers a handler for ClusterPolicyReports and PolicyReports which calls the registered PolicyResultCallbacks
RegisterPolicyResultWatcher(skipExisting bool)
// RegisterPolicyResultCallback registers Handlers called on each PolicyReport- and ClusterPolicyReport watch.Event for each changed PolicyResult
RegisterPolicyResultCallback(cb PolicyResultCallback)
}
type PolicyClient interface {
// RegisterCallback registers Handlers called on each PolicyReport watch.Event
RegisterCallback(PolicyReportCallback)
// RegisterPolicyResultCallback registers Handlers called on each PolicyReport watch.Event for each changed PolicyResult
RegisterPolicyResultCallback(PolicyResultCallback)
// FetchPolicyReports from the underlying API
FetchPolicyReports() ([]PolicyReport, error)
// WatchPolicyReports blocking API to watch for PolicyReport changes
WatchPolicyReports(WatchPolicyReportCallback) error
// WatchPolicyReportResults blocking API to watch for PolicyResult changes from PolicyReports and ClusterPolicyReports
WatchPolicyReportResults(WatchPolicyResultCallback, bool) error
// FetchPolicyReportResults from the underlying API
FetchPolicyReportResults() ([]Result, error)
// WatchClusterPolicyReports blocking API to watch for ClusterPolicyReport changes
WatchClusterPolicyReports(WatchClusterPolicyReportCallback) error
// FetchClusterPolicyReports from the underlying API
FetchClusterPolicyReports() ([]ClusterPolicyReport, error)
// FetchPolicyResults from the underlying PolicyAPI
FetchPolicyResults() ([]Result, error)
// RegisterPolicyResultWatcher registers a handler for PolicyReports which calls the registered PolicyResultCallbacks
RegisterPolicyResultWatcher(skipExisting bool)
// StartWatching calls the WatchAPI, waits for incoming PolicyReport watch.Events and calls the registered Handlers
StartWatching() error
}
type ClusterPolicyClient interface {
// RegisterCallback registers Handlers called on each ClusterPolicyReport watch.Event
RegisterCallback(ClusterPolicyReportCallback)
// RegisterPolicyResultCallback registers Handlers called on each ClusterPolicyReport watch.Event for each changed PolicyResult
RegisterPolicyResultCallback(PolicyResultCallback)
// FetchClusterPolicyReports from the underlying API
FetchClusterPolicyReports() ([]ClusterPolicyReport, error)
// FetchPolicyResults from the underlying ClusterPolicyAPI
FetchPolicyResults() ([]Result, error)
// RegisterPolicyResultWatcher registers a handler for ClusterPolicyReports which calls the registered PolicyResultCallbacks
RegisterPolicyResultWatcher(skipExisting bool)
// StartWatching calls the WatchAPI, waits for incoming ClusterPolicyReport watch.Events and calls the registered Handlers
StartWatching() error
}
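A minimal consumer sketch for ResultClient; `client` is any implementation of the interface and `target` stands in for one of the target clients below:

client.RegisterPolicyResultCallback(func(r report.Result, preExisted bool) {
    // preExisted marks results created before startup; a consumer can
    // filter them here even when the watcher delivers everything.
    if preExisted {
        return
    }
    target.Send(r) // e.g. a Loki, Elasticsearch, Slack or Discord client
})
client.RegisterPolicyResultWatcher(false)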

View file

@ -115,6 +115,13 @@ func Test_Result(t *testing.T) {
})
}
func Test_MarshalPriority(t *testing.T) {
priority := report.NewPriority("error")
if result, _ := priority.MarshalJSON(); string(result) != `"error"` {
t.Errorf("Unexpected Marshel Result: %s", result)
}
}
func Test_Priorities(t *testing.T) {
t.Run("Priority.String", func(t *testing.T) {
if prio := report.Priority(0).String(); prio != "" {

View file

@ -3,12 +3,12 @@ package discord
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/fjogeleit/policy-reporter/pkg/target/helper"
)
type payload struct {
@ -107,34 +107,20 @@ func (d *client) Send(result report.Result) {
if err := json.NewEncoder(body).Encode(payload); err != nil {
log.Printf("[ERROR] DISCORD : %v\n", err.Error())
return
}
req, err := http.NewRequest("POST", d.webhook, body)
if err != nil {
log.Printf("[ERROR] DISCORD : %v\n", err.Error())
return
}
req.Header.Add("Content-Type", "application/json; charset=utf-8")
req.Header.Add("User-Agent", "Policy-Reporter")
resp, err := d.client.Do(req)
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if err != nil {
log.Printf("[ERROR] DISCORD PUSH failed: %s\n", err.Error())
} else if resp.StatusCode >= 400 {
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
log.Printf("[ERROR] DISCORD PUSH failed [%d]: %s\n", resp.StatusCode, buf.String())
} else {
log.Println("[INFO] DISCORD PUSH OK")
}
helper.HandleHTTPResponse("DISCORD", resp, err)
}
func (d *client) SkipExistingOnStartup() bool {

View file

@ -12,7 +12,7 @@ var completeResult = report.Result{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Priority: report.ErrorPriority,
Priority: report.WarningPriority,
Status: report.Fail,
Severity: report.Heigh,
Category: "resources",
@ -65,8 +65,8 @@ func Test_LokiTarget(t *testing.T) {
}
}
slack := discord.NewClient("http://hook.discord:80", "", false, testClient{callback, 200})
slack.Send(completeResult)
client := discord.NewClient("http://hook.discord:80", "", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("Send Minimal Result", func(t *testing.T) {
@ -84,7 +84,26 @@ func Test_LokiTarget(t *testing.T) {
}
}
slack := discord.NewClient("http://hook.discord:80", "", false, testClient{callback, 200})
slack.Send(minimalResult)
client := discord.NewClient("http://hook.discord:80", "", false, testClient{callback, 200})
client.Send(minimalResult)
})
t.Run("Send with ingored Priority", func(t *testing.T) {
callback := func(req *http.Request) {
t.Errorf("Unexpected Call")
}
client := discord.NewClient("http://localhost:9200", "error", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("SkipExistingOnStartup", func(t *testing.T) {
callback := func(req *http.Request) {
t.Errorf("Unexpected Call")
}
client := discord.NewClient("http://localhost:9200", "", true, testClient{callback, 200})
if !client.SkipExistingOnStartup() {
t.Error("Should return configured SkipExistingOnStartup")
}
})
}

View file

@ -3,13 +3,13 @@ package elasticsearch
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"time"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/fjogeleit/policy-reporter/pkg/target/helper"
)
type Rotation = string
@ -44,6 +44,7 @@ func (e *client) Send(result report.Result) {
if err := json.NewEncoder(body).Encode(result); err != nil {
log.Printf("[ERROR] ELASTICSEARCH : %v\n", err.Error())
return
}
var host string
@ -61,29 +62,14 @@ func (e *client) Send(result report.Result) {
req, err := http.NewRequest("POST", host, body)
if err != nil {
log.Printf("[ERROR] ELASTICSEARCH : %v\n", err.Error())
return
}
req.Header.Add("Content-Type", "application/json; charset=utf-8")
req.Header.Add("User-Agent", "Policy-Reporter")
resp, err := e.client.Do(req)
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if err != nil {
log.Printf("[ERROR] ELASTICSEARCH PUSH failed: %s\n", err.Error())
} else if resp.StatusCode >= 400 {
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
log.Printf("[ERROR] ELASTICSEARCH PUSH failed [%d]: %s\n", resp.StatusCode, buf.String())
} else {
log.Println("[INFO] ELASTICSEARCH PUSH OK")
}
helper.HandleHTTPResponse("ELASTICSEARCH", resp, err)
}
func (e *client) SkipExistingOnStartup() bool {

View file

@ -13,7 +13,7 @@ var completeResult = report.Result{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Priority: report.ErrorPriority,
Priority: report.WarningPriority,
Status: report.Fail,
Severity: report.Heigh,
Category: "resources",
@ -58,8 +58,8 @@ func Test_ElasticsearchTarget(t *testing.T) {
}
}
loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "annually", "", false, testClient{callback, 200})
loki.Send(completeResult)
client := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "annually", "", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("Send with Monthly Result", func(t *testing.T) {
callback := func(req *http.Request) {
@ -68,8 +68,8 @@ func Test_ElasticsearchTarget(t *testing.T) {
}
}
loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "monthly", "", false, testClient{callback, 200})
loki.Send(completeResult)
client := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "monthly", "", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("Send with Monthly Result", func(t *testing.T) {
callback := func(req *http.Request) {
@ -78,8 +78,8 @@ func Test_ElasticsearchTarget(t *testing.T) {
}
}
loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "daily", "", false, testClient{callback, 200})
loki.Send(completeResult)
client := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "daily", "", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("Send with None Result", func(t *testing.T) {
callback := func(req *http.Request) {
@ -88,7 +88,26 @@ func Test_ElasticsearchTarget(t *testing.T) {
}
}
loki := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "none", "", false, testClient{callback, 200})
loki.Send(completeResult)
client := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "none", "", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("Send with ingored Priority", func(t *testing.T) {
callback := func(req *http.Request) {
t.Errorf("Unexpected Call")
}
client := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "none", "error", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("SkipExistingOnStartup", func(t *testing.T) {
callback := func(req *http.Request) {
t.Errorf("Unexpected Call")
}
client := elasticsearch.NewClient("http://localhost:9200", "policy-reporter", "none", "", true, testClient{callback, 200})
if !client.SkipExistingOnStartup() {
t.Error("Should return configured SkipExistingOnStartup")
}
})
}

28
pkg/target/helper/http.go Normal file
View file

@ -0,0 +1,28 @@
package helper
import (
"bytes"
"fmt"
"log"
"net/http"
)
func HandleHTTPResponse(target string, resp *http.Response, err error) {
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if err != nil {
log.Printf("[ERROR] %s PUSH failed: %s\n", target, err.Error())
} else if resp.StatusCode >= 400 {
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
log.Printf("[ERROR] %s PUSH failed [%d]: %s\n", target, resp.StatusCode, buf.String())
} else {
log.Printf("[INFO] %s PUSH OK\n", target)
}
}
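The calling pattern this helper extracts, as seen in the Loki, Slack and Discord clients; `url`, `body`, `httpClient` and the "TARGET" prefix are placeholders:

req, err := http.NewRequest("POST", url, body)
if err != nil {
    log.Printf("[ERROR] TARGET : %v\n", err.Error())
    return
}
req.Header.Add("Content-Type", "application/json; charset=utf-8")
req.Header.Add("User-Agent", "Policy-Reporter")

resp, err := httpClient.Do(req)
// Logs the outcome and closes the response body in every case.
helper.HandleHTTPResponse("TARGET", resp, err)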

View file

@ -3,7 +3,6 @@ package loki
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
@ -11,6 +10,7 @@ import (
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/fjogeleit/policy-reporter/pkg/target/helper"
)
type httpClient interface {
@ -87,34 +87,20 @@ func (l *client) Send(result report.Result) {
if err := json.NewEncoder(body).Encode(payload); err != nil {
log.Printf("[ERROR] LOKI : %v\n", err.Error())
return
}
req, err := http.NewRequest("POST", l.host, body)
if err != nil {
log.Printf("[ERROR] LOKI : %v\n", err.Error())
return
}
req.Header.Add("Content-Type", "application/json")
req.Header.Add("User-Agent", "Policy-Reporter")
resp, err := l.client.Do(req)
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if err != nil {
log.Printf("[ERROR] LOKI PUSH failed: %s\n", err.Error())
} else if resp.StatusCode >= 400 {
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
log.Printf("[ERROR] LOKI PUSH failed [%d]: %s\n", resp.StatusCode, buf.String())
} else {
log.Println("[INFO] LOKI PUSH OK")
}
helper.HandleHTTPResponse("LOKI", resp, err)
}
func (l *client) SkipExistingOnStartup() bool {

View file

@ -15,7 +15,7 @@ var completeResult = report.Result{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Priority: report.ErrorPriority,
Priority: report.WarningPriority,
Status: report.Fail,
Severity: report.Heigh,
Category: "resources",
@ -170,6 +170,26 @@ func Test_LokiTarget(t *testing.T) {
loki := loki.NewClient("http://localhost:3100", "", false, testClient{callback, 200})
loki.Send(minimalResult)
})
t.Run("Send with ingored Priority", func(t *testing.T) {
callback := func(req *http.Request) {
t.Errorf("Unexpected Call")
}
client := loki.NewClient("http://localhost:9200", "error", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("SkipExistingOnStartup", func(t *testing.T) {
callback := func(req *http.Request) {
t.Errorf("Unexpected Call")
}
client := loki.NewClient("http://localhost:9200", "", true, testClient{callback, 200})
if !client.SkipExistingOnStartup() {
t.Error("Should return configured SkipExistingOnStartup")
}
})
}
func convertAndValidateBody(req *http.Request, t *testing.T) (string, string) {

View file

@ -3,12 +3,12 @@ package slack
import (
"bytes"
"encoding/json"
"fmt"
"log"
"net/http"
"github.com/fjogeleit/policy-reporter/pkg/report"
"github.com/fjogeleit/policy-reporter/pkg/target"
"github.com/fjogeleit/policy-reporter/pkg/target/helper"
)
type httpClient interface {
@ -160,34 +160,20 @@ func (s *client) Send(result report.Result) {
if err := json.NewEncoder(body).Encode(payload); err != nil {
log.Printf("[ERROR] SLACK : %v\n", err.Error())
return
}
req, err := http.NewRequest("POST", s.webhook, body)
if err != nil {
log.Printf("[ERROR] SLACK : %v\n", err.Error())
return
}
req.Header.Add("Content-Type", "application/json; charset=utf-8")
req.Header.Add("User-Agent", "Policy-Reporter")
resp, err := s.client.Do(req)
defer func() {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
}()
if err != nil {
log.Printf("[ERROR] SLACK PUSH failed: %s\n", err.Error())
} else if resp.StatusCode >= 400 {
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
log.Printf("[ERROR] PUSH failed [%d]: %s\n", resp.StatusCode, buf.String())
} else {
log.Println("[INFO] SLACK PUSH OK")
}
helper.HandleHTTPResponse("SLACK", resp, err)
}
func (s *client) SkipExistingOnStartup() bool {

View file

@ -12,7 +12,7 @@ var completeResult = report.Result{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Priority: report.ErrorPriority,
Priority: report.WarningPriority,
Status: report.Fail,
Severity: report.Heigh,
Category: "resources",
@ -65,8 +65,8 @@ func Test_LokiTarget(t *testing.T) {
}
}
slack := slack.NewClient("http://hook.slack:80", "", false, testClient{callback, 200})
slack.Send(completeResult)
client := slack.NewClient("http://hook.slack:80", "", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("Send Minimal Result", func(t *testing.T) {
@ -84,7 +84,26 @@ func Test_LokiTarget(t *testing.T) {
}
}
slack := slack.NewClient("http://hook.slack:80", "", false, testClient{callback, 200})
slack.Send(minimalResult)
client := slack.NewClient("http://hook.slack:80", "", false, testClient{callback, 200})
client.Send(minimalResult)
})
t.Run("Send with ingored Priority", func(t *testing.T) {
callback := func(req *http.Request) {
t.Errorf("Unexpected Call")
}
client := slack.NewClient("http://localhost:9200", "error", false, testClient{callback, 200})
client.Send(completeResult)
})
t.Run("SkipExistingOnStartup", func(t *testing.T) {
callback := func(req *http.Request) {
t.Errorf("Unexpected Call")
}
client := slack.NewClient("http://localhost:9200", "", true, testClient{callback, 200})
if !client.SkipExistingOnStartup() {
t.Error("Should return configured SkipExistingOnStartup")
}
})
}