1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-15 17:51:03 +00:00

Fixed PV cleanup

This commit is contained in:
Ewout Prangsma 2018-03-06 15:37:25 +01:00
parent 4236d8d564
commit 9e429a8498
No known key found for this signature in database
GPG key ID: 4DBAD380D93D0698
9 changed files with 196 additions and 32 deletions

77
pkg/storage/clients.go Normal file
View file

@ -0,0 +1,77 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package storage
import (
"context"
"fmt"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner/client"
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
)
// createProvisionerClients creates a list of clients for all known
// provisioners.
func (ls *LocalStorage) createProvisionerClients() ([]provisioner.API, error) {
// Find provisioner endpoints
ns := ls.apiObject.GetNamespace()
listOptions := k8sutil.LocalStorageListOpt(ls.apiObject.GetName(), roleProvisioner)
items, err := ls.deps.KubeCli.CoreV1().Endpoints(ns).List(listOptions)
if err != nil {
return nil, maskAny(err)
}
addrs := createValidEndpointList(items)
if len(addrs) == 0 {
// No provisioners available
return nil, nil
}
// Create clients for endpoints
clients := make([]provisioner.API, len(addrs))
for i, addr := range addrs {
var err error
clients[i], err = client.New(fmt.Sprintf("http://%s", addr))
if err != nil {
return nil, maskAny(err)
}
}
return clients, nil
}
// GetClientByNodeName looks for a client that serves the given node name.
// Returns an error if no such client is found.
func (ls *LocalStorage) GetClientByNodeName(nodeName string) (provisioner.API, error) {
clients, err := ls.createProvisionerClients()
if err != nil {
return nil, maskAny(err)
}
// Find matching client
for _, c := range clients {
ctx := context.Background()
if info, err := c.GetNodeInfo(ctx); err == nil && info.NodeName == nodeName {
return c, nil
}
}
return nil, maskAny(fmt.Errorf("No client found for node name '%s'", nodeName))
}

View file

@ -110,9 +110,10 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoLocalStorage) (*
eventCh: make(chan *localStorageEvent, localStorageEventQueueSize),
stopCh: make(chan struct{}),
eventsCli: deps.KubeCli.Core().Events(apiObject.GetNamespace()),
pvCleaner: newPVCleaner(deps.Log, deps.KubeCli),
}
ls.pvCleaner = newPVCleaner(deps.Log, deps.KubeCli, ls.GetClientByNodeName)
go ls.run()
go ls.listenForPvcEvents()
go ls.listenForPvEvents()

View file

@ -30,6 +30,8 @@ const (
// API of the provisioner
type API interface {
// GetNodeInfo fetches information from the current node.
GetNodeInfo(ctx context.Context) (NodeInfo, error)
// GetInfo fetches information from the filesystem containing
// the given local path on the current node.
GetInfo(ctx context.Context, localPath string) (Info, error)
@ -39,11 +41,16 @@ type API interface {
Remove(ctx context.Context, localPath string) error
}
// NodeInfo holds information of a node.
type NodeInfo struct {
NodeName string `json:"nodeName"`
}
// Info holds information of a filesystem on a node.
type Info struct {
NodeName string `json:"nodeName"`
Available int64 `json:"available"`
Capacity int64 `json:"capacity"`
NodeInfo
Available int64 `json:"available"`
Capacity int64 `json:"capacity"`
}
// Request body for API HTTP requests.

View file

@ -81,6 +81,19 @@ var (
}
)
// GetNodeInfo fetches information from the current node.
func (c *client) GetNodeInfo(ctx context.Context) (provisioner.NodeInfo, error) {
req, err := c.newRequest("GET", "/nodeinfo", nil)
if err != nil {
return provisioner.NodeInfo{}, maskAny(err)
}
var result provisioner.NodeInfo
if err := c.do(ctx, req, &result); err != nil {
return provisioner.NodeInfo{}, maskAny(err)
}
return result, nil
}
// GetInfo fetches information from the filesystem containing
// the given local path.
func (c *client) GetInfo(ctx context.Context, localPath string) (provisioner.Info, error) {

View file

@ -62,6 +62,13 @@ func (p *Provisioner) Run(ctx context.Context) {
runServer(ctx, p.Log, p.Address, p)
}
// GetNodeInfo fetches information from the current node.
func (p *Provisioner) GetNodeInfo(ctx context.Context) (provisioner.NodeInfo, error) {
return provisioner.NodeInfo{
NodeName: p.NodeName,
}, nil
}
// GetInfo fetches information from the filesystem containing
// the given local path.
func (p *Provisioner) GetInfo(ctx context.Context, localPath string) (provisioner.Info, error) {
@ -86,7 +93,9 @@ func (p *Provisioner) GetInfo(ctx context.Context, localPath string) (provisione
Int64("available", available).
Msg("Returning info for local path")
return provisioner.Info{
NodeName: p.NodeName,
NodeInfo: provisioner.NodeInfo{
NodeName: p.NodeName,
},
Available: available,
Capacity: capacity,
}, nil

View file

@ -42,6 +42,7 @@ const (
// runServer runs a HTTP server serving the given API
func runServer(ctx context.Context, log zerolog.Logger, addr string, api provisioner.API) error {
mux := httprouter.New()
mux.GET("/nodeinfo", getNodeInfoHandler(api))
mux.POST("/info", getInfoHandler(api))
mux.POST("/prepare", getPrepareHandler(api))
mux.POST("/remove", getRemoveHandler(api))
@ -71,6 +72,18 @@ func runServer(ctx context.Context, log zerolog.Logger, addr string, api provisi
}
}
func getNodeInfoHandler(api provisioner.API) func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
ctx := r.Context()
result, err := api.GetNodeInfo(ctx)
if err != nil {
handleError(w, err)
} else {
sendJSON(w, http.StatusOK, result)
}
}
}
func getInfoHandler(api provisioner.API) func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
ctx := r.Context()

View file

@ -23,29 +23,35 @@
package storage
import (
"context"
"fmt"
"sync"
"time"
"github.com/rs/zerolog"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner"
"github.com/arangodb/k8s-operator/pkg/util/trigger"
"github.com/rs/zerolog"
)
type pvCleaner struct {
mutex sync.Mutex
log zerolog.Logger
cli kubernetes.Interface
items []v1.PersistentVolume
trigger trigger.Trigger
mutex sync.Mutex
log zerolog.Logger
cli kubernetes.Interface
items []v1.PersistentVolume
trigger trigger.Trigger
clientGetter func(nodeName string) (provisioner.API, error)
}
// newPVCleaner creates a new cleaner of persistent volumes.
func newPVCleaner(log zerolog.Logger, cli kubernetes.Interface) *pvCleaner {
func newPVCleaner(log zerolog.Logger, cli kubernetes.Interface, clientGetter func(nodeName string) (provisioner.API, error)) *pvCleaner {
return &pvCleaner{
log: log,
cli: cli,
log: log,
cli: cli,
clientGetter: clientGetter,
}
}
@ -121,5 +127,44 @@ func (c *pvCleaner) cleanFirst() (bool, error) {
// clean tries to clean the given PV.
func (c *pvCleaner) clean(pv v1.PersistentVolume) error {
log := c.log.With().Str("name", pv.GetName()).Logger()
log.Debug().Msg("Cleaning PersistentVolume")
// Find local path
localSource := pv.Spec.PersistentVolumeSource.Local
if localSource == nil {
return maskAny(fmt.Errorf("PersistentVolume has no local source"))
}
localPath := localSource.Path
// Find client that serves the node
nodeName := pv.GetAnnotations()[nodeNameAnnotation]
if nodeName == "" {
return maskAny(fmt.Errorf("PersistentVolume has no node-name annotation"))
}
client, err := c.clientGetter(nodeName)
if err != nil {
log.Debug().Err(err).Str("node", nodeName).Msg("Failed to get client for node")
return maskAny(err)
}
// Clean volume through client
ctx := context.Background()
if err := client.Remove(ctx, localPath); err != nil {
log.Debug().Err(err).
Str("node", nodeName).
Str("local-path", localPath).
Msg("Failed to remove local path")
return maskAny(err)
}
// Remove persistent volume
if err := c.cli.CoreV1().PersistentVolumes().Delete(pv.GetName(), &metav1.DeleteOptions{}); err != nil {
log.Debug().Err(err).
Str("name", pv.GetName()).
Msg("Failed to remove PersistentVolume")
return maskAny(err)
}
return nil
}

View file

@ -41,38 +41,32 @@ import (
api "github.com/arangodb/k8s-operator/pkg/apis/storage/v1alpha"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner/client"
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
)
const (
defaultVolumeSize = int64(8 * 1024 * 1024 * 1024) // 8GB
// AnnProvisionedBy is the external provisioner annotation in PV object
AnnProvisionedBy = "pv.kubernetes.io/provisioned-by"
)
var (
// name of the annotation containing the node name
nodeNameAnnotation = api.SchemeGroupVersion.Group + "/node-name"
)
// createPVs creates a given number of PersistentVolume's.
func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLocalStorage, unboundClaims []v1.PersistentVolumeClaim) error {
log := ls.deps.Log
// Find provisioner endpoints
ns := apiObject.GetNamespace()
listOptions := k8sutil.LocalStorageListOpt(apiObject.GetName(), roleProvisioner)
items, err := ls.deps.KubeCli.CoreV1().Endpoints(ns).List(listOptions)
// Find provisioner clients
clients, err := ls.createProvisionerClients()
if err != nil {
return maskAny(err)
}
addrs := createValidEndpointList(items)
if len(addrs) == 0 {
if len(clients) == 0 {
// No provisioners available
return maskAny(fmt.Errorf("No ready provisioner endpoints found"))
}
// Create clients for endpoints
clients := make([]provisioner.API, len(addrs))
for i, addr := range addrs {
var err error
clients[i], err = client.New(fmt.Sprintf("http://%s", addr))
if err != nil {
return maskAny(err)
}
}
// Randomize list
rand.Shuffle(len(clients), func(i, j int) {
clients[i], clients[j] = clients[j], clients[i]
@ -133,8 +127,9 @@ func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocal
ObjectMeta: metav1.ObjectMeta{
Name: pvName,
Annotations: map[string]string{
// AnnProvisionedBy: config.ProvisionerName,
AnnProvisionedBy: storageClassProvisioner,
v1.AlphaStorageNodeAffinityAnnotation: nodeAff,
nodeNameAnnotation: info.NodeName,
},
},
Spec: v1.PersistentVolumeSpec{

View file

@ -31,6 +31,7 @@ import (
// released volumes.
// Returns the number of available PV's.
func (ls *LocalStorage) inspectPVs() (int, error) {
log := ls.deps.Log
list, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
if err != nil {
return 0, maskAny(err)
@ -48,7 +49,10 @@ func (ls *LocalStorage) inspectPVs() (int, error) {
case v1.VolumeReleased:
if ls.isOwnerOf(&pv) {
// Cleanup this volume
log.Debug().Str("name", pv.GetName()).Msg("Added PersistentVolume to cleaner")
ls.pvCleaner.Add(pv)
} else {
log.Debug().Str("name", pv.GetName()).Msg("PersistentVolume is not owned by us")
}
}
}