mirror of
https://github.com/arangodb/kube-arangodb.git
synced 2024-12-14 11:57:37 +00:00
Scaling up/down
This commit is contained in:
parent
3af4eb695f
commit
242220589e
8 changed files with 108 additions and 16 deletions
|
@ -5,4 +5,6 @@ metadata:
|
|||
spec:
|
||||
mode: cluster
|
||||
dbservers:
|
||||
count: 5
|
||||
count: 3
|
||||
coordinators:
|
||||
count: 2
|
|
@ -97,6 +97,30 @@ func (ds DeploymentStatusMembers) ContainsID(id string) bool {
|
|||
ds.SyncWorkers.ContainsID(id)
|
||||
}
|
||||
|
||||
// ElementByID returns the element in the given list that has the given ID and true.
|
||||
// If no such element exists, false is returned.
|
||||
func (ds DeploymentStatusMembers) ElementByID(id string) (MemberStatus, ServerGroup, bool) {
|
||||
if result, found := ds.Single.ElementByID(id); found {
|
||||
return result, ServerGroupSingle, true
|
||||
}
|
||||
if result, found := ds.Agents.ElementByID(id); found {
|
||||
return result, ServerGroupAgents, true
|
||||
}
|
||||
if result, found := ds.DBServers.ElementByID(id); found {
|
||||
return result, ServerGroupDBServers, true
|
||||
}
|
||||
if result, found := ds.Coordinators.ElementByID(id); found {
|
||||
return result, ServerGroupCoordinators, true
|
||||
}
|
||||
if result, found := ds.SyncMasters.ElementByID(id); found {
|
||||
return result, ServerGroupSyncMasters, true
|
||||
}
|
||||
if result, found := ds.SyncWorkers.ElementByID(id); found {
|
||||
return result, ServerGroupSyncWorkers, true
|
||||
}
|
||||
return MemberStatus{}, 0, false
|
||||
}
|
||||
|
||||
// ForeachServerGroup calls the given callback for all server groups.
|
||||
// If the callback returns an error, this error is returned and the callback is
|
||||
// not called for the remaining groups.
|
||||
|
|
|
@ -34,10 +34,11 @@ import (
|
|||
)
|
||||
|
||||
type clientCache struct {
|
||||
mutex sync.Mutex
|
||||
clients map[string]driver.Client
|
||||
kubecli kubernetes.Interface
|
||||
apiObject *api.ArangoDeployment
|
||||
mutex sync.Mutex
|
||||
clients map[string]driver.Client
|
||||
kubecli kubernetes.Interface
|
||||
apiObject *api.ArangoDeployment
|
||||
databaseClient driver.Client
|
||||
}
|
||||
|
||||
// newClientCache creates a new client cache
|
||||
|
@ -69,3 +70,22 @@ func (cc *clientCache) Get(group api.ServerGroup, id string) (driver.Client, err
|
|||
cc.clients[key] = c
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// GetDatabase returns a cached client for the entire database (cluster coordinators or single server),
|
||||
// creating one if needed.
|
||||
func (cc *clientCache) GetDatabase() (driver.Client, error) {
|
||||
cc.mutex.Lock()
|
||||
defer cc.mutex.Unlock()
|
||||
|
||||
if c := cc.databaseClient; c != nil {
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Not found, create a new client
|
||||
c, err := arangod.CreateArangodDatabaseClient(cc.kubecli, cc.apiObject)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
cc.databaseClient = c
|
||||
return c, nil
|
||||
}
|
||||
|
|
|
@ -289,6 +289,9 @@ func (d *Deployment) handleArangoDeploymentUpdatedEvent(event *deploymentEvent)
|
|||
return maskAny(fmt.Errorf("failed to update ArangoDeployment spec: %v", err))
|
||||
}
|
||||
|
||||
// Trigger inspect
|
||||
d.inspectTrigger.Trigger()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -131,7 +131,7 @@ func (d *Deployment) startAction(ctx context.Context, action api.Action) (bool,
|
|||
log.Error().Str("group", action.Group.AsRole()).Str("id", action.MemberID).Msg("No such member")
|
||||
return true, nil
|
||||
}
|
||||
c, err := d.clientCache.Get(action.Group, action.MemberID)
|
||||
c, err := d.clientCache.GetDatabase()
|
||||
if err != nil {
|
||||
log.Debug().Err(err).Str("group", action.Group.AsRole()).Msg("Failed to create member client")
|
||||
return false, maskAny(err)
|
||||
|
@ -152,7 +152,7 @@ func (d *Deployment) startAction(ctx context.Context, action api.Action) (bool,
|
|||
}
|
||||
return true, nil
|
||||
case api.ActionTypeShutdownMember:
|
||||
m, ok := d.status.Members.DBServers.ElementByID(action.MemberID)
|
||||
m, _, ok := d.status.Members.ElementByID(action.MemberID)
|
||||
if !ok {
|
||||
log.Error().Str("group", action.Group.AsRole()).Str("id", action.MemberID).Msg("No such member")
|
||||
return true, nil
|
||||
|
@ -188,7 +188,7 @@ func (d *Deployment) checkActionProgress(ctx context.Context, action api.Action)
|
|||
// Nothing todo
|
||||
return true, nil
|
||||
case api.ActionTypeCleanOutMember:
|
||||
c, err := d.clientCache.Get(action.Group, action.MemberID)
|
||||
c, err := d.clientCache.GetDatabase()
|
||||
if err != nil {
|
||||
return false, maskAny(err)
|
||||
}
|
||||
|
@ -207,7 +207,11 @@ func (d *Deployment) checkActionProgress(ctx context.Context, action api.Action)
|
|||
// Cleanout completed
|
||||
return true, nil
|
||||
case api.ActionTypeShutdownMember:
|
||||
// TODO
|
||||
if d.status.Members.ContainsID(action.MemberID) {
|
||||
// Member still exists, retry soon
|
||||
return false, nil
|
||||
}
|
||||
// Member is gone, shutdown is done
|
||||
return true, nil
|
||||
default:
|
||||
return false, maskAny(fmt.Errorf("Unknown action type"))
|
||||
|
|
|
@ -91,13 +91,26 @@ func (d *Deployment) inspectPods() error {
|
|||
d.status.Members.ForeachServerGroup(func(group api.ServerGroup, members *api.MemberStatusList) error {
|
||||
for _, m := range *members {
|
||||
if podName := m.PodName; podName != "" {
|
||||
if !podExists(podName) && m.State != api.MemberStateNone {
|
||||
m.State = api.MemberStateNone // This is trigger a recreate of the pod.
|
||||
// Create event
|
||||
events = append(events, k8sutil.NewPodGoneEvent(podName, group.AsRole(), d.apiObject))
|
||||
if m.Conditions.Update(api.ConditionTypeReady, false, "Pod Does Not Exist", "") {
|
||||
if err := d.status.Members.UpdateMemberStatus(m, group); err != nil {
|
||||
return maskAny(err)
|
||||
if !podExists(podName) {
|
||||
switch m.State {
|
||||
case api.MemberStateNone:
|
||||
// Do nothing
|
||||
case api.MemberStateShuttingDown:
|
||||
// Remove member
|
||||
if m, found := members.ElementByPodName(podName); found {
|
||||
if err := members.RemoveByID(m.ID); err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
events = append(events, k8sutil.NewMemberRemoveEvent(podName, group.AsRole(), d.apiObject))
|
||||
}
|
||||
default:
|
||||
m.State = api.MemberStateNone // This is trigger a recreate of the pod.
|
||||
// Create event
|
||||
events = append(events, k8sutil.NewPodGoneEvent(podName, group.AsRole(), d.apiObject))
|
||||
if m.Conditions.Update(api.ConditionTypeReady, false, "Pod Does Not Exist", "") {
|
||||
if err := d.status.Members.UpdateMemberStatus(m, group); err != nil {
|
||||
return maskAny(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -55,6 +55,26 @@ var (
|
|||
func CreateArangodClient(kubecli kubernetes.Interface, apiObject *api.ArangoDeployment, group api.ServerGroup, id string) (driver.Client, error) {
|
||||
// Create connection
|
||||
dnsName := k8sutil.CreatePodDNSName(apiObject, group.AsRole(), id)
|
||||
c, err := createArangodClientForDNSName(kubecli, apiObject, dnsName)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// CreateArangodDatabaseClient creates a go-driver client for accessing the entire cluster (or single server).
|
||||
func CreateArangodDatabaseClient(kubecli kubernetes.Interface, apiObject *api.ArangoDeployment) (driver.Client, error) {
|
||||
// Create connection
|
||||
dnsName := k8sutil.CreateDatabaseClientServiceDNSName(apiObject)
|
||||
c, err := createArangodClientForDNSName(kubecli, apiObject, dnsName)
|
||||
if err != nil {
|
||||
return nil, maskAny(err)
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// CreateArangodClientForDNSName creates a go-driver client for a given DNS name.
|
||||
func createArangodClientForDNSName(kubecli kubernetes.Interface, apiObject *api.ArangoDeployment, dnsName string) (driver.Client, error) {
|
||||
scheme := "http"
|
||||
connConfig := http.ConnectionConfig{
|
||||
Endpoints: []string{scheme + "://" + net.JoinHostPort(dnsName, strconv.Itoa(k8sutil.ArangoPort))},
|
||||
|
|
|
@ -33,3 +33,9 @@ func CreatePodDNSName(deployment metav1.Object, role, id string) string {
|
|||
CreateHeadlessServiceName(deployment.GetName()) + "." +
|
||||
deployment.GetNamespace() + ".svc"
|
||||
}
|
||||
|
||||
// CreateDatabaseClientServiceDNSName returns the DNS of the database client service.
|
||||
func CreateDatabaseClientServiceDNSName(deployment metav1.Object) string {
|
||||
return CreateDatabaseClientServiceName(deployment.GetName()) + "." +
|
||||
deployment.GetNamespace() + ".svc"
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue