diff --git a/CHANGELOG.md b/CHANGELOG.md index 04c5a9e2..9506a0cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +# 2.9.4 +* Policy Reporter + * Add [AWS Kinesis](https://aws.amazon.com/kinesis) compatible target + # 2.9.3 * Policy Reporter * Fix `grafana.dashboards.value` type conversion [[fix #158](https://github.com/kyverno/policy-reporter/issues/158)] diff --git a/charts/policy-reporter/Chart.yaml b/charts/policy-reporter/Chart.yaml index 733ac79c..bfcb0b8c 100644 --- a/charts/policy-reporter/Chart.yaml +++ b/charts/policy-reporter/Chart.yaml @@ -5,8 +5,8 @@ description: | It creates Prometheus Metrics and can send rule validation events to different targets like Loki, Elasticsearch, Slack or Discord type: application -version: 2.9.3 -appVersion: 2.6.1 +version: 2.9.4 +appVersion: 2.6.2 icon: https://github.com/kyverno/kyverno/raw/main/img/logo.png home: https://kyverno.github.io/policy-reporter diff --git a/charts/policy-reporter/config.yaml b/charts/policy-reporter/config.yaml index 35379528..25e8df3e 100644 --- a/charts/policy-reporter/config.yaml +++ b/charts/policy-reporter/config.yaml @@ -137,6 +137,27 @@ s3: {{- toYaml . | nindent 4 }} {{- end }} +kinesis: + accessKeyID: {{ .Values.target.kinesis.accessKeyID }} + secretAccessKey: {{ .Values.target.kinesis.secretAccessKey }} + region: {{ .Values.target.kinesis.region }} + endpoint: {{ .Values.target.kinesis.endpoint }} + streamName: {{ .Values.target.kinesis.streamName }} + minimumPriority: {{ .Values.target.kinesis.minimumPriority | quote }} + skipExistingOnStartup: {{ .Values.target.kinesis.skipExistingOnStartup }} + {{- with .Values.target.kinesis.sources }} + sources: + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with .Values.target.kinesis.filter }} + filter: + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with .Values.target.kinesis.channels }} + channels: + {{- toYaml . | nindent 4 }} + {{- end }} + {{- with .Values.policyPriorities }} priorityMap: {{- toYaml . | nindent 2 }} @@ -153,4 +174,4 @@ reportFilter: {{- toYaml . | nindent 6 }} {{- end }} clusterReports: - disabled: {{ .Values.reportFilter.clusterReports.disabled }} \ No newline at end of file + disabled: {{ .Values.reportFilter.clusterReports.disabled }} diff --git a/charts/policy-reporter/values.yaml b/charts/policy-reporter/values.yaml index 00d93851..88333295 100644 --- a/charts/policy-reporter/values.yaml +++ b/charts/policy-reporter/values.yaml @@ -314,6 +314,28 @@ target: # add additional s3 channels with different configurations and filters channels: [] + kinesis: + # AWS access key + accessKeyID: "" + # AWS secret access key + secretAccessKey: "" + # AWS region + region: "" + # AWS Kinesis endpoint + endpoint: "" + # AWS Kinesis stream name + streamName: "" + # minimum priority "" < info < warning < critical < error + minimumPriority: "" + # list of sources which should send to S3 + sources: [] + # Skip already existing PolicyReportResults on startup + skipExistingOnStartup: true + # filter results send by namespaces, policies and priorities + filter: {} + # add additional s3 channels with different configurations and filters + channels: [] + # Node labels for pod assignment # ref: https://kubernetes.io/docs/user-guide/node-selection/ nodeSelector: {} diff --git a/pkg/config/config.go b/pkg/config/config.go index d34f6b1d..2d4bba07 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -112,6 +112,20 @@ type S3 struct { Channels []S3 `mapstructure:"channels"` } +type Kinesis struct { + Name string `mapstructure:"name"` + AccessKeyID string `mapstructure:"accessKeyID"` + SecretAccessKey string `mapstructure:"secretAccessKey"` + Region string `mapstructure:"region"` + Endpoint string `mapstructure:"endpoint"` + StreamName string `mapstructure:"streamName"` + SkipExisting bool `mapstructure:"skipExistingOnStartup"` + MinimumPriority string `mapstructure:"minimumPriority"` + Filter TargetFilter `mapstructure:"filter"` + Sources []string `mapstructure:"sources"` + Channels []Kinesis `mapstructure:"channels"` +} + // API configuration type API struct { Port int `mapstructure:"port"` @@ -165,6 +179,7 @@ type Config struct { Discord Discord `mapstructure:"discord"` Teams Teams `mapstructure:"teams"` S3 S3 `mapstructure:"s3"` + Kinesis Kinesis `mapstructure:"kinesis"` UI UI `mapstructure:"ui"` Webhook Webhook `mapstructure:"webhook"` API API `mapstructure:"api"` diff --git a/pkg/config/resolver.go b/pkg/config/resolver.go index d40a867c..9ba7ca64 100644 --- a/pkg/config/resolver.go +++ b/pkg/config/resolver.go @@ -19,6 +19,7 @@ import ( "github.com/kyverno/policy-reporter/pkg/target" "github.com/kyverno/policy-reporter/pkg/target/discord" "github.com/kyverno/policy-reporter/pkg/target/elasticsearch" + "github.com/kyverno/policy-reporter/pkg/target/kinesis" "github.com/kyverno/policy-reporter/pkg/target/loki" "github.com/kyverno/policy-reporter/pkg/target/s3" "github.com/kyverno/policy-reporter/pkg/target/slack" @@ -292,7 +293,7 @@ func (r *Resolver) UIClient() target.Client { ) } -// TeamsClients resolver method +// S3Clients resolver method func (r *Resolver) S3Clients() []target.Client { clients := make([]target.Client, 0) if r.config.S3.Name == "" { @@ -315,6 +316,29 @@ func (r *Resolver) S3Clients() []target.Client { return clients } +// KinesisClients resolver method +func (r *Resolver) KinesisClients() []target.Client { + clients := make([]target.Client, 0) + if r.config.Kinesis.Name == "" { + r.config.Kinesis.Name = "Kinesis" + } + + if es := createKinesisClient(r.config.Kinesis, Kinesis{}); es != nil { + clients = append(clients, es) + } + for i, channel := range r.config.Kinesis.Channels { + if channel.Name == "" { + channel.Name = fmt.Sprintf("Kinesis Channel %d", i+1) + } + + if es := createKinesisClient(channel, r.config.Kinesis); es != nil { + clients = append(clients, es) + } + } + + return clients +} + // TargetClients resolver method func (r *Resolver) TargetClients() []target.Client { if len(r.targetClients) > 0 { @@ -329,6 +353,7 @@ func (r *Resolver) TargetClients() []target.Client { clients = append(clients, r.DiscordClients()...) clients = append(clients, r.TeamsClients()...) clients = append(clients, r.S3Clients()...) + clients = append(clients, r.KinesisClients()...) clients = append(clients, r.WebhookClients()...) if ui := r.UIClient(); ui != nil { @@ -631,7 +656,7 @@ func createS3Client(config S3, parent S3) target.Client { config.SkipExisting = parent.SkipExisting } - s3Client := helper.NewClient( + s3Client := helper.NewS3Client( config.AccessKeyID, config.SecretAccessKey, config.Region, @@ -650,6 +675,67 @@ func createS3Client(config S3, parent S3) target.Client { ) } +func createKinesisClient(config Kinesis, parent Kinesis) target.Client { + if config.Endpoint == "" && parent.Endpoint == "" { + return nil + } else if config.Endpoint == "" { + config.Endpoint = parent.Endpoint + } + + if config.AccessKeyID == "" && parent.AccessKeyID == "" { + log.Printf("[ERROR] %s.AccessKeyID has not been declared", config.Name) + return nil + } else if config.AccessKeyID == "" { + config.AccessKeyID = parent.AccessKeyID + } + + if config.SecretAccessKey == "" && parent.SecretAccessKey == "" { + log.Printf("[ERROR] %s.SecretAccessKey has not been declared", config.Name) + return nil + } else if config.SecretAccessKey == "" { + config.SecretAccessKey = parent.SecretAccessKey + } + + if config.Region == "" && parent.Region == "" { + log.Printf("[ERROR] %s.Region has not been declared", config.Name) + return nil + } else if config.Region == "" { + config.Region = parent.Region + } + + if config.StreamName == "" && parent.StreamName == "" { + log.Printf("[ERROR] %s.StreamName has not been declared", config.Name) + return nil + } else if config.StreamName == "" { + config.StreamName = parent.StreamName + } + + if config.MinimumPriority == "" { + config.MinimumPriority = parent.MinimumPriority + } + + if !config.SkipExisting { + config.SkipExisting = parent.SkipExisting + } + + kinesisClient := helper.NewKinesisClient( + config.AccessKeyID, + config.SecretAccessKey, + config.Region, + config.Endpoint, + config.StreamName, + ) + + log.Printf("[INFO] %s configured", config.Name) + + return kinesis.NewClient( + config.Name, + kinesisClient, + config.SkipExisting, + createTargetFilter(config.Filter, config.MinimumPriority, config.Sources), + ) +} + func createTargetFilter(filter TargetFilter, minimumPriority string, sources []string) *target.Filter { return &target.Filter{ MinimumPriority: minimumPriority, diff --git a/pkg/helper/aws.go b/pkg/helper/aws.go new file mode 100644 index 00000000..7ebae1dd --- /dev/null +++ b/pkg/helper/aws.go @@ -0,0 +1,87 @@ +package helper + +import ( + "bytes" + "io" + "log" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go/service/s3/s3manager" +) + +type AWSClient interface { + // Upload given Data the configured AWS storage + Upload(body *bytes.Buffer, key string) error +} + +type s3Client struct { + bucket string + uploader *s3manager.Uploader +} + +func (s *s3Client) Upload(body *bytes.Buffer, key string) error { + _, err := s.uploader.Upload(&s3manager.UploadInput{ + Bucket: aws.String(s.bucket), + Key: aws.String(key), + Body: body, + }) + return err +} + +// NewS3Client creates a new S3.client to send Results to S3 +func NewS3Client(accessKeyID, secretAccessKey, region, endpoint, bucket string) AWSClient { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(region), + Endpoint: aws.String(endpoint), + Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""), + }) + if err != nil { + log.Printf("[ERROR]: %v\n", "Error while creating S3 Session") + return nil + } + + return &s3Client{ + bucket, + s3manager.NewUploader(sess), + } +} + +type kinesisClient struct { + streamName string + kinesis *kinesis.Kinesis +} + +func (k *kinesisClient) Upload(body *bytes.Buffer, key string) error { + data, err := io.ReadAll(body) + if err != nil { + return err + } + + _, err = k.kinesis.PutRecord(&kinesis.PutRecordInput{ + StreamName: aws.String(k.streamName), + PartitionKey: aws.String(key), + Data: data, + }) + return err +} + +// NewKinesisClient creates a new S3.client to send Results to S3 +func NewKinesisClient(accessKeyID, secretAccessKey, region, endpoint, streamName string) AWSClient { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(region), + Endpoint: aws.String(endpoint), + Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""), + }) + if err != nil { + log.Printf("[ERROR]: %v\n", "Error while creating S3 Session") + return nil + } + + return &kinesisClient{ + streamName, + kinesis.New(sess), + } +} diff --git a/pkg/helper/s3.go b/pkg/helper/s3.go deleted file mode 100644 index 1661b0a1..00000000 --- a/pkg/helper/s3.go +++ /dev/null @@ -1,48 +0,0 @@ -package helper - -import ( - "bytes" - "log" - - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/s3/s3manager" -) - -type S3Client interface { - // Upload given Data the configured S3 storage - Upload(body *bytes.Buffer, key string) error -} - -type s3Client struct { - bucket string - uploader *s3manager.Uploader -} - -func (s *s3Client) Upload(body *bytes.Buffer, key string) error { - _, err := s.uploader.Upload(&s3manager.UploadInput{ - Bucket: aws.String(s.bucket), - Key: aws.String(key), - Body: body, - }) - return err -} - -// NewClient creates a new S3.client to send Results to S3. It doesnt' work right now -func NewClient(accessKeyID, secretAccessKey, region, endpoint, bucket string) S3Client { - sess, err := session.NewSession(&aws.Config{ - Region: aws.String(region), - Endpoint: aws.String(endpoint), - Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""), - }) - if err != nil { - log.Printf("[ERROR]: %v\n", "Error while creating S3 Session") - return nil - } - - return &s3Client{ - bucket, - s3manager.NewUploader(sess), - } -} diff --git a/pkg/target/kinesis/kinesis.go b/pkg/target/kinesis/kinesis.go new file mode 100644 index 00000000..183ee2c0 --- /dev/null +++ b/pkg/target/kinesis/kinesis.go @@ -0,0 +1,44 @@ +package kinesis + +import ( + "bytes" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/kyverno/policy-reporter/pkg/helper" + "github.com/kyverno/policy-reporter/pkg/report" + "github.com/kyverno/policy-reporter/pkg/target" +) + +type client struct { + target.BaseClient + kinesis helper.AWSClient +} + +func (c *client) Send(result *report.Result) { + body := new(bytes.Buffer) + + if err := json.NewEncoder(body).Encode(result); err != nil { + log.Printf("[ERROR] %s : %v\n", c.Name(), err.Error()) + return + } + key := fmt.Sprintf("%s-%s-%s", result.Policy, result.ID, result.Timestamp.Format(time.RFC3339Nano)) + + err := c.kinesis.Upload(body, key) + if err != nil { + log.Printf("[ERROR] %s : Kinesis Upload error %v \n", c.Name(), err.Error()) + return + } + + log.Printf("[INFO] %s PUSH OK", c.Name()) +} + +// NewClient creates a new Kinesis.client to send Results to AWS Kinesis compatible source +func NewClient(name string, kinesis helper.AWSClient, skipExistingOnStartup bool, filter *target.Filter) target.Client { + return &client{ + target.NewBaseClient(name, skipExistingOnStartup, filter), + kinesis, + } +} diff --git a/pkg/target/kinesis/kinesis_test.go b/pkg/target/kinesis/kinesis_test.go new file mode 100644 index 00000000..f34e69c3 --- /dev/null +++ b/pkg/target/kinesis/kinesis_test.go @@ -0,0 +1,70 @@ +package kinesis_test + +import ( + "bytes" + "encoding/json" + "testing" + + "github.com/kyverno/policy-reporter/pkg/report" + "github.com/kyverno/policy-reporter/pkg/target" + "github.com/kyverno/policy-reporter/pkg/target/kinesis" +) + +var completeResult = &report.Result{ + Message: "validation error: requests and limits required. Rule autogen-check-for-requests-and-limits failed at path /spec/template/spec/containers/0/resources/requests/", + Policy: "require-requests-and-limits-required", + Rule: "autogen-check-for-requests-and-limits", + Priority: report.WarningPriority, + Status: report.Fail, + Severity: report.High, + Category: "resources", + Scored: true, + Resource: &report.Resource{ + APIVersion: "v1", + Kind: "Deployment", + Name: "nginx", + Namespace: "default", + UID: "536ab69f-1b3c-4bd9-9ba4-274a56188409", + }, +} + +type testClient struct { + err error + callback func(body *bytes.Buffer, key string) +} + +func (c *testClient) Upload(_ *bytes.Buffer, _ string) error { + return c.err +} + +var testCallback = func(body *bytes.Buffer, key string) {} + +func Test_KinesisTarget(t *testing.T) { + t.Run("Send", func(t *testing.T) { + callback := func(body *bytes.Buffer, key string) { + report := new(bytes.Buffer) + if err := json.NewEncoder(report).Encode(completeResult); err != nil { + t.Errorf("Failed to encode report message: %s", err) + } + + if body != report { + buf := new(bytes.Buffer) + if _, err := buf.ReadFrom(body); err != nil { + t.Errorf("Failed to read from body: %s", err) + } + + t.Errorf("Unexpected Body Content: %s", buf.String()) + } + } + + client := kinesis.NewClient("Kinesis", &testClient{nil, callback}, true, &target.Filter{}) + client.Send(completeResult) + }) + t.Run("Name", func(t *testing.T) { + client := kinesis.NewClient("Kinesis", &testClient{nil, testCallback}, false, &target.Filter{}) + + if client.Name() != "Kinesis" { + t.Errorf("Unexpected Name %s", client.Name()) + } + }) +} diff --git a/pkg/target/s3/s3.go b/pkg/target/s3/s3.go index 88b04629..91743a94 100644 --- a/pkg/target/s3/s3.go +++ b/pkg/target/s3/s3.go @@ -14,8 +14,8 @@ import ( type client struct { target.BaseClient - s3client helper.S3Client - prefix string + s3 helper.AWSClient + prefix string } func (c *client) Send(result *report.Result) { @@ -27,7 +27,7 @@ func (c *client) Send(result *report.Result) { } key := fmt.Sprintf("%s/%s/%s-%s-%s.json", c.prefix, result.Timestamp.Format("2006-01-02"), result.Policy, result.ID, result.Timestamp.Format(time.RFC3339Nano)) - err := c.s3client.Upload(body, key) + err := c.s3.Upload(body, key) if err != nil { log.Printf("[ERROR] %s : S3 Upload error %v \n", c.Name(), err.Error()) return @@ -37,10 +37,10 @@ func (c *client) Send(result *report.Result) { } // NewClient creates a new S3.client to send Results to S3. It doesnt' work right now -func NewClient(name string, s3client helper.S3Client, prefix string, skipExistingOnStartup bool, filter *target.Filter) target.Client { +func NewClient(name string, s3 helper.AWSClient, prefix string, skipExistingOnStartup bool, filter *target.Filter) target.Client { return &client{ target.NewBaseClient(name, skipExistingOnStartup, filter), - s3client, + s3, prefix, } }