271 lines
6.2 KiB
Go
271 lines
6.2 KiB
Go
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/bep/debounce"
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
klog "k8s.io/klog/v2"
|
|
)
|
|
|
|
type WellKnownService struct {
|
|
clientset *kubernetes.Clientset
|
|
namespace string
|
|
cmName string
|
|
|
|
localCache wkRegistry
|
|
}
|
|
|
|
func NewWellKnownService(clientset *kubernetes.Clientset, namespace string, cmName string) *WellKnownService {
|
|
return &WellKnownService{
|
|
clientset: clientset,
|
|
namespace: namespace,
|
|
cmName: cmName,
|
|
}
|
|
}
|
|
|
|
func (s *WellKnownService) GetData(ctx context.Context) (wkRegistry, error) {
|
|
if s.localCache != nil {
|
|
return s.localCache, nil
|
|
}
|
|
|
|
cm, err := s.clientset.CoreV1().ConfigMaps(s.namespace).Get(ctx, s.cmName, metav1.GetOptions{})
|
|
if errors.IsNotFound(err) {
|
|
return nil, nil
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reg := make(wkRegistry)
|
|
for name, data := range cm.Data {
|
|
baseName := strings.TrimSuffix(name, ".json")
|
|
id := strings.ReplaceAll(baseName, "__", "/")
|
|
var d wkData
|
|
if err := json.Unmarshal([]byte(data), &d); err != nil {
|
|
klog.Error(err)
|
|
}
|
|
reg[id] = d
|
|
}
|
|
|
|
// Update the local cache
|
|
s.localCache = reg
|
|
|
|
return reg, nil
|
|
}
|
|
|
|
func (s *WellKnownService) UpdateConfigMap(ctx context.Context, reg wkRegistry) error {
|
|
// Update the local cache
|
|
s.localCache = reg
|
|
|
|
cm := &v1.ConfigMap{Data: reg.encode()}
|
|
cm.Namespace = s.namespace
|
|
cm.Name = s.cmName
|
|
|
|
_, err := s.clientset.
|
|
CoreV1().
|
|
ConfigMaps(s.namespace).
|
|
Update(ctx, cm, metav1.UpdateOptions{})
|
|
if errors.IsNotFound(err) {
|
|
_, err = s.clientset.
|
|
CoreV1().
|
|
ConfigMaps(s.namespace).
|
|
Create(ctx, cm, metav1.CreateOptions{})
|
|
if err == nil {
|
|
klog.Infof("Created ConfigMap %s/%s\n", cm.GetNamespace(), cm.GetName())
|
|
}
|
|
return err
|
|
} else if err != nil {
|
|
klog.Error(err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// stableMarshal ensures the keys in maps are always sorted before encoding to JSON.
|
|
func stableMarshal(v interface{}) ([]byte, error) {
|
|
buffer := &bytes.Buffer{}
|
|
encoder := json.NewEncoder(buffer)
|
|
encoder.SetEscapeHTML(false)
|
|
encoder.SetIndent("", " ")
|
|
|
|
// Use a custom map sorter for stability
|
|
if err := encodeSorted(encoder, v); err != nil {
|
|
return nil, err
|
|
}
|
|
return buffer.Bytes(), nil
|
|
}
|
|
|
|
// encodeSorted writes v to encoder.
//
// Values whose dynamic type is exactly map[string]interface{} or
// map[string]string are first copied, key by key in sorted order, into a
// fresh map which is then encoded; as a consequence a nil map of either type
// is written as "{}" rather than "null". Every other value, including named
// map types, is passed to the encoder unchanged.
//
// Note that a Go map carries no ordering, so the copy by itself cannot
// influence key order; encoding/json already emits map keys sorted.
func encodeSorted(encoder *json.Encoder, v interface{}) error {
	if m, ok := v.(map[string]interface{}); ok {
		names := make([]string, 0, len(m))
		for name := range m {
			names = append(names, name)
		}
		sort.Strings(names)

		ordered := make(map[string]interface{}, len(names))
		for _, name := range names {
			ordered[name] = m[name]
		}
		return encoder.Encode(ordered)
	}

	if m, ok := v.(map[string]string); ok {
		names := make([]string, 0, len(m))
		for name := range m {
			names = append(names, name)
		}
		sort.Strings(names)

		ordered := make(map[string]string, len(names))
		for _, name := range names {
			ordered[name] = m[name]
		}
		return encoder.Encode(ordered)
	}

	// Use default encoding for non-map types
	return encoder.Encode(v)
}
|
|
|
|
func (s *WellKnownService) DiscoveryLoop(ctx context.Context) {
|
|
// Watch for service changes in the namespace
|
|
watch, err := s.clientset.
|
|
CoreV1().
|
|
Services(s.namespace).
|
|
Watch(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
klog.Error(err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
klog.Infof("Watching for service changes in namespace: %s", s.namespace)
|
|
|
|
debounced := debounce.New(15000 * time.Millisecond)
|
|
hash := []byte{}
|
|
|
|
for event := range watch.ResultChan() {
|
|
svc, ok := event.Object.(*v1.Service)
|
|
if !ok {
|
|
klog.Warning("Unexpected object type")
|
|
continue
|
|
}
|
|
|
|
klog.Infof("Change detected on service: %s", svc.GetName())
|
|
|
|
debounced(func() {
|
|
klog.Info("Processing service change...")
|
|
reg, err := s.collectData(ctx)
|
|
if err != nil {
|
|
klog.Error(err)
|
|
return
|
|
}
|
|
|
|
// Generate a stable JSON representation of the registry
|
|
newHash, err := stableMarshal(reg)
|
|
if err != nil {
|
|
klog.Error(err)
|
|
return
|
|
}
|
|
|
|
// Compare with the previous hash
|
|
if bytes.Equal(hash, newHash) {
|
|
klog.V(1).Info("No changes detected, skipping update")
|
|
return
|
|
}
|
|
hash = newHash
|
|
|
|
klog.Infof("Writing configmap for discovered services...")
|
|
if err := s.UpdateConfigMap(ctx, reg); err != nil {
|
|
klog.Error(err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *WellKnownService) collectData(ctx context.Context) (wkRegistry, error) {
|
|
reg := make(wkRegistry)
|
|
|
|
svcs, err := s.clientset.CoreV1().Services(s.namespace).List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return reg, err
|
|
}
|
|
|
|
for _, svc := range svcs.Items {
|
|
//klog.Infof("Processing service: %s", svc.GetName())
|
|
|
|
// Check if the service has a format lock (e.g., JSON only)
|
|
format := "default"
|
|
if fmtAnnotation, ok := svc.Annotations["well-known.252.no/format"]; ok {
|
|
format = fmtAnnotation
|
|
}
|
|
|
|
directory := ""
|
|
if dirAnnotation, ok := svc.Annotations["well-known.252.no/directory"]; ok {
|
|
directory = dirAnnotation
|
|
}
|
|
|
|
for key, value := range svc.Annotations {
|
|
resolvedName := resolveName(key)
|
|
if resolvedName == "" || resolvedName == "directory" || resolvedName == "format" {
|
|
continue
|
|
}
|
|
|
|
isPlainText := strings.HasSuffix(resolvedName, ".txt")
|
|
if isPlainText {
|
|
format = "text"
|
|
} else if strings.HasSuffix(resolvedName, ".json") {
|
|
format = "json"
|
|
}
|
|
|
|
if directory != "" {
|
|
resolvedName = directory + "/" + resolvedName
|
|
}
|
|
|
|
resolvedName = strings.ReplaceAll(resolvedName, "__", "/")
|
|
newData := parseData(value, isPlainText)
|
|
|
|
if existingData, ok := reg[resolvedName]; ok {
|
|
reg[resolvedName] = mergeData(existingData, newData)
|
|
} else {
|
|
// Store the data with the format information
|
|
reg[resolvedName] = map[string]interface{}{
|
|
"data": newData,
|
|
"format": format,
|
|
}
|
|
}
|
|
|
|
//klog.Infof("Added data for %s with format %s", resolvedName, format)
|
|
}
|
|
}
|
|
|
|
return reg, nil
|
|
}
|
|
|
|
func parseData(value string, isPlainText bool) wkData {
|
|
if isPlainText {
|
|
return value
|
|
}
|
|
var d map[string]interface{}
|
|
err := json.Unmarshal([]byte(value), &d)
|
|
if err != nil {
|
|
klog.Error(err)
|
|
return nil
|
|
}
|
|
return d
|
|
}
|