1
0
Fork 0
mirror of https://github.com/kyverno/policy-reporter.git synced 2024-12-14 11:57:32 +00:00

Merge pull request #161 from preved911/feature/add_aws_kinesis_target_support

Adding AWS Kinesis support
This commit is contained in:
Frank Jogeleit 2022-06-22 12:08:16 +02:00 committed by GitHub
commit b6150c30c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 359 additions and 58 deletions

View file

@ -1,5 +1,9 @@
# Changelog # Changelog
# 2.9.4
* Policy Reporter
* Add [AWS Kinesis](https://aws.amazon.com/kinesis) compatible target
# 2.9.3 # 2.9.3
* Policy Reporter * Policy Reporter
* Fix `grafana.dashboards.value` type conversion [[fix #158](https://github.com/kyverno/policy-reporter/issues/158)] * Fix `grafana.dashboards.value` type conversion [[fix #158](https://github.com/kyverno/policy-reporter/issues/158)]

View file

@ -5,8 +5,8 @@ description: |
It creates Prometheus Metrics and can send rule validation events to different targets like Loki, Elasticsearch, Slack or Discord It creates Prometheus Metrics and can send rule validation events to different targets like Loki, Elasticsearch, Slack or Discord
type: application type: application
version: 2.9.3 version: 2.9.4
appVersion: 2.6.1 appVersion: 2.6.2
icon: https://github.com/kyverno/kyverno/raw/main/img/logo.png icon: https://github.com/kyverno/kyverno/raw/main/img/logo.png
home: https://kyverno.github.io/policy-reporter home: https://kyverno.github.io/policy-reporter

View file

@ -137,6 +137,27 @@ s3:
{{- toYaml . | nindent 4 }} {{- toYaml . | nindent 4 }}
{{- end }} {{- end }}
kinesis:
accessKeyID: {{ .Values.target.kinesis.accessKeyID }}
secretAccessKey: {{ .Values.target.kinesis.secretAccessKey }}
region: {{ .Values.target.kinesis.region }}
endpoint: {{ .Values.target.kinesis.endpoint }}
streamName: {{ .Values.target.kinesis.streamName }}
minimumPriority: {{ .Values.target.kinesis.minimumPriority | quote }}
skipExistingOnStartup: {{ .Values.target.kinesis.skipExistingOnStartup }}
{{- with .Values.target.kinesis.sources }}
sources:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- with .Values.target.kinesis.filter }}
filter:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- with .Values.target.kinesis.channels }}
channels:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- with .Values.policyPriorities }} {{- with .Values.policyPriorities }}
priorityMap: priorityMap:
{{- toYaml . | nindent 2 }} {{- toYaml . | nindent 2 }}
@ -153,4 +174,4 @@ reportFilter:
{{- toYaml . | nindent 6 }} {{- toYaml . | nindent 6 }}
{{- end }} {{- end }}
clusterReports: clusterReports:
disabled: {{ .Values.reportFilter.clusterReports.disabled }} disabled: {{ .Values.reportFilter.clusterReports.disabled }}

View file

@ -314,6 +314,28 @@ target:
# add additional s3 channels with different configurations and filters # add additional s3 channels with different configurations and filters
channels: [] channels: []
kinesis:
# AWS access key
accessKeyID: ""
# AWS secret access key
secretAccessKey: ""
# AWS region
region: ""
# AWS Kinesis endpoint
endpoint: ""
# AWS Kinesis stream name
streamName: ""
# minimum priority "" < info < warning < critical < error
minimumPriority: ""
# list of sources which should send to S3
sources: []
# Skip already existing PolicyReportResults on startup
skipExistingOnStartup: true
# filter results send by namespaces, policies and priorities
filter: {}
# add additional s3 channels with different configurations and filters
channels: []
# Node labels for pod assignment # Node labels for pod assignment
# ref: https://kubernetes.io/docs/user-guide/node-selection/ # ref: https://kubernetes.io/docs/user-guide/node-selection/
nodeSelector: {} nodeSelector: {}

View file

@ -112,6 +112,20 @@ type S3 struct {
Channels []S3 `mapstructure:"channels"` Channels []S3 `mapstructure:"channels"`
} }
type Kinesis struct {
Name string `mapstructure:"name"`
AccessKeyID string `mapstructure:"accessKeyID"`
SecretAccessKey string `mapstructure:"secretAccessKey"`
Region string `mapstructure:"region"`
Endpoint string `mapstructure:"endpoint"`
StreamName string `mapstructure:"streamName"`
SkipExisting bool `mapstructure:"skipExistingOnStartup"`
MinimumPriority string `mapstructure:"minimumPriority"`
Filter TargetFilter `mapstructure:"filter"`
Sources []string `mapstructure:"sources"`
Channels []Kinesis `mapstructure:"channels"`
}
// API configuration // API configuration
type API struct { type API struct {
Port int `mapstructure:"port"` Port int `mapstructure:"port"`
@ -165,6 +179,7 @@ type Config struct {
Discord Discord `mapstructure:"discord"` Discord Discord `mapstructure:"discord"`
Teams Teams `mapstructure:"teams"` Teams Teams `mapstructure:"teams"`
S3 S3 `mapstructure:"s3"` S3 S3 `mapstructure:"s3"`
Kinesis Kinesis `mapstructure:"kinesis"`
UI UI `mapstructure:"ui"` UI UI `mapstructure:"ui"`
Webhook Webhook `mapstructure:"webhook"` Webhook Webhook `mapstructure:"webhook"`
API API `mapstructure:"api"` API API `mapstructure:"api"`

View file

@ -19,6 +19,7 @@ import (
"github.com/kyverno/policy-reporter/pkg/target" "github.com/kyverno/policy-reporter/pkg/target"
"github.com/kyverno/policy-reporter/pkg/target/discord" "github.com/kyverno/policy-reporter/pkg/target/discord"
"github.com/kyverno/policy-reporter/pkg/target/elasticsearch" "github.com/kyverno/policy-reporter/pkg/target/elasticsearch"
"github.com/kyverno/policy-reporter/pkg/target/kinesis"
"github.com/kyverno/policy-reporter/pkg/target/loki" "github.com/kyverno/policy-reporter/pkg/target/loki"
"github.com/kyverno/policy-reporter/pkg/target/s3" "github.com/kyverno/policy-reporter/pkg/target/s3"
"github.com/kyverno/policy-reporter/pkg/target/slack" "github.com/kyverno/policy-reporter/pkg/target/slack"
@ -292,7 +293,7 @@ func (r *Resolver) UIClient() target.Client {
) )
} }
// TeamsClients resolver method // S3Clients resolver method
func (r *Resolver) S3Clients() []target.Client { func (r *Resolver) S3Clients() []target.Client {
clients := make([]target.Client, 0) clients := make([]target.Client, 0)
if r.config.S3.Name == "" { if r.config.S3.Name == "" {
@ -315,6 +316,29 @@ func (r *Resolver) S3Clients() []target.Client {
return clients return clients
} }
// KinesisClients resolver method
func (r *Resolver) KinesisClients() []target.Client {
clients := make([]target.Client, 0)
if r.config.Kinesis.Name == "" {
r.config.Kinesis.Name = "Kinesis"
}
if es := createKinesisClient(r.config.Kinesis, Kinesis{}); es != nil {
clients = append(clients, es)
}
for i, channel := range r.config.Kinesis.Channels {
if channel.Name == "" {
channel.Name = fmt.Sprintf("Kinesis Channel %d", i+1)
}
if es := createKinesisClient(channel, r.config.Kinesis); es != nil {
clients = append(clients, es)
}
}
return clients
}
// TargetClients resolver method // TargetClients resolver method
func (r *Resolver) TargetClients() []target.Client { func (r *Resolver) TargetClients() []target.Client {
if len(r.targetClients) > 0 { if len(r.targetClients) > 0 {
@ -329,6 +353,7 @@ func (r *Resolver) TargetClients() []target.Client {
clients = append(clients, r.DiscordClients()...) clients = append(clients, r.DiscordClients()...)
clients = append(clients, r.TeamsClients()...) clients = append(clients, r.TeamsClients()...)
clients = append(clients, r.S3Clients()...) clients = append(clients, r.S3Clients()...)
clients = append(clients, r.KinesisClients()...)
clients = append(clients, r.WebhookClients()...) clients = append(clients, r.WebhookClients()...)
if ui := r.UIClient(); ui != nil { if ui := r.UIClient(); ui != nil {
@ -631,7 +656,7 @@ func createS3Client(config S3, parent S3) target.Client {
config.SkipExisting = parent.SkipExisting config.SkipExisting = parent.SkipExisting
} }
s3Client := helper.NewClient( s3Client := helper.NewS3Client(
config.AccessKeyID, config.AccessKeyID,
config.SecretAccessKey, config.SecretAccessKey,
config.Region, config.Region,
@ -650,6 +675,67 @@ func createS3Client(config S3, parent S3) target.Client {
) )
} }
func createKinesisClient(config Kinesis, parent Kinesis) target.Client {
if config.Endpoint == "" && parent.Endpoint == "" {
return nil
} else if config.Endpoint == "" {
config.Endpoint = parent.Endpoint
}
if config.AccessKeyID == "" && parent.AccessKeyID == "" {
log.Printf("[ERROR] %s.AccessKeyID has not been declared", config.Name)
return nil
} else if config.AccessKeyID == "" {
config.AccessKeyID = parent.AccessKeyID
}
if config.SecretAccessKey == "" && parent.SecretAccessKey == "" {
log.Printf("[ERROR] %s.SecretAccessKey has not been declared", config.Name)
return nil
} else if config.SecretAccessKey == "" {
config.SecretAccessKey = parent.SecretAccessKey
}
if config.Region == "" && parent.Region == "" {
log.Printf("[ERROR] %s.Region has not been declared", config.Name)
return nil
} else if config.Region == "" {
config.Region = parent.Region
}
if config.StreamName == "" && parent.StreamName == "" {
log.Printf("[ERROR] %s.StreamName has not been declared", config.Name)
return nil
} else if config.StreamName == "" {
config.StreamName = parent.StreamName
}
if config.MinimumPriority == "" {
config.MinimumPriority = parent.MinimumPriority
}
if !config.SkipExisting {
config.SkipExisting = parent.SkipExisting
}
kinesisClient := helper.NewKinesisClient(
config.AccessKeyID,
config.SecretAccessKey,
config.Region,
config.Endpoint,
config.StreamName,
)
log.Printf("[INFO] %s configured", config.Name)
return kinesis.NewClient(
config.Name,
kinesisClient,
config.SkipExisting,
createTargetFilter(config.Filter, config.MinimumPriority, config.Sources),
)
}
func createTargetFilter(filter TargetFilter, minimumPriority string, sources []string) *target.Filter { func createTargetFilter(filter TargetFilter, minimumPriority string, sources []string) *target.Filter {
return &target.Filter{ return &target.Filter{
MinimumPriority: minimumPriority, MinimumPriority: minimumPriority,

87
pkg/helper/aws.go Normal file
View file

@ -0,0 +1,87 @@
package helper
import (
"bytes"
"io"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
type AWSClient interface {
// Upload given Data the configured AWS storage
Upload(body *bytes.Buffer, key string) error
}
type s3Client struct {
bucket string
uploader *s3manager.Uploader
}
func (s *s3Client) Upload(body *bytes.Buffer, key string) error {
_, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: body,
})
return err
}
// NewS3Client creates a new S3.client to send Results to S3
func NewS3Client(accessKeyID, secretAccessKey, region, endpoint, bucket string) AWSClient {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
})
if err != nil {
log.Printf("[ERROR]: %v\n", "Error while creating S3 Session")
return nil
}
return &s3Client{
bucket,
s3manager.NewUploader(sess),
}
}
type kinesisClient struct {
streamName string
kinesis *kinesis.Kinesis
}
func (k *kinesisClient) Upload(body *bytes.Buffer, key string) error {
data, err := io.ReadAll(body)
if err != nil {
return err
}
_, err = k.kinesis.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String(k.streamName),
PartitionKey: aws.String(key),
Data: data,
})
return err
}
// NewKinesisClient creates a new S3.client to send Results to S3
func NewKinesisClient(accessKeyID, secretAccessKey, region, endpoint, streamName string) AWSClient {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
})
if err != nil {
log.Printf("[ERROR]: %v\n", "Error while creating S3 Session")
return nil
}
return &kinesisClient{
streamName,
kinesis.New(sess),
}
}

View file

@ -1,48 +0,0 @@
package helper
import (
"bytes"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
type S3Client interface {
// Upload given Data the configured S3 storage
Upload(body *bytes.Buffer, key string) error
}
type s3Client struct {
bucket string
uploader *s3manager.Uploader
}
func (s *s3Client) Upload(body *bytes.Buffer, key string) error {
_, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: body,
})
return err
}
// NewClient creates a new S3.client to send Results to S3. It doesnt' work right now
func NewClient(accessKeyID, secretAccessKey, region, endpoint, bucket string) S3Client {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
})
if err != nil {
log.Printf("[ERROR]: %v\n", "Error while creating S3 Session")
return nil
}
return &s3Client{
bucket,
s3manager.NewUploader(sess),
}
}

View file

@ -0,0 +1,44 @@
package kinesis
import (
"bytes"
"encoding/json"
"fmt"
"log"
"time"
"github.com/kyverno/policy-reporter/pkg/helper"
"github.com/kyverno/policy-reporter/pkg/report"
"github.com/kyverno/policy-reporter/pkg/target"
)
type client struct {
target.BaseClient
kinesis helper.AWSClient
}
func (c *client) Send(result *report.Result) {
body := new(bytes.Buffer)
if err := json.NewEncoder(body).Encode(result); err != nil {
log.Printf("[ERROR] %s : %v\n", c.Name(), err.Error())
return
}
key := fmt.Sprintf("%s-%s-%s", result.Policy, result.ID, result.Timestamp.Format(time.RFC3339Nano))
err := c.kinesis.Upload(body, key)
if err != nil {
log.Printf("[ERROR] %s : Kinesis Upload error %v \n", c.Name(), err.Error())
return
}
log.Printf("[INFO] %s PUSH OK", c.Name())
}
// NewClient creates a new Kinesis.client to send Results to AWS Kinesis compatible source
func NewClient(name string, kinesis helper.AWSClient, skipExistingOnStartup bool, filter *target.Filter) target.Client {
return &client{
target.NewBaseClient(name, skipExistingOnStartup, filter),
kinesis,
}
}

View file

@ -0,0 +1,70 @@
package kinesis_test
import (
"bytes"
"encoding/json"
"testing"
"github.com/kyverno/policy-reporter/pkg/report"
"github.com/kyverno/policy-reporter/pkg/target"
"github.com/kyverno/policy-reporter/pkg/target/kinesis"
)
var completeResult = &report.Result{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Priority: report.WarningPriority,
Status: report.Fail,
Severity: report.High,
Category: "resources",
Scored: true,
Resource: &report.Resource{
APIVersion: "v1",
Kind: "Deployment",
Name: "nginx",
Namespace: "default",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188409",
},
}
type testClient struct {
err error
callback func(body *bytes.Buffer, key string)
}
func (c *testClient) Upload(_ *bytes.Buffer, _ string) error {
return c.err
}
var testCallback = func(body *bytes.Buffer, key string) {}
func Test_KinesisTarget(t *testing.T) {
t.Run("Send", func(t *testing.T) {
callback := func(body *bytes.Buffer, key string) {
report := new(bytes.Buffer)
if err := json.NewEncoder(report).Encode(completeResult); err != nil {
t.Errorf("Failed to encode report message: %s", err)
}
if body != report {
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(body); err != nil {
t.Errorf("Failed to read from body: %s", err)
}
t.Errorf("Unexpected Body Content: %s", buf.String())
}
}
client := kinesis.NewClient("Kinesis", &testClient{nil, callback}, true, &target.Filter{})
client.Send(completeResult)
})
t.Run("Name", func(t *testing.T) {
client := kinesis.NewClient("Kinesis", &testClient{nil, testCallback}, false, &target.Filter{})
if client.Name() != "Kinesis" {
t.Errorf("Unexpected Name %s", client.Name())
}
})
}

View file

@ -14,8 +14,8 @@ import (
type client struct { type client struct {
target.BaseClient target.BaseClient
s3client helper.S3Client s3 helper.AWSClient
prefix string prefix string
} }
func (c *client) Send(result *report.Result) { func (c *client) Send(result *report.Result) {
@ -27,7 +27,7 @@ func (c *client) Send(result *report.Result) {
} }
key := fmt.Sprintf("%s/%s/%s-%s-%s.json", c.prefix, result.Timestamp.Format("2006-01-02"), result.Policy, result.ID, result.Timestamp.Format(time.RFC3339Nano)) key := fmt.Sprintf("%s/%s/%s-%s-%s.json", c.prefix, result.Timestamp.Format("2006-01-02"), result.Policy, result.ID, result.Timestamp.Format(time.RFC3339Nano))
err := c.s3client.Upload(body, key) err := c.s3.Upload(body, key)
if err != nil { if err != nil {
log.Printf("[ERROR] %s : S3 Upload error %v \n", c.Name(), err.Error()) log.Printf("[ERROR] %s : S3 Upload error %v \n", c.Name(), err.Error())
return return
@ -37,10 +37,10 @@ func (c *client) Send(result *report.Result) {
} }
// NewClient creates a new S3.client to send Results to S3. It doesnt' work right now // NewClient creates a new S3.client to send Results to S3. It doesnt' work right now
func NewClient(name string, s3client helper.S3Client, prefix string, skipExistingOnStartup bool, filter *target.Filter) target.Client { func NewClient(name string, s3 helper.AWSClient, prefix string, skipExistingOnStartup bool, filter *target.Filter) target.Client {
return &client{ return &client{
target.NewBaseClient(name, skipExistingOnStartup, filter), target.NewBaseClient(name, skipExistingOnStartup, filter),
s3client, s3,
prefix, prefix,
} }
} }