well-known/server/wellknown.go

271 lines
6.2 KiB
Go

package main
import (
"bytes"
"context"
"encoding/json"
"os"
"sort"
"strings"
"time"
"github.com/bep/debounce"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
klog "k8s.io/klog/v2"
)
// WellKnownService collects well-known data from Service annotations in one
// namespace and persists it in a single ConfigMap, keeping the most recent
// registry in memory.
//
// NOTE(review): localCache is read and written by GetData and written by
// UpdateConfigMap with no locking in the code shown here — confirm these are
// never called concurrently, or guard the field.
type WellKnownService struct {
// clientset is the Kubernetes client used for all ConfigMap and Service calls.
clientset *kubernetes.Clientset
// namespace is where Services are listed/watched and where the ConfigMap lives.
namespace string
// cmName is the name of the ConfigMap that stores the registry.
cmName string
// localCache holds the last registry loaded by GetData or passed to
// UpdateConfigMap; nil means nothing has been cached yet.
localCache wkRegistry
}
// NewWellKnownService returns a WellKnownService that talks to the cluster
// through clientset and works with the ConfigMap cmName in namespace. The
// in-memory cache starts out nil.
func NewWellKnownService(clientset *kubernetes.Clientset, namespace string, cmName string) *WellKnownService {
	svc := new(WellKnownService)
	svc.clientset = clientset
	svc.namespace = namespace
	svc.cmName = cmName
	return svc
}
// GetData returns the well-known registry.
//
// A non-nil local cache is returned as-is. Otherwise the ConfigMap is fetched
// and every key is turned into a registry id by dropping a trailing ".json"
// and replacing "__" with "/"; each value is decoded as JSON. The decoded
// registry is cached before being returned.
//
// It returns (nil, nil) when the ConfigMap does not exist, and (nil, err) for
// any other API error. Keys whose value is not valid JSON are logged and left
// out of the registry rather than being stored as nil entries.
func (s *WellKnownService) GetData(ctx context.Context) (wkRegistry, error) {
	if s.localCache != nil {
		return s.localCache, nil
	}

	cm, err := s.clientset.CoreV1().ConfigMaps(s.namespace).Get(ctx, s.cmName, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		// Nothing has been written yet; this is not an error for the caller.
		return nil, nil
	}
	if err != nil {
		return nil, err
	}

	reg := make(wkRegistry, len(cm.Data))
	for name, data := range cm.Data {
		var d wkData
		if err := json.Unmarshal([]byte(data), &d); err != nil {
			// Name the offending key so the bad entry can be found, and do not
			// publish a nil placeholder for it.
			klog.Errorf("Skipping ConfigMap key %q: %v", name, err)
			continue
		}
		id := strings.ReplaceAll(strings.TrimSuffix(name, ".json"), "__", "/")
		reg[id] = d
	}

	// Update the local cache
	s.localCache = reg
	return reg, nil
}
// UpdateConfigMap stores reg as the local cache and then writes its encoded
// form to the ConfigMap, creating the ConfigMap when the update reports that
// it does not exist.
//
// The cache is replaced before the API call, so it holds reg even when the
// write fails. Update failures other than not-found are logged here and
// returned; a Create failure is returned without being logged.
func (s *WellKnownService) UpdateConfigMap(ctx context.Context, reg wkRegistry) error {
	// Update the local cache
	s.localCache = reg

	cm := &v1.ConfigMap{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: s.namespace,
			Name:      s.cmName,
		},
		Data: reg.encode(),
	}
	configMaps := s.clientset.CoreV1().ConfigMaps(s.namespace)

	_, err := configMaps.Update(ctx, cm, metav1.UpdateOptions{})
	switch {
	case err == nil:
		return nil
	case !errors.IsNotFound(err):
		klog.Error(err)
		return err
	}

	// The ConfigMap is missing: fall back to creating it.
	if _, err = configMaps.Create(ctx, cm, metav1.CreateOptions{}); err != nil {
		return err
	}
	klog.Infof("Created ConfigMap %s/%s\n", cm.GetNamespace(), cm.GetName())
	return nil
}
// stableMarshal renders v as indented JSON with HTML escaping turned off,
// delegating the actual encoding to encodeSorted. The returned bytes include
// the trailing newline that json.Encoder writes after each value.
func stableMarshal(v interface{}) ([]byte, error) {
	var out bytes.Buffer

	enc := json.NewEncoder(&out)
	enc.SetIndent("", " ")
	enc.SetEscapeHTML(false)

	if err := encodeSorted(enc, v); err != nil {
		return nil, err
	}
	return out.Bytes(), nil
}
// encodeSorted writes v to encoder. Values of type map[string]interface{} and
// map[string]string are first copied, key by key in sorted order, into a
// fresh map; everything else is encoded directly.
//
// Go maps carry no ordering, so the copy does not by itself fix the key
// order — encoding/json emits map keys sorted. What the copy does change is
// that a nil map of either type is encoded as {} instead of null.
func encodeSorted(encoder *json.Encoder, v interface{}) error {
	if m, ok := v.(map[string]interface{}); ok {
		names := make([]string, 0, len(m))
		for name := range m {
			names = append(names, name)
		}
		sort.Strings(names)

		rebuilt := make(map[string]interface{})
		for _, name := range names {
			rebuilt[name] = m[name]
		}
		return encoder.Encode(rebuilt)
	}

	if m, ok := v.(map[string]string); ok {
		names := make([]string, 0, len(m))
		for name := range m {
			names = append(names, name)
		}
		sort.Strings(names)

		rebuilt := make(map[string]string)
		for _, name := range names {
			rebuilt[name] = m[name]
		}
		return encoder.Encode(rebuilt)
	}

	// Any other type goes straight to the encoder.
	return encoder.Encode(v)
}
// DiscoveryLoop watches Services in the configured namespace and keeps the
// ConfigMap in sync with their annotations.
//
// Every Service event schedules a debounced (15 s) rebuild: the registry is
// collected again, rendered with stableMarshal, and written with
// UpdateConfigMap only when that rendering differs from the last one that was
// written successfully. A failed write therefore gets retried on the next
// event even if the registry itself has not changed.
//
// If the watch cannot be established the process exits with status 1. The
// method returns once the watch's result channel is closed; it does not
// re-establish the watch.
func (s *WellKnownService) DiscoveryLoop(ctx context.Context) {
	// Watch for service changes in the namespace
	watcher, err := s.clientset.
		CoreV1().
		Services(s.namespace).
		Watch(ctx, metav1.ListOptions{})
	if err != nil {
		klog.Error(err)
		os.Exit(1)
	}
	// Release the watch when the loop ends.
	defer watcher.Stop()

	klog.Infof("Watching for service changes in namespace: %s", s.namespace)
	debounced := debounce.New(15 * time.Second)

	// Stable JSON of the registry most recently persisted. It is only touched
	// inside the debounced callback.
	// NOTE(review): the callback is invoked by the debounce package, presumably
	// on its own goroutine — confirm two runs can never overlap.
	var lastWritten []byte

	for event := range watcher.ResultChan() {
		svc, ok := event.Object.(*v1.Service)
		if !ok {
			klog.Warning("Unexpected object type")
			continue
		}
		klog.Infof("Change detected on service: %s", svc.GetName())

		debounced(func() {
			klog.Info("Processing service change...")
			reg, err := s.collectData(ctx)
			if err != nil {
				klog.Error(err)
				return
			}

			// Generate a stable JSON representation of the registry
			current, err := stableMarshal(reg)
			if err != nil {
				klog.Error(err)
				return
			}

			// Compare with what was last written
			if bytes.Equal(lastWritten, current) {
				klog.V(1).Info("No changes detected, skipping update")
				return
			}

			klog.Infof("Writing configmap for discovered services...")
			if err := s.UpdateConfigMap(ctx, reg); err != nil {
				// Leave lastWritten untouched so the next event retries the write.
				klog.Error(err)
				return
			}
			lastWritten = current
		})
	}

	// Reaching this point means the result channel was closed.
	klog.Warningf("Service watch in namespace %s closed; discovery loop stopped", s.namespace)
}
// collectData lists the Services in the configured namespace and builds a
// registry from their annotations.
//
// Per Service, the "well-known.252.no/format" annotation supplies the default
// format ("default" when absent) and "well-known.252.no/directory" an optional
// path prefix. Every other annotation whose key resolveName maps to a
// non-empty name becomes an entry: a ".txt" name is stored as text, a ".json"
// name as json, anything else with the Service's default format. "__" in the
// final name is replaced by "/". When the name already exists in the registry
// the new value is combined with the existing entry via mergeData.
//
// The format is decided per annotation and annotation keys are visited in
// sorted order, so the result does not depend on map iteration order.
//
// On a List failure the (empty) registry is returned together with the error.
func (s *WellKnownService) collectData(ctx context.Context) (wkRegistry, error) {
	reg := make(wkRegistry)
	svcs, err := s.clientset.CoreV1().Services(s.namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		return reg, err
	}

	for i := range svcs.Items {
		annotations := svcs.Items[i].Annotations

		// Check if the service has a format lock (e.g., JSON only)
		defaultFormat := "default"
		if fmtAnnotation, ok := annotations["well-known.252.no/format"]; ok {
			defaultFormat = fmtAnnotation
		}
		// A missing annotation yields "", which means "no directory prefix".
		directory := annotations["well-known.252.no/directory"]

		keys := make([]string, 0, len(annotations))
		for key := range annotations {
			keys = append(keys, key)
		}
		sort.Strings(keys)

		for _, key := range keys {
			resolvedName := resolveName(key)
			if resolvedName == "" || resolvedName == "directory" || resolvedName == "format" {
				continue
			}

			// Start from the Service default for every annotation so a suffix
			// on one annotation cannot change the format of another.
			isPlainText := strings.HasSuffix(resolvedName, ".txt")
			format := defaultFormat
			switch {
			case isPlainText:
				format = "text"
			case strings.HasSuffix(resolvedName, ".json"):
				format = "json"
			}

			if directory != "" {
				resolvedName = directory + "/" + resolvedName
			}
			resolvedName = strings.ReplaceAll(resolvedName, "__", "/")

			newData := parseData(annotations[key], isPlainText)
			if existingData, ok := reg[resolvedName]; ok {
				reg[resolvedName] = mergeData(existingData, newData)
			} else {
				// Store the data with the format information
				reg[resolvedName] = map[string]interface{}{
					"data":   newData,
					"format": format,
				}
			}
		}
	}
	return reg, nil
}
// parseData converts an annotation value into registry data. Plain-text
// values are returned unchanged as a string. Anything else is decoded as a
// JSON object into a map[string]interface{}; if decoding fails the error is
// logged and nil is returned.
func parseData(value string, isPlainText bool) wkData {
	if !isPlainText {
		var parsed map[string]interface{}
		if err := json.Unmarshal([]byte(value), &parsed); err != nil {
			klog.Error(err)
			return nil
		}
		return parsed
	}
	return value
}