1
0
Fork 0
mirror of https://github.com/kyverno/policy-reporter.git synced 2024-12-14 11:57:32 +00:00

added AWS Kinesis support

Signed-off-by: Ildar Valiullin <preved.911@gmail.com>
This commit is contained in:
Ildar Valiullin 2022-06-21 22:37:16 +03:00
parent 45b37a0a3d
commit 0354a5eb4c
11 changed files with 359 additions and 58 deletions

View file

@ -1,5 +1,9 @@
# Changelog
# 2.9.4
* Policy Reporter
* Add [AWS Kinesis](https://aws.amazon.com/kinesis) compatible target
# 2.9.3
* Policy Reporter
* Fix `grafana.dashboards.value` type conversion [[fix #158](https://github.com/kyverno/policy-reporter/issues/158)]

View file

@ -5,8 +5,8 @@ description: |
It creates Prometheus Metrics and can send rule validation events to different targets like Loki, Elasticsearch, Slack or Discord
type: application
version: 2.9.3
appVersion: 2.6.1
version: 2.9.4
appVersion: 2.6.2
icon: https://github.com/kyverno/kyverno/raw/main/img/logo.png
home: https://kyverno.github.io/policy-reporter

View file

@ -137,6 +137,27 @@ s3:
{{- toYaml . | nindent 4 }}
{{- end }}
kinesis:
accessKeyID: {{ .Values.target.kinesis.accessKeyID }}
secretAccessKey: {{ .Values.target.kinesis.secretAccessKey }}
region: {{ .Values.target.kinesis.region }}
endpoint: {{ .Values.target.kinesis.endpoint }}
streamName: {{ .Values.target.kinesis.streamName }}
minimumPriority: {{ .Values.target.kinesis.minimumPriority | quote }}
skipExistingOnStartup: {{ .Values.target.kinesis.skipExistingOnStartup }}
{{- with .Values.target.kinesis.sources }}
sources:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- with .Values.target.kinesis.filter }}
filter:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- with .Values.target.kinesis.channels }}
channels:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- with .Values.policyPriorities }}
priorityMap:
{{- toYaml . | nindent 2 }}
@ -153,4 +174,4 @@ reportFilter:
{{- toYaml . | nindent 6 }}
{{- end }}
clusterReports:
disabled: {{ .Values.reportFilter.clusterReports.disabled }}
disabled: {{ .Values.reportFilter.clusterReports.disabled }}

View file

@ -314,6 +314,28 @@ target:
# add additional s3 channels with different configurations and filters
channels: []
kinesis:
# AWS access key
accessKeyID: ""
# AWS secret access key
secretAccessKey: ""
# AWS region
region: ""
# AWS Kinesis endpoint
endpoint: ""
# AWS Kinesis stream name
streamName: ""
# minimum priority "" < info < warning < critical < error
minimumPriority: ""
# list of sources which should send to S3
sources: []
# Skip already existing PolicyReportResults on startup
skipExistingOnStartup: true
# filter results send by namespaces, policies and priorities
filter: {}
# add additional s3 channels with different configurations and filters
channels: []
# Node labels for pod assignment
# ref: https://kubernetes.io/docs/user-guide/node-selection/
nodeSelector: {}

View file

@ -112,6 +112,20 @@ type S3 struct {
Channels []S3 `mapstructure:"channels"`
}
type Kinesis struct {
Name string `mapstructure:"name"`
AccessKeyID string `mapstructure:"accessKeyID"`
SecretAccessKey string `mapstructure:"secretAccessKey"`
Region string `mapstructure:"region"`
Endpoint string `mapstructure:"endpoint"`
StreamName string `mapstructure:"streamName"`
SkipExisting bool `mapstructure:"skipExistingOnStartup"`
MinimumPriority string `mapstructure:"minimumPriority"`
Filter TargetFilter `mapstructure:"filter"`
Sources []string `mapstructure:"sources"`
Channels []Kinesis `mapstructure:"channels"`
}
// API configuration
type API struct {
Port int `mapstructure:"port"`
@ -165,6 +179,7 @@ type Config struct {
Discord Discord `mapstructure:"discord"`
Teams Teams `mapstructure:"teams"`
S3 S3 `mapstructure:"s3"`
Kinesis Kinesis `mapstructure:"kinesis"`
UI UI `mapstructure:"ui"`
Webhook Webhook `mapstructure:"webhook"`
API API `mapstructure:"api"`

View file

@ -19,6 +19,7 @@ import (
"github.com/kyverno/policy-reporter/pkg/target"
"github.com/kyverno/policy-reporter/pkg/target/discord"
"github.com/kyverno/policy-reporter/pkg/target/elasticsearch"
"github.com/kyverno/policy-reporter/pkg/target/kinesis"
"github.com/kyverno/policy-reporter/pkg/target/loki"
"github.com/kyverno/policy-reporter/pkg/target/s3"
"github.com/kyverno/policy-reporter/pkg/target/slack"
@ -292,7 +293,7 @@ func (r *Resolver) UIClient() target.Client {
)
}
// TeamsClients resolver method
// S3Clients resolver method
func (r *Resolver) S3Clients() []target.Client {
clients := make([]target.Client, 0)
if r.config.S3.Name == "" {
@ -315,6 +316,29 @@ func (r *Resolver) S3Clients() []target.Client {
return clients
}
// KinesisClients resolver method
func (r *Resolver) KinesisClients() []target.Client {
clients := make([]target.Client, 0)
if r.config.Kinesis.Name == "" {
r.config.Kinesis.Name = "Kinesis"
}
if es := createKinesisClient(r.config.Kinesis, Kinesis{}); es != nil {
clients = append(clients, es)
}
for i, channel := range r.config.Kinesis.Channels {
if channel.Name == "" {
channel.Name = fmt.Sprintf("Kinesis Channel %d", i+1)
}
if es := createKinesisClient(channel, r.config.Kinesis); es != nil {
clients = append(clients, es)
}
}
return clients
}
// TargetClients resolver method
func (r *Resolver) TargetClients() []target.Client {
if len(r.targetClients) > 0 {
@ -329,6 +353,7 @@ func (r *Resolver) TargetClients() []target.Client {
clients = append(clients, r.DiscordClients()...)
clients = append(clients, r.TeamsClients()...)
clients = append(clients, r.S3Clients()...)
clients = append(clients, r.KinesisClients()...)
clients = append(clients, r.WebhookClients()...)
if ui := r.UIClient(); ui != nil {
@ -631,7 +656,7 @@ func createS3Client(config S3, parent S3) target.Client {
config.SkipExisting = parent.SkipExisting
}
s3Client := helper.NewClient(
s3Client := helper.NewS3Client(
config.AccessKeyID,
config.SecretAccessKey,
config.Region,
@ -650,6 +675,67 @@ func createS3Client(config S3, parent S3) target.Client {
)
}
func createKinesisClient(config Kinesis, parent Kinesis) target.Client {
if config.Endpoint == "" && parent.Endpoint == "" {
return nil
} else if config.Endpoint == "" {
config.Endpoint = parent.Endpoint
}
if config.AccessKeyID == "" && parent.AccessKeyID == "" {
log.Printf("[ERROR] %s.AccessKeyID has not been declared", config.Name)
return nil
} else if config.AccessKeyID == "" {
config.AccessKeyID = parent.AccessKeyID
}
if config.SecretAccessKey == "" && parent.SecretAccessKey == "" {
log.Printf("[ERROR] %s.SecretAccessKey has not been declared", config.Name)
return nil
} else if config.SecretAccessKey == "" {
config.SecretAccessKey = parent.SecretAccessKey
}
if config.Region == "" && parent.Region == "" {
log.Printf("[ERROR] %s.Region has not been declared", config.Name)
return nil
} else if config.Region == "" {
config.Region = parent.Region
}
if config.StreamName == "" && parent.StreamName == "" {
log.Printf("[ERROR] %s.StreamName has not been declared", config.Name)
return nil
} else if config.StreamName == "" {
config.StreamName = parent.StreamName
}
if config.MinimumPriority == "" {
config.MinimumPriority = parent.MinimumPriority
}
if !config.SkipExisting {
config.SkipExisting = parent.SkipExisting
}
kinesisClient := helper.NewKinesisClient(
config.AccessKeyID,
config.SecretAccessKey,
config.Region,
config.Endpoint,
config.StreamName,
)
log.Printf("[INFO] %s configured", config.Name)
return kinesis.NewClient(
config.Name,
kinesisClient,
config.SkipExisting,
createTargetFilter(config.Filter, config.MinimumPriority, config.Sources),
)
}
func createTargetFilter(filter TargetFilter, minimumPriority string, sources []string) *target.Filter {
return &target.Filter{
MinimumPriority: minimumPriority,

87
pkg/helper/aws.go Normal file
View file

@ -0,0 +1,87 @@
package helper
import (
"bytes"
"io"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
type AWSClient interface {
// Upload given Data the configured AWS storage
Upload(body *bytes.Buffer, key string) error
}
type s3Client struct {
bucket string
uploader *s3manager.Uploader
}
func (s *s3Client) Upload(body *bytes.Buffer, key string) error {
_, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: body,
})
return err
}
// NewS3Client creates a new S3.client to send Results to S3
func NewS3Client(accessKeyID, secretAccessKey, region, endpoint, bucket string) AWSClient {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
})
if err != nil {
log.Printf("[ERROR]: %v\n", "Error while creating S3 Session")
return nil
}
return &s3Client{
bucket,
s3manager.NewUploader(sess),
}
}
type kinesisClient struct {
streamName string
kinesis *kinesis.Kinesis
}
func (k *kinesisClient) Upload(body *bytes.Buffer, key string) error {
data, err := io.ReadAll(body)
if err != nil {
return err
}
_, err = k.kinesis.PutRecord(&kinesis.PutRecordInput{
StreamName: aws.String(k.streamName),
PartitionKey: aws.String(key),
Data: data,
})
return err
}
// NewKinesisClient creates a new S3.client to send Results to S3
func NewKinesisClient(accessKeyID, secretAccessKey, region, endpoint, streamName string) AWSClient {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
})
if err != nil {
log.Printf("[ERROR]: %v\n", "Error while creating S3 Session")
return nil
}
return &kinesisClient{
streamName,
kinesis.New(sess),
}
}

View file

@ -1,48 +0,0 @@
package helper
import (
"bytes"
"log"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
type S3Client interface {
// Upload given Data the configured S3 storage
Upload(body *bytes.Buffer, key string) error
}
type s3Client struct {
bucket string
uploader *s3manager.Uploader
}
func (s *s3Client) Upload(body *bytes.Buffer, key string) error {
_, err := s.uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: body,
})
return err
}
// NewClient creates a new S3.client to send Results to S3. It doesnt' work right now
func NewClient(accessKeyID, secretAccessKey, region, endpoint, bucket string) S3Client {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
Endpoint: aws.String(endpoint),
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
})
if err != nil {
log.Printf("[ERROR]: %v\n", "Error while creating S3 Session")
return nil
}
return &s3Client{
bucket,
s3manager.NewUploader(sess),
}
}

View file

@ -0,0 +1,44 @@
package kinesis
import (
"bytes"
"encoding/json"
"fmt"
"log"
"time"
"github.com/kyverno/policy-reporter/pkg/helper"
"github.com/kyverno/policy-reporter/pkg/report"
"github.com/kyverno/policy-reporter/pkg/target"
)
type client struct {
target.BaseClient
kinesis helper.AWSClient
}
func (c *client) Send(result *report.Result) {
body := new(bytes.Buffer)
if err := json.NewEncoder(body).Encode(result); err != nil {
log.Printf("[ERROR] %s : %v\n", c.Name(), err.Error())
return
}
key := fmt.Sprintf("%s-%s-%s", result.Policy, result.ID, result.Timestamp.Format(time.RFC3339Nano))
err := c.kinesis.Upload(body, key)
if err != nil {
log.Printf("[ERROR] %s : Kinesis Upload error %v \n", c.Name(), err.Error())
return
}
log.Printf("[INFO] %s PUSH OK", c.Name())
}
// NewClient creates a new Kinesis.client to send Results to AWS Kinesis compatible source
func NewClient(name string, kinesis helper.AWSClient, skipExistingOnStartup bool, filter *target.Filter) target.Client {
return &client{
target.NewBaseClient(name, skipExistingOnStartup, filter),
kinesis,
}
}

View file

@ -0,0 +1,70 @@
package kinesis_test
import (
"bytes"
"encoding/json"
"testing"
"github.com/kyverno/policy-reporter/pkg/report"
"github.com/kyverno/policy-reporter/pkg/target"
"github.com/kyverno/policy-reporter/pkg/target/kinesis"
)
var completeResult = &report.Result{
Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/",
Policy: "require-requests-and-limits-required",
Rule: "autogen-check-for-requests-and-limits",
Priority: report.WarningPriority,
Status: report.Fail,
Severity: report.High,
Category: "resources",
Scored: true,
Resource: &report.Resource{
APIVersion: "v1",
Kind: "Deployment",
Name: "nginx",
Namespace: "default",
UID: "536ab69f-1b3c-4bd9-9ba4-274a56188409",
},
}
type testClient struct {
err error
callback func(body *bytes.Buffer, key string)
}
func (c *testClient) Upload(_ *bytes.Buffer, _ string) error {
return c.err
}
var testCallback = func(body *bytes.Buffer, key string) {}
func Test_KinesisTarget(t *testing.T) {
t.Run("Send", func(t *testing.T) {
callback := func(body *bytes.Buffer, key string) {
report := new(bytes.Buffer)
if err := json.NewEncoder(report).Encode(completeResult); err != nil {
t.Errorf("Failed to encode report message: %s", err)
}
if body != report {
buf := new(bytes.Buffer)
if _, err := buf.ReadFrom(body); err != nil {
t.Errorf("Failed to read from body: %s", err)
}
t.Errorf("Unexpected Body Content: %s", buf.String())
}
}
client := kinesis.NewClient("Kinesis", &testClient{nil, callback}, true, &target.Filter{})
client.Send(completeResult)
})
t.Run("Name", func(t *testing.T) {
client := kinesis.NewClient("Kinesis", &testClient{nil, testCallback}, false, &target.Filter{})
if client.Name() != "Kinesis" {
t.Errorf("Unexpected Name %s", client.Name())
}
})
}

View file

@ -14,8 +14,8 @@ import (
type client struct {
target.BaseClient
s3client helper.S3Client
prefix string
s3 helper.AWSClient
prefix string
}
func (c *client) Send(result *report.Result) {
@ -27,7 +27,7 @@ func (c *client) Send(result *report.Result) {
}
key := fmt.Sprintf("%s/%s/%s-%s-%s.json", c.prefix, result.Timestamp.Format("2006-01-02"), result.Policy, result.ID, result.Timestamp.Format(time.RFC3339Nano))
err := c.s3client.Upload(body, key)
err := c.s3.Upload(body, key)
if err != nil {
log.Printf("[ERROR] %s : S3 Upload error %v \n", c.Name(), err.Error())
return
@ -37,10 +37,10 @@ func (c *client) Send(result *report.Result) {
}
// NewClient creates a new S3.client to send Results to S3. It doesnt' work right now
func NewClient(name string, s3client helper.S3Client, prefix string, skipExistingOnStartup bool, filter *target.Filter) target.Client {
func NewClient(name string, s3 helper.AWSClient, prefix string, skipExistingOnStartup bool, filter *target.Filter) target.Client {
return &client{
target.NewBaseClient(name, skipExistingOnStartup, filter),
s3client,
s3,
prefix,
}
}