1
0
Fork 0
mirror of https://github.com/kyverno/policy-reporter.git synced 2024-12-14 11:57:32 +00:00

Improved Error handling with errorgroups

This commit is contained in:
Frank Jogeleit 2021-02-22 21:18:39 +01:00
parent 24d07e27e5
commit a9f9fcd5c9
18 changed files with 63 additions and 61 deletions

View file

@ -24,4 +24,4 @@ COPY --from=builder /app/build/policyreporter /app/policyreporter
EXPOSE 2112
ENTRYPOINT ["/app/policyreporter"]
ENTRYPOINT ["/app/policyreporter", "run"]

View file

@ -3,5 +3,5 @@ name: policy-reporter
description: K8s PolicyReporter watches for wgpolicyk8s.io/v1alpha1.PolicyReport resources. It creates Prometheus Metrics and can send rule validation events to Loki
type: application
version: 0.5.2
appVersion: 0.4.2
version: 0.6.0
appVersion: 0.5.0

View file

@ -15,4 +15,4 @@ rules:
verbs:
- get
- list
- watch
- watch

View file

@ -9,4 +9,4 @@ roleRef:
subjects:
- kind: "ServiceAccount"
name: {{ include "policyreporter.serviceAccountName" . }}
namespace: policy-reporter
namespace: policy-reporter

View file

@ -462,4 +462,4 @@ data:
"uid": "5ivy3MyGz",
"version": 2
}
{{- end }}
{{- end }}

View file

@ -26,7 +26,6 @@ spec:
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
args:
- run
- --loki={{ .Values.loki.host }}
{{- if .Values.loki.minimumPriority }}
- --loki-minimum-priority={{ .Values.loki.minimumPriority }}

View file

@ -10,4 +10,4 @@ rules:
verbs:
- get
- list
- watch
- watch

View file

@ -9,4 +9,4 @@ roleRef:
subjects:
- kind: "ServiceAccount"
name: {{ include "policyreporter.serviceAccountName" . }}
namespace: policy-reporter
namespace: policy-reporter

View file

@ -15,7 +15,7 @@ metrics:
image:
repository: fjogeleit/policy-reporter
pullPolicy: IfNotPresent
tag: 0.4.2
tag: 0.5.0
imagePullSecrets: []

View file

@ -9,6 +9,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"golang.org/x/sync/errgroup"
)
type PolicySeverity = string
@ -39,32 +40,39 @@ func NewCLI() *cobra.Command {
return err
}
loki := resolver.LokiClient()
if loki != nil {
go client.WatchRuleValidation(func(r report.Result) {
go loki.Send(r)
}, c.Loki.SkipExisting)
}
policyMetrics, err := resolver.PolicyReportMetrics()
if err != nil {
return err
}
go policyMetrics.GenerateMetrics()
clusterPolicyMetrics, err := resolver.ClusterPolicyReportMetrics()
if err != nil {
return err
}
go clusterPolicyMetrics.GenerateMetrics()
loki := resolver.LokiClient()
http.Handle("/metrics", promhttp.Handler())
http.ListenAndServe(":2112", nil)
g := new(errgroup.Group)
return nil
g.Go(policyMetrics.GenerateMetrics)
g.Go(clusterPolicyMetrics.GenerateMetrics)
if loki != nil {
g.Go(func() error {
return client.WatchRuleValidation(func(r report.Result) {
go loki.Send(r)
}, c.Loki.SkipExisting)
})
}
g.Go(func() error {
http.Handle("/metrics", promhttp.Handler())
return http.ListenAndServe(":2112", nil)
})
return g.Wait()
},
}

1
go.mod
View file

@ -16,6 +16,7 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/viper v1.7.1
golang.org/x/net v0.0.0-20210220033124-5f55cee0dc0d // indirect
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20210218155724-8ebf48af031b // indirect
golang.org/x/text v0.3.5 // indirect
gopkg.in/ini.v1 v1.62.0 // indirect

1
go.sum
View file

@ -548,6 +548,7 @@ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 h1:SQFwaSi55rU7vdNs9Yr0Z324VNlrF+0wMqRXT4St8ck=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

View file

@ -4,10 +4,10 @@ import (
"context"
"errors"
"log"
"sync"
"time"
"github.com/fjogeleit/policy-reporter/pkg/report"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -36,28 +36,27 @@ type policyReportClient struct {
startUp time.Time
}
func (c *policyReportClient) FetchPolicyReports() []report.PolicyReport {
func (c *policyReportClient) FetchPolicyReports() ([]report.PolicyReport, error) {
var reports []report.PolicyReport
result, err := c.client.Resource(policyReports).List(context.Background(), metav1.ListOptions{})
if err != nil {
log.Printf("K8s List Error: %s\n", err.Error())
return reports
return reports, err
}
for _, item := range result.Items {
reports = append(reports, c.mapPolicyReport(item.Object))
}
return reports
return reports, nil
}
func (c *policyReportClient) WatchClusterPolicyReports(cb report.WatchClusterPolicyReportCallback) {
func (c *policyReportClient) WatchClusterPolicyReports(cb report.WatchClusterPolicyReportCallback) error {
for {
result, err := c.client.Resource(clusterPolicyReports).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
log.Printf("K8s Watch Error: %s\n", err.Error())
return
return err
}
for result := range result.ResultChan() {
@ -68,12 +67,11 @@ func (c *policyReportClient) WatchClusterPolicyReports(cb report.WatchClusterPol
}
}
func (c *policyReportClient) WatchPolicyReports(cb report.WatchPolicyReportCallback) {
func (c *policyReportClient) WatchPolicyReports(cb report.WatchPolicyReportCallback) error {
for {
result, err := c.client.Resource(policyReports).Watch(context.Background(), metav1.ListOptions{})
if err != nil {
log.Printf("K8s Watch Error: %s\n", err.Error())
return
return err
}
for result := range result.ResultChan() {
@ -84,12 +82,11 @@ func (c *policyReportClient) WatchPolicyReports(cb report.WatchPolicyReportCallb
}
}
func (c *policyReportClient) WatchRuleValidation(cb report.WatchPolicyResultCallback, skipExisting bool) {
wg := sync.WaitGroup{}
wg.Add(2)
func (c *policyReportClient) WatchRuleValidation(cb report.WatchPolicyResultCallback, skipExisting bool) error {
wg := new(errgroup.Group)
go func(skipExisting bool) {
c.WatchPolicyReports(func(e watch.EventType, pr report.PolicyReport) {
wg.Go(func() error {
return c.WatchPolicyReports(func(e watch.EventType, pr report.PolicyReport) {
switch e {
case watch.Added:
if skipExisting && pr.CreationTimestamp.Before(c.startUp) {
@ -113,12 +110,10 @@ func (c *policyReportClient) WatchRuleValidation(cb report.WatchPolicyResultCall
delete(c.policyCache, pr.GetIdentifier())
}
})
})
wg.Done()
}(skipExisting)
go func(skipExisting bool) {
c.WatchClusterPolicyReports(func(s watch.EventType, cpr report.ClusterPolicyReport) {
wg.Go(func() error {
return c.WatchClusterPolicyReports(func(s watch.EventType, cpr report.ClusterPolicyReport) {
switch s {
case watch.Added:
if skipExisting && cpr.CreationTimestamp.Before(c.startUp) {
@ -142,11 +137,9 @@ func (c *policyReportClient) WatchRuleValidation(cb report.WatchPolicyResultCall
delete(c.clusterPolicyCache, cpr.GetIdentifier())
}
})
})
wg.Done()
}(skipExisting)
wg.Wait()
return wg.Wait()
}
func (c *policyReportClient) fetchPriorities(ctx context.Context) error {

View file

@ -33,7 +33,7 @@ func (m ClusterPolicyReportMetrics) removeCachedReport(i string) {
m.rwmutex.Unlock()
}
func (m ClusterPolicyReportMetrics) GenerateMetrics() {
func (m ClusterPolicyReportMetrics) GenerateMetrics() error {
policyGauge := promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "cluster_policy_report_summary",
Help: "Summary of all ClusterPolicyReports",
@ -47,7 +47,7 @@ func (m ClusterPolicyReportMetrics) GenerateMetrics() {
prometheus.Register(policyGauge)
prometheus.Register(ruleGauge)
m.client.WatchClusterPolicyReports(func(e watch.EventType, r report.ClusterPolicyReport) {
return m.client.WatchClusterPolicyReports(func(e watch.EventType, r report.ClusterPolicyReport) {
go func(event watch.EventType, report report.ClusterPolicyReport) {
switch event {
case watch.Added:

View file

@ -1,5 +1,5 @@
package metrics
type Metrics interface {
GenerateMetrics()
GenerateMetrics() error
}

View file

@ -33,7 +33,7 @@ func (m PolicyReportMetrics) removeCachedReport(i string) {
m.rwmutex.Unlock()
}
func (m PolicyReportMetrics) GenerateMetrics() {
func (m PolicyReportMetrics) GenerateMetrics() error {
policyGauge := promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "policy_report_summary",
Help: "Summary of all PolicyReports",
@ -47,7 +47,7 @@ func (m PolicyReportMetrics) GenerateMetrics() {
prometheus.Register(policyGauge)
prometheus.Register(ruleGauge)
m.client.WatchPolicyReports(func(e watch.EventType, r report.PolicyReport) {
return m.client.WatchPolicyReports(func(e watch.EventType, r report.PolicyReport) {
go func(event watch.EventType, report report.PolicyReport) {
switch event {
case watch.Added:

View file

@ -7,8 +7,8 @@ type WatchClusterPolicyReportCallback = func(watch.EventType, ClusterPolicyRepor
type WatchPolicyResultCallback = func(Result)
type Client interface {
FetchPolicyReports() []PolicyReport
WatchPolicyReports(WatchPolicyReportCallback)
WatchRuleValidation(WatchPolicyResultCallback, bool)
WatchClusterPolicyReports(WatchClusterPolicyReportCallback)
FetchPolicyReports() ([]PolicyReport, error)
WatchPolicyReports(WatchPolicyReportCallback) error
WatchRuleValidation(WatchPolicyResultCallback, bool) error
WatchClusterPolicyReports(WatchClusterPolicyReportCallback) error
}

View file

@ -41,7 +41,7 @@ func newLokiPayload(result report.Result) payload {
"status=\"" + result.Status + "\"",
"policy=\"" + result.Policy + "\"",
"priority=\"" + result.Priority.String() + "\"",
"source=\"kyverno\"",
"source=\"policy-reporter\"",
}
if result.Rule != "" {
@ -99,15 +99,15 @@ func (l *Client) Send(result report.Result) {
}()
if err != nil {
log.Printf("PUSH ERROR: %s\n", err.Error())
log.Printf("[ERROR] PUSH failed: %s\n", err.Error())
} else if resp.StatusCode > 400 {
fmt.Printf("StatusCode: %d\n", resp.StatusCode)
buf := new(bytes.Buffer)
buf.ReadFrom(resp.Body)
log.Printf("PUSH ERROR [%d]: %s\n", resp.StatusCode, buf.String())
log.Printf("[ERROR] PUSH failed [%d]: %s\n", resp.StatusCode, buf.String())
} else {
log.Println("PUSH OK")
log.Println("[INFO] PUSH OK")
}
}