1
0
Fork 0
mirror of https://github.com/kyverno/kyverno.git synced 2025-01-20 18:52:16 +00:00
kyverno/cmd/kyverno-init/main.go
Charles-Edouard Brétéché 0f9fe30c08
feat: allow overriding ca and tls secret names (#8137)
* feat: allow overriding ca and tls secret names

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

* fix

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>

---------

Signed-off-by: Charles-Edouard Brétéché <charles.edouard@nirmata.com>
2023-08-28 14:05:49 +00:00

217 lines
5.3 KiB
Go

/*
Cleans up stale webhook configurations created by kyverno that were not cleaned up
*/
package main
import (
"context"
"os"
"sync"
"github.com/kyverno/kyverno/cmd/internal"
kyvernoclient "github.com/kyverno/kyverno/pkg/client/clientset/versioned"
"github.com/kyverno/kyverno/pkg/clients/dclient"
"github.com/kyverno/kyverno/pkg/config"
"github.com/kyverno/kyverno/pkg/leaderelection"
"github.com/kyverno/kyverno/pkg/logging"
kubeutils "github.com/kyverno/kyverno/pkg/utils/kube"
coordinationv1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// Kind names used to build the requests that main feeds into the
// processing pipeline.
const (
	// policyReportKind is the kind string "PolicyReport".
	policyReportKind = string("PolicyReport")
	// clusterPolicyReportKind is the kind string "ClusterPolicyReport".
	clusterPolicyReportKind = string("ClusterPolicyReport")
)
// main wires up and runs the kyverno init job.
//
// It builds the internal configuration, parses flags, runs the shared setup
// and exits with status 1 when kubeutils.HigherThanKubernetesVersion reports
// false for version 1.16.0. It then hands a run callback to leader election.
// The callback takes the 'kyvernopre-lock' lease, pushes one request per
// report kind through the gen -> process -> merge pipeline, and terminates
// the process with status 0 on success or status 1 if the lease could not be
// created or any request failed.
func main() {
	// config
	appConfig := internal.NewConfiguration(
		internal.WithKubeconfig(),
		internal.WithKyvernoClient(),
		internal.WithDynamicClient(),
		internal.WithKyvernoDynamicClient(),
	)
	// parse flags
	internal.ParseFlags(appConfig)
	// setup logger
	// show version
	// start profiling
	// setup signals
	// setup maxprocs
	ctx, setup, sdown := internal.Setup(appConfig, "kyverno-init-controller", false)
	defer sdown()
	// Exit for unsupported version of kubernetes cluster
	if !kubeutils.HigherThanKubernetesVersion(setup.KubeClient.Discovery(), logging.GlobalLogger(), 1, 16, 0) {
		os.Exit(1)
	}
	// one request per report kind
	requests := []request{
		{policyReportKind},
		{clusterPolicyReportKind},
	}
	// call sdown as soon as the setup context is cancelled
	go func() {
		defer sdown()
		<-ctx.Done()
	}()

	// done is closed when main returns; the pipeline stages select on it
	done := make(chan struct{})
	defer close(done)
	failure := false

	// run is the callback handed to leader election. Note that it uses the
	// outer ctx rather than the context it is invoked with.
	run := func(context.Context) {
		if err := acquireLeader(ctx, setup.KubeClient); err != nil {
			// Report the underlying error so the reason the lease could not
			// be created is not lost.
			logging.Error(err, "Failed to create lease 'kyvernopre-lock'")
			os.Exit(1)
		}

		// use pipeline to pass request to cleanup resources
		in := gen(done, ctx.Done(), requests...)
		// process requests
		// processing routine count : 2
		p1 := process(setup.KyvernoDynamicClient, setup.KyvernoClient, done, ctx.Done(), in)
		p2 := process(setup.KyvernoDynamicClient, setup.KyvernoClient, done, ctx.Done(), in)
		// merge results from processing routines
		for err := range merge(done, ctx.Done(), p1, p2) {
			if err != nil {
				failure = true
				logging.Error(err, "failed to cleanup resource")
			}
		}
		// if there is any failure then we fail process
		if failure {
			logging.V(2).Info("failed to cleanup prior configurations")
			os.Exit(1)
		}

		os.Exit(0)
	}

	le, err := leaderelection.New(
		logging.WithName("kyvernopre/LeaderElection"),
		"kyvernopre",
		config.KyvernoNamespace(),
		setup.KubeClient,
		config.KyvernoPodName(),
		leaderelection.DefaultRetryPeriod,
		run,
		nil,
	)
	if err != nil {
		setup.Logger.Error(err, "failed to elect a leader")
		os.Exit(1)
	}
	le.Run(ctx)
}
// acquireLeader tries to take the 'kyvernopre-lock' lease in the Kyverno
// namespace.
//
// It first looks the lease up. When the lookup succeeds the lease already
// exists, so the process exits with status 0. Any lookup error, not only a
// "not found" response, is treated as the lease being absent. In that case a
// lease with that name is created and the error from the Create call (nil on
// success) is returned.
func acquireLeader(ctx context.Context, kubeClient kubernetes.Interface) error {
	const leaseName = "kyvernopre-lock"

	if _, getErr := kubeClient.CoordinationV1().Leases(config.KyvernoNamespace()).Get(ctx, leaseName, metav1.GetOptions{}); getErr == nil {
		logging.V(2).Info("Leader was elected, quitting")
		os.Exit(0)
	}
	logging.V(2).Info("Lease 'kyvernopre-lock' not found. Starting clean-up...")

	newLease := &coordinationv1.Lease{
		ObjectMeta: metav1.ObjectMeta{Name: leaseName},
	}
	_, createErr := kubeClient.CoordinationV1().Leases(config.KyvernoNamespace()).Create(ctx, newLease, metav1.CreateOptions{})
	return createErr
}
// executeRequest handles a single request taken from the pipeline.
// It is currently a no-op: none of the arguments are used and it always
// returns nil.
func executeRequest(client dclient.Interface, kyvernoclient kyvernoclient.Interface, req request) error {
return nil
}
// request is the unit of work passed through the gen -> process -> merge
// pipeline.
type request struct {
// kind is the kind string the request refers to; main fills it with
// policyReportKind and clusterPolicyReportKind.
kind string
}
/* Processing Pipeline
                     -> Process Requests
Generate Requests    -> Process Requests    -> Merge Results
                     -> Process Requests
- number of processes can be controlled
- stop processing on SIGTERM or SIGKILL signal
- stop processing if any process fails (supported)
*/
// gen publishes the given requests, in order, on a new unbuffered channel
// and returns the receive side of it.
//
// A background goroutine performs the sends and closes the channel when all
// requests were delivered. It stops early, still closing the channel, as
// soon as either done or stopCh becomes readable.
func gen(done <-chan struct{}, stopCh <-chan struct{}, requests ...request) <-chan request {
	queue := make(chan request)
	emit := func() {
		defer close(queue)
		for i := range requests {
			select {
			case <-done:
				println("done generate")
				return
			case <-stopCh:
				println("shutting down generate")
				return
			case queue <- requests[i]:
				// delivered, move on to the next request
			}
		}
	}
	go emit()
	return queue
}
// process starts one worker goroutine that drains requests, runs
// executeRequest on each of them and publishes every result (nil included)
// on the returned unbuffered channel.
//
// The worker closes the returned channel when requests is closed, or
// earlier when done or stopCh becomes readable while a result is waiting to
// be delivered.
func process(client dclient.Interface, kyvernoclient kyvernoclient.Interface, done <-chan struct{}, stopCh <-chan struct{}, requests <-chan request) <-chan error {
	procLogger := logging.WithName("process")
	results := make(chan error)
	worker := func() {
		defer close(results)
		for {
			req, open := <-requests
			if !open {
				return
			}
			// The request is executed before the select is entered, exactly
			// as a send operand inside a select case would be.
			result := executeRequest(client, kyvernoclient, req)
			select {
			case <-done:
				procLogger.Info("done")
				return
			case <-stopCh:
				procLogger.Info("shutting down")
				return
			case results <- result:
			}
		}
	}
	go worker()
	return results
}
// merge fans the given result channels into a single unbuffered channel.
//
// One forwarding goroutine is started per input channel. Each forwards
// values until its input is closed, or until done or stopCh becomes readable
// while a value is waiting to be forwarded. The returned channel is closed
// once every forwarding goroutine has finished.
func merge(done <-chan struct{}, stopCh <-chan struct{}, processes ...<-chan error) <-chan error {
	mergeLogger := logging.WithName("merge")
	merged := make(chan error)

	var pending sync.WaitGroup
	pending.Add(len(processes))
	for i := range processes {
		go func(src <-chan error) {
			defer pending.Done()
			for result := range src {
				select {
				case <-done:
					mergeLogger.Info("done")
					return
				case <-stopCh:
					mergeLogger.Info("shutting down")
					return
				case merged <- result:
				}
			}
		}(processes[i])
	}

	// close the merged channel once all forwarders are done
	go func() {
		pending.Wait()
		close(merged)
	}()
	return merged
}