well-known/server/main.go

214 lines
5.5 KiB
Go
Raw Normal View History

2023-03-11 19:58:56 +00:00
package main
import (
"context"
"flag"
"log"
2023-03-11 23:06:42 +00:00
"net/http"
2023-03-11 19:58:56 +00:00
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
2023-03-11 19:58:56 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/homedir"
klog "k8s.io/klog/v2"
"k8s.io/client-go/tools/clientcmd"
)
var (
kubeconfig string
namespace string
cmName string
id string
leaseLockName string
2023-03-11 19:58:56 +00:00
serverPort string
healthPort string
metricsPort string
requestCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wellknown_requests_total",
Help: "Total number of requests to the well-known endpoints",
},
[]string{"endpoint"},
)
errorCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "wellknown_errors_total",
Help: "Total number of errors encountered",
},
[]string{"endpoint"},
)
)
func parseFlags() {
2023-06-01 10:40:19 +00:00
klog.InitFlags(nil)
// Check for KUBECONFIG environment variable
if kubeconfigEnv := os.Getenv("KUBECONFIG"); kubeconfigEnv != "" {
kubeconfig = kubeconfigEnv
} else if home := homedir.HomeDir(); home != "" {
kubeconfig = filepath.Join(home, ".kube", "config")
2023-03-11 19:58:56 +00:00
}
flag.StringVar(&kubeconfig, "kubeconfig", kubeconfig, "(optional) absolute path to the kubeconfig file")
2023-03-11 19:58:56 +00:00
flag.StringVar(&namespace, "namespace", "default", "namespace")
flag.StringVar(&cmName, "configmap", "well-known-generated", "")
2023-03-11 21:24:21 +00:00
flag.StringVar(&id, "id", os.Getenv("POD_NAME"), "the holder identity name")
2023-03-11 19:58:56 +00:00
flag.StringVar(&leaseLockName, "lease-lock-name", "well-known", "the lease lock resource name")
flag.StringVar(&serverPort, "server-port", "8080", "server port")
flag.StringVar(&healthPort, "health-port", "8081", "health port")
flag.StringVar(&metricsPort, "metrics-port", "9090", "metrics port")
2023-03-11 19:58:56 +00:00
flag.Parse()
if id == "" {
klog.Fatal("id is required")
}
}
func getClientset() *kubernetes.Clientset {
var config *rest.Config
var err error
// Try to use in-cluster config
config, err = rest.InClusterConfig()
2023-03-11 19:58:56 +00:00
if err != nil {
klog.Infof("Running outside of cluster, using kubeconfig from path: %s", kubeconfig)
// Use kubeconfig file for out-of-cluster setup
2023-03-11 19:58:56 +00:00
config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
klog.Fatalf("Failed to load kubeconfig: %v", err)
2023-03-11 19:58:56 +00:00
}
} else {
klog.Info("Running inside a cluster, using InClusterConfig")
2023-03-11 19:58:56 +00:00
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatalf("Failed to create Kubernetes clientset: %v", err)
2023-03-11 19:58:56 +00:00
}
return clientset
}
func main() {
// Parse flags
parseFlags()
2023-03-11 19:58:56 +00:00
// use a Go context so we can tell the leaderelection code when we
// want to step down
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// listen for interrupts or the Linux SIGTERM signal and cancel
// our context, which the leader election code will observe and
// step down
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
klog.Info("Received termination, signaling shutdown")
cancel()
}()
// Connect to the cluster
clientset := getClientset()
wks := NewWellKnownService(clientset, namespace, cmName)
2023-03-11 19:58:56 +00:00
// Start the server
2023-03-11 23:06:42 +00:00
go func() {
klog.Infof("Starting /.well-known endpoint on :%s", serverPort)
if err := http.ListenAndServe(":"+serverPort, GetServer(wks)); err != nil {
klog.Error(err)
os.Exit(1)
}
}()
2023-03-11 23:06:42 +00:00
// Start the health server
go func() {
klog.Infof("Starting /healthz endpoint on :%s", healthPort)
if err := http.ListenAndServe(":"+healthPort, GetHealthServer()); err != nil {
2023-03-11 23:06:42 +00:00
klog.Error(err)
os.Exit(1)
}
}()
// Start the metrics server
go func() {
klog.Infof("Starting /metrics endpoint on :%s", metricsPort)
http.Handle("/metrics", promhttp.Handler())
if err := http.ListenAndServe(":"+metricsPort, nil); err != nil {
log.Fatalf("Error starting metrics server: %v", err)
}
}()
// Start the leader election code loop
2023-03-11 19:58:56 +00:00
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Namespace: namespace,
},
Client: clientset.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: id,
},
},
2023-03-11 19:58:56 +00:00
ReleaseOnCancel: true,
LeaseDuration: 60 * time.Second,
RenewDeadline: 15 * time.Second,
RetryPeriod: 5 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
klog.Infof("This instance is now the leader: %s", id)
runLeaderTasks(ctx, wks)
2023-03-11 19:58:56 +00:00
},
OnStoppedLeading: func() {
klog.Infof("Leader lost: %s", id)
2023-03-11 19:58:56 +00:00
os.Exit(0)
},
OnNewLeader: func(identity string) {
if identity == id {
return
2023-03-11 19:58:56 +00:00
}
klog.Infof("New leader elected: %s", identity)
2023-03-11 19:58:56 +00:00
},
},
})
}
// runLeaderTasks runs the leader's periodic work until ctx is cancelled.
//
// Each cycle calls wks.collectData and, when that succeeds, passes the
// result to wks.UpdateConfigMap. An error from either step is logged and the
// cycle is retried after the pause. Returning on a collection error would
// end the OnStartedLeading callback while the lease is still being renewed,
// leaving a leader that never updates the ConfigMap again.
//
// The pause between cycles is 10 seconds and is interrupted as soon as ctx
// is done, so the loop stops promptly on shutdown.
func runLeaderTasks(ctx context.Context, wks *WellKnownService) {
	const interval = 10 * time.Second

	for {
		// Do not start another cycle once the context has been cancelled.
		if ctx.Err() != nil {
			return
		}

		reg, err := wks.collectData(ctx)
		if err != nil {
			klog.Error(err)
		} else if err := wks.UpdateConfigMap(ctx, reg); err != nil {
			klog.Error(err)
		}

		// Wait before checking again, but wake up early on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(interval):
		}
	}
}