well-known/server/main.go

package main

import (
	"context"
	"encoding/json"
	"flag"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"github.com/bep/debounce"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	"k8s.io/client-go/util/homedir"
	klog "k8s.io/klog/v2"
)

var (
	kubeconfig    string
	namespace     string
	cmName        string
	id            string
	leaseLockName string
)
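
// wkRegistry, wkData, resolveName and the encode/append helpers used below
// are defined elsewhere in this package.
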
func main() {
	if home := homedir.HomeDir(); home != "" {
		flag.StringVar(&kubeconfig, "kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
	} else {
		flag.StringVar(&kubeconfig, "kubeconfig", "", "absolute path to the kubeconfig file")
	}
	flag.StringVar(&namespace, "namespace", "default", "the namespace to watch and to write the ConfigMap into")
	flag.StringVar(&cmName, "configmap", "well-known-generated", "the name of the generated ConfigMap")
	flag.StringVar(&id, "id", os.Getenv("POD_NAME"), "the holder identity name")
	flag.StringVar(&leaseLockName, "lease-lock-name", "well-known", "the lease lock resource name")
	flag.Parse()
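
	// A typical invocation (hypothetical binary name; the values shown
	// match the flag defaults):
	//   ./server -namespace=default -configmap=well-known-generated \
	//     -id=$POD_NAME -lease-lock-name=well-known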

	// Prefer the in-cluster config; when running outside the cluster,
	// fall back to the kubeconfig file.
	config, err := rest.InClusterConfig()
	if err != nil {
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			klog.Fatal(err)
		}
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		klog.Fatal(err)
	}

	// Use a Go context so we can tell the leader-election code when we
	// want to step down.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Listen for interrupts or the Linux SIGTERM signal and cancel our
	// context, which the leader-election code observes to step down.
	ch := make(chan os.Signal, 1)
	signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-ch
		klog.Info("Received termination, signaling shutdown")
		cancel()
	}()

	// Use the Lease lock type, since edits to Leases are less common
	// than to other objects and fewer components watch "all Leases".
	lock := &resourcelock.LeaseLock{
		LeaseMeta: metav1.ObjectMeta{
			Name:      leaseLockName,
			Namespace: namespace,
		},
		Client: clientset.CoordinationV1(),
		LockConfig: resourcelock.ResourceLockConfig{
			Identity: id,
		},
	}
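
	// For reference, the resulting Lease can be inspected with:
	//   kubectl -n <namespace> get lease well-known -o yaml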

	// Serve a trivial health check so the kubelet can probe the pod.
	go func() {
		http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusOK)
		})
		klog.Info("Running /healthz endpoint on :8081")
		if err := http.ListenAndServe(":8081", nil); err != nil {
			klog.Fatal(err)
		}
	}()
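
	// A pod spec would point a probe at this endpoint, e.g. (hypothetical
	// manifest snippet):
	//   livenessProbe:
	//     httpGet:
	//       path: /healthz
	//       port: 8081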

	// Start the leader-election loop; RunOrDie blocks until the context
	// is cancelled or leadership is lost.
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// Keep re-establishing the watch: loop returns whenever
				// the watch channel closes, and we stop once the context
				// is cancelled.
				for ctx.Err() == nil {
					loop(ctx, clientset)
				}
			},
			OnStoppedLeading: func() {
				klog.Infof("leader lost: %s", id)
				os.Exit(0)
			},
			OnNewLeader: func(identity string) {
				if identity == id {
					return
				}
				klog.Infof("new leader elected: %s", identity)
			},
		},
	})
}

// loop watches Services in the target namespace and, debounced, rebuilds
// the generated ConfigMap on every change. It returns when the watch
// channel closes.
func loop(ctx context.Context, clientset *kubernetes.Clientset) {
	watch, err := clientset.
		CoreV1().
		Services(namespace).
		Watch(ctx, metav1.ListOptions{})
	if err != nil {
		klog.Error(err)
		os.Exit(1)
	}
	// Collapse bursts of Service events into a single rebuild.
	debounced := debounce.New(500 * time.Millisecond)
	for event := range watch.ResultChan() {
		svc, ok := event.Object.(*v1.Service)
		if !ok {
			continue
		}
		klog.Infof("Change detected on %s", svc.GetName())
		debounced(func() {
			reg, err := discoverData(clientset, namespace)
			if err != nil {
				klog.Error(err)
				return
			}
			klog.Info("Writing configmap")
			if err := updateConfigMap(ctx, clientset, reg); err != nil {
				klog.Error(err)
			}
		})
	}
}

// discoverData lists all Services in ns and collects the well-known data
// declared in their annotations into a registry.
func discoverData(clientset *kubernetes.Clientset, ns string) (wkRegistry, error) {
	reg := make(wkRegistry)
	svcs, err := clientset.
		CoreV1().
		Services(ns).
		List(context.Background(), metav1.ListOptions{})
	if err != nil {
		return reg, err
	}
	for _, svc := range svcs.Items {
		for name, value := range svc.ObjectMeta.Annotations {
			name = resolveName(name)
			if name == "" {
				// Not a well-known annotation; skip it.
				continue
			}
			if _, ok := reg[name]; !ok {
				reg[name] = make(wkData, 0)
			}
			var d map[string]interface{}
			if err := json.Unmarshal([]byte(value), &d); err != nil {
				klog.Error(err)
				continue
			}
			reg[name].append(d)
		}
	}
	return reg, nil
}
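
// For illustration, a Service annotation that discoverData would pick up
// might look like this (the annotation key prefix is hypothetical;
// resolveName, defined elsewhere, maps annotation keys to well-known names):
//
//	metadata:
//	  annotations:
//	    well-known.example.com/security.txt: '{"contact": "mailto:security@example.com"}'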

// updateConfigMap writes the registry into the generated ConfigMap,
// creating it on the first run.
func updateConfigMap(ctx context.Context, client kubernetes.Interface, reg wkRegistry) error {
	cm := &v1.ConfigMap{Data: reg.encode()}
	cm.Namespace = namespace
	cm.Name = cmName
	_, err := client.
		CoreV1().
		ConfigMaps(namespace).
		Update(ctx, cm, metav1.UpdateOptions{})
	if errors.IsNotFound(err) {
		_, err = client.
			CoreV1().
			ConfigMaps(namespace).
			Create(ctx, cm, metav1.CreateOptions{})
		if err == nil {
			klog.Infof("Created ConfigMap %s/%s", cm.GetNamespace(), cm.GetName())
		}
		return err
	} else if err != nil {
		klog.Error(err)
		return err
	}
	klog.Infof("Updated ConfigMap %s/%s", cm.GetNamespace(), cm.GetName())
	return nil
}
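
// Note: the service account running this program needs RBAC permissions to
// list/watch Services, create/update the generated ConfigMap, and get/
// create/update the coordination.k8s.io Lease used for leader election.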