Tommy Skaug
9ab70156e7
This also fixes an issue where a copy-pasted variable caused an error when starting the metrics server.
package main

import (
	"context"
	"flag"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/util/homedir"
	klog "k8s.io/klog/v2"
)

var (
	kubeconfig    string
	namespace     string
	cmName        string
	id            string
	leaseLockName string

	serverPort  string
	healthPort  string
	metricsPort string

	requestCounter = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wellknown_requests_total",
			Help: "Total number of requests to the well-known endpoints",
		},
		[]string{"endpoint"},
	)
	errorCounter = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Name: "wellknown_errors_total",
			Help: "Total number of errors encountered",
		},
		[]string{"endpoint"},
	)
)
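
// Note: requestCounter and errorCounter are created with promauto, which
// registers them with the default Prometheus registry at init time; they are
// exposed by the /metrics handler started in main. They are presumably
// incremented by the well-known HTTP handlers defined elsewhere in this
// package (not shown in this file).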

func parseFlags() {
	klog.InitFlags(nil)

	// Check for KUBECONFIG environment variable
	if kubeconfigEnv := os.Getenv("KUBECONFIG"); kubeconfigEnv != "" {
		kubeconfig = kubeconfigEnv
	} else if home := homedir.HomeDir(); home != "" {
		kubeconfig = filepath.Join(home, ".kube", "config")
	}

	flag.StringVar(&kubeconfig, "kubeconfig", kubeconfig, "(optional) absolute path to the kubeconfig file")
	flag.StringVar(&namespace, "namespace", "default", "namespace used for the lease lock and the generated ConfigMap")
	flag.StringVar(&cmName, "configmap", "well-known-generated", "name of the ConfigMap to update")
	flag.StringVar(&id, "id", os.Getenv("POD_NAME"), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "well-known", "the lease lock resource name")
	flag.StringVar(&serverPort, "server-port", "8080", "server port")
	flag.StringVar(&healthPort, "health-port", "8081", "health port")
	flag.StringVar(&metricsPort, "metrics-port", "9090", "metrics port")
	flag.Parse()

	if id == "" {
		klog.Fatal("id is required")
	}
}
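
// Example invocation for local development (illustrative values only; --id is
// required whenever the POD_NAME environment variable is not set):
//
//	go run . --kubeconfig=$HOME/.kube/config --namespace=default \
//	  --configmap=well-known-generated --id=my-local-instance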

func getClientset() *kubernetes.Clientset {
	var config *rest.Config
	var err error

	// Try to use in-cluster config
	config, err = rest.InClusterConfig()
	if err != nil {
		klog.Infof("Running outside of cluster, using kubeconfig from path: %s", kubeconfig)
		// Use kubeconfig file for out-of-cluster setup
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			klog.Fatalf("Failed to load kubeconfig: %v", err)
		}
	} else {
		klog.Info("Running inside a cluster, using InClusterConfig")
	}

	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatalf("Failed to create Kubernetes clientset: %v", err)
	}

	return clientset
}

func main() {
	// Parse flags
	parseFlags()

	// use a Go context so we can tell the leaderelection code when we
	// want to step down
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// listen for interrupts or the Linux SIGTERM signal and cancel
	// our context, which the leader election code will observe and
	// step down
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// Connect to the cluster
	clientset := getClientset()
	wks := NewWellKnownService(clientset, namespace, cmName)

	// Start the server
	go func() {
		klog.Infof("Starting /.well-known endpoint on :%s", serverPort)
		if err := http.ListenAndServe(":"+serverPort, GetServer(wks)); err != nil {
			klog.Error(err)
			os.Exit(1)
		}
	}()

	// Start the health server
	go func() {
		klog.Infof("Starting /healthz endpoint on :%s", healthPort)
		if err := http.ListenAndServe(":"+healthPort, GetHealthServer()); err != nil {
			klog.Error(err)
			os.Exit(1)
		}
	}()

	// Start the metrics server
	go func() {
		klog.Infof("Starting /metrics endpoint on :%s", metricsPort)
		// promhttp.Handler is registered on http.DefaultServeMux, which
		// ListenAndServe uses when the handler argument is nil.
		http.Handle("/metrics", promhttp.Handler())
		if err := http.ListenAndServe(":"+metricsPort, nil); err != nil {
			klog.Fatalf("Error starting metrics server: %v", err)
		}
	}()
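
	// Leader election timing (standard client-go semantics): LeaseDuration is
	// how long non-leader candidates wait after the last observed renewal
	// before trying to acquire the lease, RenewDeadline is how long the acting
	// leader keeps retrying to renew before giving up leadership, and
	// RetryPeriod is the wait between individual retries. ReleaseOnCancel
	// releases the lease when ctx is cancelled so another replica can take
	// over without waiting out the full LeaseDuration.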

	// Start the leader election code loop
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: &resourcelock.LeaseLock{
			LeaseMeta: metav1.ObjectMeta{
				Name:      leaseLockName,
				Namespace: namespace,
			},
			Client: clientset.CoordinationV1(),
			LockConfig: resourcelock.ResourceLockConfig{
				Identity: id,
			},
		},
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				klog.Infof("This instance is now the leader: %s", id)

				runLeaderTasks(ctx, wks)
			},
			OnStoppedLeading: func() {
				klog.Infof("Leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				if identity == id {
					return
				}
				klog.Infof("New leader elected: %s", identity)
			},
		},
	})
}
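
// runLeaderTasks runs the service discovery and ConfigMap update loop while
// this instance holds the lease. Note that it returns on a collectData error,
// which stops the updates but leaves the leader election loop (and the lease)
// in place until the process exits or loses leadership.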
func runLeaderTasks(ctx context.Context, wks *WellKnownService) {
	// klog.Info("Starting service discovery and ConfigMap update loop...")

	// Run the discovery loop
	for {
		// klog.Info("Triggering service discovery and ConfigMap update...")

		reg, err := wks.collectData(ctx)
		if err != nil {
			klog.Error(err)
			return
		}

		if err := wks.UpdateConfigMap(ctx, reg); err != nil {
			klog.Error(err)
		}

		// Sleep for a while before checking again
		time.Sleep(10 * time.Second)
	}
}