package main

import (
	"context"
	"flag"
	"log"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/util/homedir"
	klog "k8s.io/klog/v2"
)

var (
	// Command-line flags.
	kubeconfig    string
	namespace     string
	cmName        string
	id            string
	leaseLockName string
	serverPort    string
	healthPort    string
	metricsPort   string

	// Prometheus counters registered on the default registry and exposed on /metrics.
	requestCounter = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wellknown_requests_total",
			Help: "Total number of requests to the well-known endpoints",
		},
		[]string{"endpoint"},
	)
	errorCounter = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wellknown_errors_total",
			Help: "Total number of errors encountered",
		},
		[]string{"endpoint"},
	)
)

// parseFlags registers and parses the command-line flags, defaulting the
// kubeconfig path from $KUBECONFIG or the user's home directory.
func parseFlags() {
	klog.InitFlags(nil)

	// Check for the KUBECONFIG environment variable first, then fall back
	// to ~/.kube/config.
	if kubeconfigEnv := os.Getenv("KUBECONFIG"); kubeconfigEnv != "" {
		kubeconfig = kubeconfigEnv
	} else if home := homedir.HomeDir(); home != "" {
		kubeconfig = filepath.Join(home, ".kube", "config")
	}

	flag.StringVar(&kubeconfig, "kubeconfig", kubeconfig, "(optional) absolute path to the kubeconfig file")
	flag.StringVar(&namespace, "namespace", "default", "namespace")
	flag.StringVar(&cmName, "configmap", "well-known-generated", "the name of the ConfigMap to update")
	flag.StringVar(&id, "id", os.Getenv("POD_NAME"), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "well-known", "the lease lock resource name")
	flag.StringVar(&serverPort, "server-port", "8080", "server port")
	flag.StringVar(&healthPort, "health-port", "8081", "health port")
	flag.StringVar(&metricsPort, "metrics-port", "9090", "metrics port")
	flag.Parse()

	if id == "" {
		klog.Fatal("id is required")
	}
}

// getClientset builds a Kubernetes clientset, preferring the in-cluster
// configuration and falling back to the kubeconfig file.
func getClientset() *kubernetes.Clientset {
	var config *rest.Config
	var err error

	// Try to use in-cluster config first.
	config, err = rest.InClusterConfig()
	if err != nil {
		klog.Infof("Running outside of cluster, using kubeconfig from path: %s", kubeconfig)
		// Use the kubeconfig file for an out-of-cluster setup.
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			klog.Fatalf("Failed to load kubeconfig: %v", err)
		}
	} else {
		klog.Info("Running inside a cluster, using InClusterConfig")
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create Kubernetes clientset: %v", err)
	}
	return clientset
}

func main() {
	// Parse flags.
	parseFlags()

	// Use a Go context so we can tell the leader election code when we
	// want to step down.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Listen for interrupts or the Linux SIGTERM signal and cancel our
	// context, which the leader election code will observe and step down.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// Connect to the cluster.
	clientset := getClientset()
	wks := NewWellKnownService(clientset, namespace, cmName)

	// Start the server.
	go func() {
		klog.Infof("Starting /.well-known endpoint on :%s", serverPort)
		if err := http.ListenAndServe(":"+serverPort, GetServer(wks)); err != nil {
			klog.Error(err)
			os.Exit(1)
		}
	}()

	// Start the health server.
	go func() {
		klog.Infof("Starting /healthz endpoint on :%s", healthPort)
		if err := http.ListenAndServe(":"+healthPort, GetHealthServer()); err != nil {
			klog.Error(err)
			os.Exit(1)
		}
	}()

	// Start the metrics server.
	go func() {
		klog.Infof("Starting /metrics endpoint on :%s", metricsPort)
		http.Handle("/metrics", promhttp.Handler())
		if err := http.ListenAndServe(":"+metricsPort, nil); err != nil {
			log.Fatalf("Error starting metrics server: %v", err)
		}
	}()

	// Start the leader election loop. Every replica serves HTTP; only the
	// elected leader runs the discovery/ConfigMap update tasks.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: &resourcelock.LeaseLock{
			LeaseMeta: metav1.ObjectMeta{
				Name:      leaseLockName,
				Namespace: namespace,
			},
			Client: clientset.CoordinationV1(),
			LockConfig: resourcelock.ResourceLockConfig{
				Identity: id,
			},
		},
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.Infof("This instance is now the leader: %s", id)
				runLeaderTasks(ctx, wks)
			},
			OnStoppedLeading: func() {
				klog.Infof("Leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				if identity == id {
					return
				}
				klog.Infof("New leader elected: %s", identity)
			},
		},
	})
}

// runLeaderTasks runs the service discovery and ConfigMap update loop on the
// elected leader until the leader election context is cancelled.
func runLeaderTasks(ctx context.Context, wks *WellKnownService) {
	// klog.Info("Starting service discovery and ConfigMap update loop...")

	// Run the discovery loop.
	for {
		// klog.Info("Triggering service discovery and ConfigMap update...")
		reg, err := wks.collectData(ctx)
		if err != nil {
			// Stop the loop on collection errors.
			klog.Error(err)
			return
		}
		if err := wks.UpdateConfigMap(ctx, reg); err != nil {
			klog.Error(err)
		}

		// Wait before checking again, stopping as soon as leadership is
		// lost or the process is shutting down.
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Second):
		}
	}
}