mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
Propagates sidecar's port to a service (#1078)
This commit is contained in:
parent
4b3f0b2481
commit
174c21db55
4 changed files with 74 additions and 34 deletions
|
@ -5,6 +5,8 @@
|
|||
- (Feature) Early connections support
|
||||
- (Bugfix) Fix and document action timeouts
|
||||
|
||||
- (Feature) Propagate sidecars' ports to a member's service
|
||||
|
||||
## [1.2.16](https://github.com/arangodb/kube-arangodb/tree/1.2.16) (2022-09-14)
|
||||
- (Feature) Add ArangoDeployment ServerGroupStatus
|
||||
- (Feature) (EE) Ordered Member IDs
|
||||
|
|
|
@ -24,11 +24,11 @@ import (
|
|||
"context"
|
||||
"sync"
|
||||
|
||||
core "k8s.io/api/core/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/features"
|
||||
"github.com/arangodb/kube-arangodb/pkg/deployment/patch"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/arangod"
|
||||
|
@ -118,9 +118,11 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
|
|||
leaderAgentSvcName := k8sutil.CreateAgentLeaderServiceName(r.context.GetAPIObject().GetName())
|
||||
deploymentName := r.context.GetAPIObject().GetName()
|
||||
|
||||
ports := []core.ServicePort{CreateServerServicePort(group)}
|
||||
|
||||
selector := k8sutil.LabelsForLeaderMember(deploymentName, group.AsRole(), leaderID)
|
||||
if s, ok := cachedStatus.Service().V1().GetSimple(leaderAgentSvcName); ok {
|
||||
if err, adjusted := r.adjustService(ctx, s, shared.ArangoPort, selector); err == nil {
|
||||
if err, adjusted := r.adjustService(ctx, s, ports, selector); err == nil {
|
||||
if !adjusted {
|
||||
// The service is not changed, so single server leader can be set.
|
||||
return r.ensureSingleServerLeader(ctx, cachedStatus)
|
||||
|
@ -132,7 +134,7 @@ func (r *Resources) EnsureLeader(ctx context.Context, cachedStatus inspectorInte
|
|||
}
|
||||
}
|
||||
|
||||
s := r.createService(leaderAgentSvcName, r.context.GetNamespace(), r.context.GetAPIObject().AsOwner(), shared.ArangoPort, selector)
|
||||
s := r.createService(leaderAgentSvcName, r.context.GetNamespace(), r.context.GetAPIObject().AsOwner(), ports, selector)
|
||||
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
|
||||
_, err := cachedStatus.ServicesModInterface().V1().Create(ctxChild, s, meta.CreateOptions{})
|
||||
return err
|
||||
|
|
|
@ -40,6 +40,7 @@ import (
|
|||
"github.com/arangodb/kube-arangodb/pkg/util/globals"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil"
|
||||
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
|
||||
v1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/pod/v1"
|
||||
servicev1 "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service/v1"
|
||||
)
|
||||
|
||||
|
@ -49,7 +50,7 @@ var (
|
|||
)
|
||||
|
||||
// createService returns service's object.
|
||||
func (r *Resources) createService(name, namespace string, owner meta.OwnerReference, targetPort int32,
|
||||
func (r *Resources) createService(name, namespace string, owner meta.OwnerReference, ports []core.ServicePort,
|
||||
selector map[string]string) *core.Service {
|
||||
|
||||
return &core.Service{
|
||||
|
@ -61,15 +62,8 @@ func (r *Resources) createService(name, namespace string, owner meta.OwnerRefere
|
|||
},
|
||||
},
|
||||
Spec: core.ServiceSpec{
|
||||
Type: core.ServiceTypeClusterIP,
|
||||
Ports: []core.ServicePort{
|
||||
{
|
||||
Name: "server",
|
||||
Protocol: "TCP",
|
||||
Port: shared.ArangoPort,
|
||||
TargetPort: intstr.IntOrString{IntVal: targetPort},
|
||||
},
|
||||
},
|
||||
Type: core.ServiceTypeClusterIP,
|
||||
Ports: ports,
|
||||
PublishNotReadyAddresses: true,
|
||||
Selector: selector,
|
||||
},
|
||||
|
@ -78,19 +72,13 @@ func (r *Resources) createService(name, namespace string, owner meta.OwnerRefere
|
|||
|
||||
// adjustService checks whether service contains is valid and if not than it reconciles service.
|
||||
// Returns true if service is adjusted.
|
||||
func (r *Resources) adjustService(ctx context.Context, s *core.Service, targetPort int32, selector map[string]string) (error, bool) {
|
||||
func (r *Resources) adjustService(ctx context.Context, s *core.Service, ports []core.ServicePort,
|
||||
selector map[string]string) (error, bool) {
|
||||
services := r.context.ACS().CurrentClusterCache().ServicesModInterface().V1()
|
||||
spec := s.Spec.DeepCopy()
|
||||
|
||||
spec.Type = core.ServiceTypeClusterIP
|
||||
spec.Ports = []core.ServicePort{
|
||||
{
|
||||
Name: "server",
|
||||
Protocol: "TCP",
|
||||
Port: shared.ArangoPort,
|
||||
TargetPort: intstr.IntOrString{IntVal: targetPort},
|
||||
},
|
||||
}
|
||||
spec.Ports = ports
|
||||
spec.PublishNotReadyAddresses = true
|
||||
spec.Selector = selector
|
||||
if equality.Semantic.DeepDerivative(*spec, s.Spec) {
|
||||
|
@ -127,20 +115,12 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
|
|||
|
||||
// Fetch existing services
|
||||
svcs := cachedStatus.ServicesModInterface().V1()
|
||||
podInspector := cachedStatus.Pod().V1()
|
||||
|
||||
reconcileRequired := k8sutil.NewReconcile(cachedStatus)
|
||||
|
||||
// Ensure member services
|
||||
for _, e := range status.Members.AsList() {
|
||||
var targetPort int32 = shared.ArangoPort
|
||||
|
||||
switch e.Group {
|
||||
case api.ServerGroupSyncMasters:
|
||||
targetPort = shared.ArangoSyncMasterPort
|
||||
case api.ServerGroupSyncWorkers:
|
||||
targetPort = shared.ArangoSyncWorkerPort
|
||||
}
|
||||
|
||||
memberName := e.Member.ArangoMemberName(r.context.GetAPIObject().GetName(), e.Group)
|
||||
|
||||
member, ok := cachedStatus.ArangoMember().V1().GetSimple(memberName)
|
||||
|
@ -148,9 +128,10 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
|
|||
return errors.Newf("Member %s not found", memberName)
|
||||
}
|
||||
|
||||
ports := CreateServerServicePortsWithSidecars(podInspector, e.Member.PodName, e.Group)
|
||||
selector := k8sutil.LabelsForMember(deploymentName, e.Group.AsRole(), e.Member.ID)
|
||||
if s, ok := cachedStatus.Service().V1().GetSimple(member.GetName()); !ok {
|
||||
s := r.createService(member.GetName(), member.GetNamespace(), member.AsOwner(), targetPort, selector)
|
||||
s := r.createService(member.GetName(), member.GetNamespace(), member.AsOwner(), ports, selector)
|
||||
|
||||
err := globals.GetGlobalTimeouts().Kubernetes().RunWithTimeout(ctx, func(ctxChild context.Context) error {
|
||||
_, err := svcs.Create(ctxChild, s, meta.CreateOptions{})
|
||||
|
@ -165,7 +146,7 @@ func (r *Resources) EnsureServices(ctx context.Context, cachedStatus inspectorIn
|
|||
reconcileRequired.Required()
|
||||
continue
|
||||
} else {
|
||||
if err, adjusted := r.adjustService(ctx, s, targetPort, selector); err == nil {
|
||||
if err, adjusted := r.adjustService(ctx, s, ports, selector); err == nil {
|
||||
if adjusted {
|
||||
reconcileRequired.Required()
|
||||
}
|
||||
|
@ -456,3 +437,57 @@ func ensureManagedServiceSelector(ctx context.Context, selector map[string]strin
|
|||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// CreateServerServicePortsWithSidecars returns ports for the service.
|
||||
func CreateServerServicePortsWithSidecars(podInspector v1.Inspector, podName string, group api.ServerGroup) []core.ServicePort {
|
||||
// Create service port for the `server` container.
|
||||
ports := []core.ServicePort{CreateServerServicePort(group)}
|
||||
|
||||
if podInspector == nil {
|
||||
return ports
|
||||
}
|
||||
|
||||
if p, ok := podInspector.GetSimple(podName); ok {
|
||||
for _, c := range p.Spec.Containers {
|
||||
if c.Name == api.ServerGroupReservedContainerNameServer {
|
||||
// It is already added.
|
||||
continue
|
||||
}
|
||||
for _, port := range c.Ports {
|
||||
ports = append(ports, core.ServicePort{
|
||||
Name: port.Name,
|
||||
Protocol: core.ProtocolTCP,
|
||||
Port: port.ContainerPort,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return ports
|
||||
}
|
||||
|
||||
// CreateServerServicePort creates main server service port.
|
||||
func CreateServerServicePort(group api.ServerGroup) core.ServicePort {
|
||||
serverTargetPort := getTargetPort(group)
|
||||
return core.ServicePort{
|
||||
Name: api.ServerGroupReservedContainerNameServer,
|
||||
Protocol: core.ProtocolTCP,
|
||||
Port: shared.ArangoPort,
|
||||
TargetPort: intstr.IntOrString{
|
||||
IntVal: serverTargetPort,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// getTargetPort returns target port for the given server group.
|
||||
func getTargetPort(group api.ServerGroup) int32 {
|
||||
if group == api.ServerGroupSyncMasters {
|
||||
return shared.ArangoSyncMasterPort
|
||||
}
|
||||
|
||||
if group == api.ServerGroupSyncWorkers {
|
||||
return shared.ArangoSyncWorkerPort
|
||||
}
|
||||
|
||||
return shared.ArangoPort
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
core "k8s.io/api/core/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
||||
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
|
||||
"github.com/arangodb/kube-arangodb/pkg/apis/shared"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/errors"
|
||||
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/service"
|
||||
|
@ -145,7 +146,7 @@ func CreateDatabaseClientService(ctx context.Context, svcs servicev1.ModInterfac
|
|||
svcName := CreateDatabaseClientServiceName(deploymentName)
|
||||
ports := []core.ServicePort{
|
||||
{
|
||||
Name: "server",
|
||||
Name: api.ServerGroupReservedContainerNameServer,
|
||||
Protocol: core.ProtocolTCP,
|
||||
Port: shared.ArangoPort,
|
||||
},
|
||||
|
|
Loading…
Reference in a new issue