163 lines
3.3 KiB
Go
163 lines
3.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"os"
|
|
"time"
|
|
|
|
"github.com/bep/debounce"
|
|
"github.com/davegardnerisme/deephash"
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/client-go/kubernetes"
|
|
klog "k8s.io/klog/v2"
|
|
)
|
|
|
|
// regLocal is a package-level pointer to a wkRegistry.
// NOTE(review): nothing in this part of the file reads or assigns it — confirm
// it is still used elsewhere before relying on it.
var regLocal *wkRegistry
|
|
|
|
type WellKnownService struct {
|
|
clientset *kubernetes.Clientset
|
|
namespace string
|
|
cmName string
|
|
|
|
localCache *wkRegistry
|
|
}
|
|
|
|
func NewWellKnownService(clientset *kubernetes.Clientset, namespace string, cmName string) *WellKnownService {
|
|
return &WellKnownService{
|
|
clientset: clientset,
|
|
namespace: namespace,
|
|
cmName: cmName,
|
|
}
|
|
}
|
|
|
|
func (s *WellKnownService) GetData(ctx context.Context) (*wkRegistry, error) {
|
|
if s.localCache != nil {
|
|
return s.localCache, nil
|
|
}
|
|
|
|
cm, err := s.clientset.CoreV1().ConfigMaps(s.namespace).Get(ctx, s.cmName, metav1.GetOptions{})
|
|
if errors.IsNotFound(err) {
|
|
return nil, nil
|
|
} else if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reg := make(wkRegistry, 0)
|
|
for name, data := range cm.Data {
|
|
var d wkData
|
|
if err := json.Unmarshal([]byte(data), &d); err != nil {
|
|
klog.Error(err)
|
|
}
|
|
reg[name] = d
|
|
}
|
|
|
|
return ®, nil
|
|
}
|
|
|
|
func (s *WellKnownService) UpdateConfigMap(ctx context.Context, reg wkRegistry) error {
|
|
s.localCache = ®
|
|
|
|
cm := &v1.ConfigMap{Data: reg.encode()}
|
|
cm.Namespace = s.namespace
|
|
cm.Name = s.cmName
|
|
|
|
_, err := s.clientset.
|
|
CoreV1().
|
|
ConfigMaps(s.namespace).
|
|
Update(ctx, cm, metav1.UpdateOptions{})
|
|
if errors.IsNotFound(err) {
|
|
_, err = s.clientset.
|
|
CoreV1().
|
|
ConfigMaps(s.namespace).
|
|
Create(ctx, cm, metav1.CreateOptions{})
|
|
if err == nil {
|
|
klog.Infof("Created ConfigMap %s/%s\n", cm.GetNamespace(), cm.GetName())
|
|
}
|
|
return err
|
|
} else if err != nil {
|
|
klog.Error(err)
|
|
return err
|
|
}
|
|
|
|
klog.Infof("Updated ConfigMap %s/%s\n", cm.GetNamespace(), cm.GetName())
|
|
return nil
|
|
}
|
|
|
|
func (s *WellKnownService) DiscoveryLoop(ctx context.Context) {
|
|
watch, err := s.clientset.
|
|
CoreV1().
|
|
Services(s.namespace).
|
|
Watch(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
klog.Error(err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
debounced := debounce.New(500 * time.Millisecond)
|
|
hash := []byte{}
|
|
|
|
for event := range watch.ResultChan() {
|
|
svc, ok := event.Object.(*v1.Service)
|
|
if !ok {
|
|
continue
|
|
}
|
|
klog.V(1).Infof("Change detected on %s", svc.GetName())
|
|
|
|
debounced(func() {
|
|
reg, err := s.collectData(ctx)
|
|
if err != nil {
|
|
klog.Error(err)
|
|
return
|
|
}
|
|
|
|
newHash := deephash.Hash(reg)
|
|
if string(hash) == string(newHash) {
|
|
klog.V(1).Info("No changes detected")
|
|
return
|
|
}
|
|
hash = newHash
|
|
|
|
klog.Info("Writing configmap")
|
|
if err := s.UpdateConfigMap(ctx, reg); err != nil {
|
|
klog.Error(err)
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func (s *WellKnownService) collectData(ctx context.Context) (wkRegistry, error) {
|
|
reg := make(wkRegistry, 0)
|
|
|
|
svcs, err := s.clientset.
|
|
CoreV1().
|
|
Services(s.namespace).
|
|
List(ctx, metav1.ListOptions{})
|
|
if err != nil {
|
|
return reg, err
|
|
}
|
|
|
|
for _, svc := range svcs.Items {
|
|
for name, value := range svc.ObjectMeta.Annotations {
|
|
name = resolveName(name)
|
|
if name == "" {
|
|
continue
|
|
}
|
|
|
|
if _, ok := reg[name]; !ok {
|
|
reg[name] = make(wkData, 0)
|
|
}
|
|
|
|
var d map[string]interface{}
|
|
err := json.Unmarshal([]byte(value), &d)
|
|
if err != nil {
|
|
klog.Error(err)
|
|
}
|
|
|
|
reg[name].append(d)
|
|
}
|
|
}
|
|
return reg, nil
|
|
}
|