1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

Implemented PV creation, lots of fixes

This commit is contained in:
Ewout Prangsma 2018-03-06 10:54:12 +01:00
parent d76b8b4aeb
commit 6daf05e779
No known key found for this signature in database
GPG key ID: 4DBAD380D93D0698
22 changed files with 786 additions and 97 deletions

View file

@ -11,7 +11,7 @@ spec:
spec:
containers:
- name: arangodb-operator
imagePullPolicy: Always
imagePullPolicy: IfNotPresent
image: arangodb/arangodb-operator:latest
env:
- name: MY_POD_NAMESPACE

View file

@ -4,3 +4,5 @@ metadata:
name: "example-simple-single"
spec:
mode: single
single:
storageClassName: my-local-ssd

View file

@ -30,7 +30,7 @@ import (
const (
ArangoLocalStorageResourceKind = "ArangoLocalStorage"
ArangoLocalStorageResourcePlural = "arangolocalstores"
ArangoLocalStorageResourcePlural = "arangolocalstorages"
groupName = "storage.arangodb.com"
)

View file

@ -48,7 +48,7 @@ func (o *Operator) initCRD() error {
log := o.Dependencies.Log
log.Debug().Msg("Creating ArangoDeployment CRD")
if err := crd.CreateCRD(o.KubeExtCli, deplapi.ArangoDeploymentCRDName, deplapi.ArangoDeploymentResourceKind, deplapi.ArangoDeploymentResourcePlural, deplapi.ArangoDeploymentShortNames...); err != nil {
if err := crd.CreateCRD(o.KubeExtCli, deplapi.SchemeGroupVersion, deplapi.ArangoDeploymentCRDName, deplapi.ArangoDeploymentResourceKind, deplapi.ArangoDeploymentResourcePlural, deplapi.ArangoDeploymentShortNames...); err != nil {
return maskAny(errors.Wrapf(err, "failed to create CRD: %v", err))
}
log.Debug().Msg("Waiting for ArangoDeployment CRD to be ready")
@ -57,7 +57,7 @@ func (o *Operator) initCRD() error {
}
log.Debug().Msg("Creating ArangoLocalStorage CRD")
if err := crd.CreateCRD(o.KubeExtCli, lsapi.ArangoLocalStorageCRDName, lsapi.ArangoLocalStorageResourceKind, lsapi.ArangoLocalStorageResourcePlural, lsapi.ArangoLocalStorageShortNames...); err != nil {
if err := crd.CreateCRD(o.KubeExtCli, lsapi.SchemeGroupVersion, lsapi.ArangoLocalStorageCRDName, lsapi.ArangoLocalStorageResourceKind, lsapi.ArangoLocalStorageResourcePlural, lsapi.ArangoLocalStorageShortNames...); err != nil {
return maskAny(errors.Wrapf(err, "failed to create CRD: %v", err))
}
log.Debug().Msg("Waiting for ArangoLocalStorage CRD to be ready")

View file

@ -48,7 +48,7 @@ var (
// This registers a listener and waits until the process stops.
func (o *Operator) runLocalStorages() {
source := cache.NewListWatchFromClient(
o.Dependencies.CRCli.DatabaseV1alpha().RESTClient(),
o.Dependencies.CRCli.StorageV1alpha().RESTClient(),
api.ArangoLocalStorageResourcePlural,
o.Config.Namespace,
fields.Everything())

View file

@ -24,32 +24,64 @@ package storage
import (
"fmt"
"strconv"
"k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/k8s-operator/pkg/apis/storage/v1alpha"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner"
"github.com/arangodb/k8s-operator/pkg/util/constants"
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
)
const (
roleProvisioner = "provisioner"
)
// ensureDaemonSet ensures that a daemonset is created for the given local storage.
func (l *LocalStorage) ensureDaemonSet(apiObject *api.ArangoLocalStorage) error {
ns := apiObject.GetNamespace()
c := corev1.Container{
Image: l.image,
Name: "provisioner",
Image: l.image,
ImagePullPolicy: l.imagePullPolicy,
Args: []string{
"storage",
"provisioner",
"--storage-class-name=" + apiObject.Spec.StorageClass.Name,
"--port=" + strconv.Itoa(provisioner.DefaultPort),
},
Ports: []corev1.ContainerPort{
corev1.ContainerPort{
ContainerPort: int32(provisioner.DefaultPort),
},
},
Env: []corev1.EnvVar{
corev1.EnvVar{
Name: constants.EnvOperatorNodeName,
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{
FieldPath: "spec.nodeName",
},
},
},
},
}
dsLabels := k8sutil.LabelsForLocalStorage(apiObject.GetName(), roleProvisioner)
ds := &v1.DaemonSet{
ObjectMeta: metav1.ObjectMeta{
Name: apiObject.GetName(),
Name: apiObject.GetName(),
Labels: dsLabels,
},
Spec: v1.DaemonSetSpec{
Selector: &metav1.LabelSelector{
MatchLabels: dsLabels,
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: dsLabels,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
c,
@ -61,10 +93,10 @@ func (l *LocalStorage) ensureDaemonSet(apiObject *api.ArangoLocalStorage) error
}
for i, lp := range apiObject.Spec.LocalPath {
volName := fmt.Sprintf("local-path-%d", i)
c.Args = append(c.Args, fmt.Sprintf("--local-path=%s", lp))
c.VolumeMounts = append(c.VolumeMounts,
corev1.VolumeMount{
Name: volName,
Name: volName,
MountPath: lp,
})
hostPathType := corev1.HostPathDirectoryOrCreate
ds.Spec.Template.Spec.Volumes = append(ds.Spec.Template.Spec.Volumes, corev1.Volume{
@ -77,6 +109,9 @@ func (l *LocalStorage) ensureDaemonSet(apiObject *api.ArangoLocalStorage) error
},
})
}
// Attach DS to ArangoLocalStorage
ds.SetOwnerReferences(append(ds.GetOwnerReferences(), apiObject.AsOwner()))
// Create DS
if _, err := l.deps.KubeCli.AppsV1().DaemonSets(ns).Create(ds); !k8sutil.IsAlreadyExists(err) && err != nil {
return maskAny(err)
}

View file

@ -22,18 +22,22 @@
package storage
import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// getMyImage fetched the docker image from my own pod
func (l *LocalStorage) getMyImage() (string, error) {
func (l *LocalStorage) getMyImage() (string, v1.PullPolicy, error) {
log := l.deps.Log
ns := l.apiObject.GetNamespace()
p, err := l.deps.KubeCli.CoreV1().Pods(ns).Get(l.config.PodName, metav1.GetOptions{})
if err != nil {
log.Debug().Err(err).Str("pod-name", l.config.PodName).Msg("Failed to get my own pod")
return "", maskAny(err)
return "", "", maskAny(err)
}
return p.Spec.Containers[0].Image, nil
c := p.Spec.Containers[0]
return c.Image, c.ImagePullPolicy, nil
}

View file

@ -23,6 +23,7 @@
package storage
import (
"context"
"fmt"
"reflect"
"time"
@ -59,12 +60,15 @@ type localStorageEventType string
const (
eventArangoLocalStorageUpdated localStorageEventType = "ArangoLocalStorageUpdated"
eventPVCAdded localStorageEventType = "pvcAdded"
eventPVCUpdated localStorageEventType = "pvcUpdated"
)
// localStorageEvent holds an event passed from the controller to the local storage.
type localStorageEvent struct {
Type localStorageEventType
LocalStorage *api.ArangoLocalStorage
Type localStorageEventType
LocalStorage *api.ArangoLocalStorage
PersistentVolumeClaim *v1.PersistentVolumeClaim
}
const (
@ -85,8 +89,10 @@ type LocalStorage struct {
eventsCli corev1.EventInterface
image string
inspectTrigger trigger.Trigger
image string
imagePullPolicy v1.PullPolicy
inspectTrigger trigger.Trigger
pvCleaner *pvCleaner
}
// New creates a new LocalStorage from the given API object.
@ -102,9 +108,12 @@ func New(config Config, deps Dependencies, apiObject *api.ArangoLocalStorage) (*
eventCh: make(chan *localStorageEvent, localStorageEventQueueSize),
stopCh: make(chan struct{}),
eventsCli: deps.KubeCli.Core().Events(apiObject.GetNamespace()),
pvCleaner: newPVCleaner(deps.Log, deps.KubeCli),
}
go ls.run()
go ls.listenForPvcEvents()
go ls.pvCleaner.Run(ls.stopCh)
return ls, nil
}
@ -147,12 +156,13 @@ func (ls *LocalStorage) run() {
//log := ls.deps.Log
// Find out my image
image, err := ls.getMyImage()
image, pullPolicy, err := ls.getMyImage()
if err != nil {
ls.failOnError(err, "Failed to get my own image")
return
}
ls.image = image
ls.imagePullPolicy = pullPolicy
// Create StorageClass
if err := ls.ensureStorageClass(ls.apiObject); err != nil {
@ -166,8 +176,15 @@ func (ls *LocalStorage) run() {
return
}
// Create Service to access provisioners
if err := ls.ensureProvisionerService(ls.apiObject); err != nil {
ls.failOnError(err, "Failed to create service")
return
}
inspectionInterval := maxInspectionInterval
//recentInspectionErrors := 0
recentInspectionErrors := 0
var pvsNeededSince *time.Time
for {
select {
case <-ls.stopCh:
@ -182,12 +199,59 @@ func (ls *LocalStorage) run() {
ls.failOnError(err, "Failed to handle local storage update")
return
}
case eventPVCAdded, eventPVCUpdated:
// Do an inspection of PVC's
ls.inspectTrigger.Trigger()
default:
panic("unknown event type" + event.Type)
}
case <-ls.inspectTrigger.Done():
// TODO
hasError := false
unboundPVCs, err := ls.inspectPVCs()
if err != nil {
hasError = true
ls.createEvent(k8sutil.NewErrorEvent("PVC inspection failed", err, ls.apiObject))
}
pvsAvailable, err := ls.inspectPVs()
if err != nil {
hasError = true
ls.createEvent(k8sutil.NewErrorEvent("PV inspection failed", err, ls.apiObject))
}
if len(unboundPVCs) == 0 {
pvsNeededSince = nil
} else if len(unboundPVCs) > 0 {
createNow := false
if pvsNeededSince != nil && time.Since(*pvsNeededSince) > time.Second*30 {
// Create now
createNow = true
} else if pvsAvailable < len(unboundPVCs) {
// Create now
createNow = true
} else {
// Volumes are there, just may no be a match.
// Wait for that
if pvsNeededSince == nil {
now := time.Now()
pvsNeededSince = &now
}
}
if createNow {
ctx := context.Background()
if err := ls.createPVs(ctx, ls.apiObject, unboundPVCs); err != nil {
hasError = true
ls.createEvent(k8sutil.NewErrorEvent("PV creation failed", err, ls.apiObject))
}
}
}
if hasError {
if recentInspectionErrors == 0 {
inspectionInterval = minInspectionInterval
recentInspectionErrors++
}
} else {
recentInspectionErrors = 0
}
case <-time.After(inspectionInterval):
// Trigger inspection
@ -201,7 +265,7 @@ func (ls *LocalStorage) run() {
}
}
// handleArangoLocalStorageUpdatedEvent is called when the deployment is updated by the user.
// handleArangoLocalStorageUpdatedEvent is called when the local storage is updated by the user.
func (ls *LocalStorage) handleArangoLocalStorageUpdatedEvent(event *localStorageEvent) error {
log := ls.deps.Log.With().Str("localStorage", event.LocalStorage.GetName()).Logger()
@ -240,7 +304,7 @@ func (ls *LocalStorage) handleArangoLocalStorageUpdatedEvent(event *localStorage
// Save updated spec
if err := ls.updateCRSpec(newAPIObject.Spec); err != nil {
return maskAny(fmt.Errorf("failed to update ArangoDeployment spec: %v", err))
return maskAny(fmt.Errorf("failed to update ArangoLocalStorage spec: %v", err))
}
// Trigger inspect
@ -341,7 +405,7 @@ func (ls *LocalStorage) failOnError(err error, msg string) {
// that to the API server.
func (ls *LocalStorage) reportFailedStatus() {
log := ls.deps.Log
log.Info().Msg("deployment failed. Reporting failed reason...")
log.Info().Msg("local storage failed. Reporting failed reason...")
op := func() error {
ls.status.State = api.LocalStorageStateFailed

View file

@ -24,21 +24,26 @@ package provisioner
import "context"
const (
DefaultPort = 8929
)
// API of the provisioner
type API interface {
// GetFSInfo fetches information from the filesystem containing
// the given local path.
GetFSInfo(ctx context.Context, localPath string) (FSInfo, error)
// GetInfo fetches information from the filesystem containing
// the given local path on the current node.
GetInfo(ctx context.Context, localPath string) (Info, error)
// Prepare a volume at the given local path
Prepare(ctx context.Context, localPath string) error
// Remove a volume with the given local path
Remove(ctx context.Context, localPath string) error
}
// FSInfo holds information of a filesystem.
type FSInfo struct {
Available int64 `json:"available"`
Capacity int64 `json:"capacity"`
// Info holds information of a filesystem on a node.
type Info struct {
NodeName string `json:"nodeName"`
Available int64 `json:"available"`
Capacity int64 `json:"capacity"`
}
// Request body for API HTTP requests.

View file

@ -81,19 +81,19 @@ var (
}
)
// GetFSInfo fetches information from the filesystem containing
// GetInfo fetches information from the filesystem containing
// the given local path.
func (c *client) GetFSInfo(ctx context.Context, localPath string) (provisioner.FSInfo, error) {
func (c *client) GetInfo(ctx context.Context, localPath string) (provisioner.Info, error) {
input := provisioner.Request{
LocalPath: localPath,
}
req, err := c.newRequest("POST", "/fsinfo", input)
req, err := c.newRequest("POST", "/info", input)
if err != nil {
return provisioner.FSInfo{}, maskAny(err)
return provisioner.Info{}, maskAny(err)
}
var result provisioner.FSInfo
var result provisioner.Info
if err := c.do(ctx, req, &result); err != nil {
return provisioner.FSInfo{}, maskAny(err)
return provisioner.Info{}, maskAny(err)
}
return result, nil
}

View file

@ -28,24 +28,19 @@ import (
"github.com/rs/zerolog"
"golang.org/x/sys/unix"
"k8s.io/client-go/kubernetes"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner"
)
// Config for the storage provisioner
type Config struct {
Address string // Server address to listen on
NodeName string
Namespace string
ServiceAccount string
LocalPath []string
Address string // Server address to listen on
NodeName string // Name of the run I'm running now
}
// Dependencies for the storage provisioner
type Dependencies struct {
Log zerolog.Logger
KubeCli kubernetes.Interface
Log zerolog.Logger
}
// Provisioner implements a Local storage provisioner
@ -67,12 +62,12 @@ func (p *Provisioner) Run(ctx context.Context) {
runServer(ctx, p.Log, p.Address, p)
}
// GetFSInfo fetches information from the filesystem containing
// GetInfo fetches information from the filesystem containing
// the given local path.
func (p *Provisioner) GetFSInfo(ctx context.Context, localPath string) (provisioner.FSInfo, error) {
func (p *Provisioner) GetInfo(ctx context.Context, localPath string) (provisioner.Info, error) {
statfs := &unix.Statfs_t{}
if err := unix.Statfs(localPath, statfs); err != nil {
return provisioner.FSInfo{}, maskAny(err)
return provisioner.Info{}, maskAny(err)
}
// Available is blocks available * fragment size
@ -81,7 +76,7 @@ func (p *Provisioner) GetFSInfo(ctx context.Context, localPath string) (provisio
// Capacity is total block count * fragment size
capacity := int64(statfs.Blocks) * int64(statfs.Bsize)
return provisioner.FSInfo{
return provisioner.Info{
Available: available,
Capacity: capacity,
}, nil

View file

@ -42,7 +42,7 @@ const (
// runServer runs a HTTP server serving the given API
func runServer(ctx context.Context, log zerolog.Logger, addr string, api provisioner.API) error {
mux := httprouter.New()
mux.POST("/fsinfo", getFSInfoHandler(api))
mux.POST("/info", getInfoHandler(api))
mux.POST("/prepare", getPrepareHandler(api))
mux.POST("/remove", getRemoveHandler(api))
@ -71,14 +71,14 @@ func runServer(ctx context.Context, log zerolog.Logger, addr string, api provisi
}
}
func getFSInfoHandler(api provisioner.API) func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
func getInfoHandler(api provisioner.API) func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
return func(w http.ResponseWriter, r *http.Request, ps httprouter.Params) {
ctx := r.Context()
var input provisioner.Request
if err := parseBody(r, &input); err != nil {
handleError(w, err)
} else {
result, err := api.GetFSInfo(ctx, input.LocalPath)
result, err := api.GetInfo(ctx, input.LocalPath)
if err != nil {
handleError(w, err)
} else {

125
pkg/storage/pv_cleanup.go Normal file
View file

@ -0,0 +1,125 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package storage
import (
"sync"
"time"
"k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"github.com/arangodb/k8s-operator/pkg/util/trigger"
"github.com/rs/zerolog"
)
type pvCleaner struct {
mutex sync.Mutex
log zerolog.Logger
cli kubernetes.Interface
items []v1.PersistentVolume
trigger trigger.Trigger
}
// newPVCleaner creates a new cleaner of persistent volumes.
func newPVCleaner(log zerolog.Logger, cli kubernetes.Interface) *pvCleaner {
return &pvCleaner{
log: log,
cli: cli,
}
}
// Run continues cleaning PV's until the given channel is closed.
func (c *pvCleaner) Run(stopCh <-chan struct{}) {
for {
delay := time.Hour
hasMore, err := c.cleanFirst()
if err != nil {
c.log.Error().Err(err).Msg("Failed to clean PersistentVolume")
}
if hasMore {
delay = time.Millisecond * 5
}
select {
case <-stopCh:
// We're done
return
case <-c.trigger.Done():
// Continue
case <-time.After(delay):
// Continue
}
}
}
// Add the given volume to the list of items to clean.
func (c *pvCleaner) Add(pv v1.PersistentVolume) {
c.mutex.Lock()
defer c.mutex.Unlock()
// Check the existing list first, ignore if already found
for _, x := range c.items {
if x.GetUID() == pv.GetUID() {
return
}
}
// Is new, add it
c.items = append(c.items, pv)
c.trigger.Trigger()
}
// cleanFirst tries to clean the first PV in the list.
// Returns (hasMore, error)
func (c *pvCleaner) cleanFirst() (bool, error) {
var first *v1.PersistentVolume
c.mutex.Lock()
if len(c.items) > 0 {
first = &c.items[0]
}
c.mutex.Unlock()
if first == nil {
// Nothing todo
return false, nil
}
// Do actual cleaning
if err := c.clean(*first); err != nil {
return true, maskAny(err)
}
// Remove first from list
c.mutex.Lock()
c.items = c.items[1:]
remaining := len(c.items)
c.mutex.Unlock()
return remaining > 0, nil
}
// clean tries to clean the given PV.
func (c *pvCleaner) clean(pv v1.PersistentVolume) error {
return nil
}

205
pkg/storage/pv_creator.go Normal file
View file

@ -0,0 +1,205 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package storage
import (
"context"
"encoding/json"
"fmt"
"math/rand"
"net"
"path/filepath"
"sort"
"strconv"
"strings"
"k8s.io/apimachinery/pkg/api/resource"
"github.com/dchest/uniuri"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/k8s-operator/pkg/apis/storage/v1alpha"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner/client"
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
)
const (
defaultVolumeSize = int64(8 * 1024 * 1024 * 1024) // 8GB
)
// createPVs creates a given number of PersistentVolume's.
func (ls *LocalStorage) createPVs(ctx context.Context, apiObject *api.ArangoLocalStorage, unboundClaims []v1.PersistentVolumeClaim) error {
log := ls.deps.Log
// Find provisioner endpoints
ns := apiObject.GetNamespace()
listOptions := k8sutil.LocalStorageListOpt(apiObject.GetName(), roleProvisioner)
items, err := ls.deps.KubeCli.CoreV1().Endpoints(ns).List(listOptions)
if err != nil {
return maskAny(err)
}
addrs := createValidEndpointList(items)
if len(addrs) == 0 {
// No provisioners available
return maskAny(fmt.Errorf("No ready provisioner endpoints found"))
}
// Create clients for endpoints
clients := make([]provisioner.API, len(addrs))
for i, addr := range addrs {
var err error
clients[i], err = client.New(fmt.Sprintf("http://%s", addr))
if err != nil {
return maskAny(err)
}
}
// Randomize list
rand.Shuffle(len(clients), func(i, j int) {
clients[i], clients[j] = clients[j], clients[i]
})
for i, claim := range unboundClaims {
// Find size of PVC
volSize := defaultVolumeSize
if reqStorage := claim.Spec.Resources.Requests.StorageEphemeral(); reqStorage != nil {
if v, ok := reqStorage.AsInt64(); ok {
volSize = v
}
}
// Create PV
if err := ls.createPV(ctx, apiObject, clients, i, volSize); err != nil {
log.Error().Err(err).Msg("Failed to create PersistentVolume")
}
}
return nil
}
// createPV creates a PersistentVolume.
func (ls *LocalStorage) createPV(ctx context.Context, apiObject *api.ArangoLocalStorage, clients []provisioner.API, clientsOffset int, volSize int64) error {
log := ls.deps.Log
// Try clients
for clientIdx := 0; clientIdx < len(clients); clientIdx++ {
client := clients[(clientsOffset+clientIdx)%len(clients)]
// Try local path within client
for _, localPathRoot := range apiObject.Spec.LocalPath {
log := log.With().Str("local-path-root", localPathRoot).Logger()
info, err := client.GetInfo(ctx, localPathRoot)
if err != nil {
log.Error().Err(err).Msg("Failed to get client info")
continue
}
if info.Available < volSize {
log.Debug().Msg("Not enough available size")
continue
}
// Ok, prepare a directory
name := strings.ToLower(uniuri.New())
localPath := filepath.Join(localPathRoot, name)
log = log.With().Str("local-path", localPath).Logger()
if err := client.Prepare(ctx, localPath); err != nil {
log.Error().Err(err).Msg("Failed to prepare local path")
continue
}
// Create a volume
pvName := apiObject.GetName() + "-" + name
volumeMode := v1.PersistentVolumeFilesystem
nodeAff, err := createNodeAffinity(info.NodeName)
if err != nil {
return maskAny(err) // No continue here, since this should just not happen
}
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: pvName,
Annotations: map[string]string{
// AnnProvisionedBy: config.ProvisionerName,
v1.AlphaStorageNodeAffinityAnnotation: nodeAff,
},
},
Spec: v1.PersistentVolumeSpec{
Capacity: v1.ResourceList{
v1.ResourceStorage: *resource.NewQuantity(volSize, resource.BinarySI),
},
PersistentVolumeReclaimPolicy: v1.PersistentVolumeReclaimRetain,
PersistentVolumeSource: v1.PersistentVolumeSource{
Local: &v1.LocalVolumeSource{
Path: localPath,
},
},
AccessModes: []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
},
StorageClassName: apiObject.Spec.StorageClass.Name,
VolumeMode: &volumeMode,
},
}
if _, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().Create(pv); err != nil {
log.Error().Err(err).Msg("Failed to create PersistentVolume")
continue
}
return nil
}
}
return maskAny(fmt.Errorf("No more nodes available"))
}
// createValidEndpointList convers the given endpoints list into
// valid addresses.
func createValidEndpointList(list *v1.EndpointsList) []string {
result := make([]string, 0, len(list.Items))
for _, ep := range list.Items {
for _, subset := range ep.Subsets {
for _, ip := range subset.Addresses {
addr := net.JoinHostPort(ip.IP, strconv.Itoa(provisioner.DefaultPort))
result = append(result, addr)
}
}
}
sort.Strings(result)
return result
}
// createNodeAffinity creates a node affinity serialized to string.
func createNodeAffinity(nodeName string) (string, error) {
aff := v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
v1.NodeSelectorTerm{
MatchExpressions: []v1.NodeSelectorRequirement{
v1.NodeSelectorRequirement{
Key: "kubernetes.io/hostname",
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeName},
},
},
},
},
},
}
encoded, err := json.Marshal(aff)
if err != nil {
return "", maskAny(err)
}
return string(encoded), nil
}

View file

@ -0,0 +1,56 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package storage
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// inspectPVs queries all PersistentVolume's and triggers a cleanup for
// released volumes.
// Returns the number of available PV's.
func (ls *LocalStorage) inspectPVs() (int, error) {
list, err := ls.deps.KubeCli.CoreV1().PersistentVolumes().List(metav1.ListOptions{})
if err != nil {
return 0, maskAny(err)
}
spec := ls.apiObject.Spec
availableVolumes := 0
for _, pv := range list.Items {
if pv.Spec.StorageClassName != spec.StorageClass.Name {
// Not our storage class
continue
}
switch pv.Status.Phase {
case v1.VolumeAvailable:
availableVolumes++
case v1.VolumeReleased:
if ls.isOwnerOf(&pv) {
// Cleanup this volume
ls.pvCleaner.Add(pv)
}
}
}
return availableVolumes, nil
}

View file

@ -0,0 +1,75 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package storage
import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
)
// listenForPvcEvents keep listening for changes in PVC's until the given channel is closed.
func (ls *LocalStorage) listenForPvcEvents() {
source := cache.NewListWatchFromClient(
ls.deps.KubeCli.CoreV1().RESTClient(),
"persistentvolumeclaims",
ls.apiObject.GetNamespace(),
fields.Everything())
getPvc := func(obj interface{}) (*v1.PersistentVolumeClaim, bool) {
pvc, ok := obj.(*v1.PersistentVolumeClaim)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return nil, false
}
pvc, ok = tombstone.Obj.(*v1.PersistentVolumeClaim)
return pvc, ok
}
return pvc, true
}
_, informer := cache.NewIndexerInformer(source, &v1.PersistentVolumeClaim{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if pvc, ok := getPvc(obj); ok {
ls.send(&localStorageEvent{
Type: eventPVCAdded,
PersistentVolumeClaim: pvc,
})
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
if pvc, ok := getPvc(newObj); ok {
ls.send(&localStorageEvent{
Type: eventPVCUpdated,
PersistentVolumeClaim: pvc,
})
}
},
DeleteFunc: func(obj interface{}) {
// Ignore
},
}, cache.Indexers{})
informer.Run(ls.stopCh)
}

View file

@ -0,0 +1,67 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package storage
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// inspectPVCs queries all PVC's and checks if there is a need to
// build new persistent volumes.
// Returns the PVC's that need a volume.
func (ls *LocalStorage) inspectPVCs() ([]v1.PersistentVolumeClaim, error) {
ns := ls.apiObject.GetNamespace()
list, err := ls.deps.KubeCli.CoreV1().PersistentVolumeClaims(ns).List(metav1.ListOptions{})
if err != nil {
return nil, maskAny(err)
}
spec := ls.apiObject.Spec
var result []v1.PersistentVolumeClaim
for _, pvc := range list.Items {
if !pvcMatchesStorageClass(pvc, spec.StorageClass.Name, spec.StorageClass.IsDefault) {
continue
}
if !pvcNeedsVolume(pvc) {
continue
}
result = append(result, pvc)
}
return result, nil
}
// pvcMatchesStorageClass checks if the given pvc requests a volume
// of the given storage class.
func pvcMatchesStorageClass(pvc v1.PersistentVolumeClaim, storageClassName string, isDefault bool) bool {
scn := pvc.Spec.StorageClassName
if scn == nil {
// No storage class specified, default is used
return isDefault
}
return *scn == storageClassName
}
// pvcNeedsVolume checks if the given pvc is in need of a persistent volume.
func pvcNeedsVolume(pvc v1.PersistentVolumeClaim) bool {
return pvc.Status.Phase == v1.ClaimPending
}

60
pkg/storage/service.go Normal file
View file

@ -0,0 +1,60 @@
//
// DISCLAIMER
//
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Author Ewout Prangsma
//
package storage
import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/k8s-operator/pkg/apis/storage/v1alpha"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner"
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
)
// ensureProvisionerService ensures that a service is created for accessing the
// provisioners.
func (ls *LocalStorage) ensureProvisionerService(apiObject *api.ArangoLocalStorage) error {
labels := k8sutil.LabelsForLocalStorage(apiObject.GetName(), roleProvisioner)
svc := &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: apiObject.GetName(),
Labels: labels,
},
Spec: v1.ServiceSpec{
Ports: []v1.ServicePort{
v1.ServicePort{
Name: "provisioner",
Protocol: v1.ProtocolTCP,
Port: provisioner.DefaultPort,
},
},
Selector: labels,
},
}
svc.SetOwnerReferences(append(svc.GetOwnerReferences(), apiObject.AsOwner()))
ns := apiObject.GetNamespace()
if _, err := ls.deps.KubeCli.CoreV1().Services(ns).Create(svc); err != nil && !k8sutil.IsAlreadyExists(err) {
return maskAny(err)
}
return nil
}

View file

@ -31,6 +31,10 @@ import (
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
)
var (
storageClassProvisioner = api.SchemeGroupVersion.Group + "/localstorage"
)
// ensureStorageClass creates a storage class for the given local storage.
// If such a class already exists, the create is ignored.
func (l *LocalStorage) ensureStorageClass(apiObject *api.ArangoLocalStorage) error {
@ -43,6 +47,7 @@ func (l *LocalStorage) ensureStorageClass(apiObject *api.ArangoLocalStorage) err
},
ReclaimPolicy: &reclaimPolicy,
VolumeBindingMode: &bindingMode,
Provisioner: storageClassProvisioner,
}
if _, err := l.deps.KubeCli.StorageV1().StorageClasses().Create(sc); !k8sutil.IsAlreadyExists(err) && err != nil {
return maskAny(err)

View file

@ -26,24 +26,25 @@ import (
"fmt"
"time"
"k8s.io/apimachinery/pkg/runtime/schema"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
api "github.com/arangodb/k8s-operator/pkg/apis/arangodb/v1alpha"
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
"github.com/arangodb/k8s-operator/pkg/util/retry"
)
// CreateCRD creates a custom resouce definition.
func CreateCRD(clientset apiextensionsclient.Interface, crdName, rkind, rplural string, shortName ...string) error {
func CreateCRD(clientset apiextensionsclient.Interface, groupVersion schema.GroupVersion, crdName, rkind, rplural string, shortName ...string) error {
crd := &apiextensionsv1beta1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdName,
},
Spec: apiextensionsv1beta1.CustomResourceDefinitionSpec{
Group: api.SchemeGroupVersion.Group,
Version: api.SchemeGroupVersion.Version,
Group: groupVersion.Group,
Version: groupVersion.Version,
Scope: apiextensionsv1beta1.NamespaceScoped,
Names: apiextensionsv1beta1.CustomResourceDefinitionNames{
Plural: rplural,

View file

@ -46,9 +46,28 @@ func LabelsForDeployment(deploymentName, role string) map[string]string {
return l
}
// LabelsForLocalStorage returns a map of labels, given to all resources for given local storage name
func LabelsForLocalStorage(localStorageName, role string) map[string]string {
l := map[string]string{
"arango_local_storage": localStorageName,
"app": "arangodb",
}
if role != "" {
l["role"] = role
}
return l
}
// DeploymentListOpt creates a ListOptions matching all labels for the given deployment name.
func DeploymentListOpt(deploymentName string) metav1.ListOptions {
return metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(LabelsForDeployment(deploymentName, "")).String(),
}
}
// LocalStorageListOpt creates a ListOptions matching all labels for the given local storage name.
func LocalStorageListOpt(localStorageName, role string) metav1.ListOptions {
return metav1.ListOptions{
LabelSelector: labels.SelectorFromSet(LabelsForLocalStorage(localStorageName, role)).String(),
}
}

View file

@ -24,20 +24,16 @@ package main
import (
"context"
goflag "flag"
"fmt"
"net"
"os"
"strconv"
"github.com/spf13/cobra"
"github.com/arangodb/k8s-operator/pkg/logging"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner"
"github.com/arangodb/k8s-operator/pkg/storage/provisioner/service"
"github.com/arangodb/k8s-operator/pkg/util/constants"
"github.com/arangodb/k8s-operator/pkg/util/k8sutil"
)
const (
defaultProvisionerPort = 8929
)
var (
@ -52,9 +48,7 @@ var (
}
storageProvisioner struct {
port int
localPath []string
storageClassName string
port int
}
)
@ -63,14 +57,12 @@ func init() {
cmdStorage.AddCommand(cmdStorageProvisioner)
f := cmdStorageProvisioner.Flags()
f.StringSliceVar(&storageProvisioner.localPath, "local-path", nil, "Local directory to provision volumes into")
f.StringVar(&storageProvisioner.storageClassName, "storage-class-name", "", "StorageClassName set in provisioned volumes")
f.IntVar(&storageProvisioner.port, "port", defaultProvisionerPort, "Port to listen on")
f.IntVar(&storageProvisioner.port, "port", provisioner.DefaultPort, "Port to listen on")
}
// Run the provisioner
func cmdStorageProvisionerRun(cmd *cobra.Command, args []string) {
goflag.CommandLine.Parse([]string{"-logtostderr"})
//goflag.CommandLine.Parse([]string{"-logtostderr"})
var err error
logService, err = logging.NewService(logLevel)
if err != nil {
@ -81,20 +73,12 @@ func cmdStorageProvisionerRun(cmd *cobra.Command, args []string) {
cliLog.Info().Msgf("Starting arangodb local storage provisioner, version %s build %s", projectVersion, projectBuild)
// Get environment
namespace := os.Getenv(constants.EnvOperatorPodNamespace)
if len(namespace) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodNamespace)
}
name := os.Getenv(constants.EnvOperatorPodName)
if len(name) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorPodName)
}
nodeName := os.Getenv(constants.EnvOperatorNodeName)
if len(name) == 0 {
if len(nodeName) == 0 {
cliLog.Fatal().Msgf("%s environment variable missing", constants.EnvOperatorNodeName)
}
config, deps, err := newProvisionerConfigAndDeps(nodeName, namespace, name)
config, deps, err := newProvisionerConfigAndDeps(nodeName)
if err != nil {
cliLog.Fatal().Err(err).Msg("Failed to create provisioner config & dependencies")
}
@ -108,26 +92,13 @@ func cmdStorageProvisionerRun(cmd *cobra.Command, args []string) {
}
// newProvisionerConfigAndDeps creates storage provisioner config & dependencies.
func newProvisionerConfigAndDeps(nodeName, namespace, name string) (service.Config, service.Dependencies, error) {
kubecli, err := k8sutil.NewKubeClient()
if err != nil {
return service.Config{}, service.Dependencies{}, maskAny(err)
}
serviceAccount, err := getMyPodServiceAccount(kubecli, namespace, name)
if err != nil {
return service.Config{}, service.Dependencies{}, maskAny(fmt.Errorf("Failed to get my pod's service account: %s", err))
}
func newProvisionerConfigAndDeps(nodeName string) (service.Config, service.Dependencies, error) {
cfg := service.Config{
LocalPath: storageProvisioner.localPath,
NodeName: nodeName,
Namespace: namespace,
ServiceAccount: serviceAccount,
Address: net.JoinHostPort("0.0.0.0", strconv.Itoa(storageProvisioner.port)),
NodeName: nodeName,
}
deps := service.Dependencies{
Log: logService.MustGetLogger("provisioner"),
KubeCli: kubecli,
Log: logService.MustGetLogger("provisioner"),
}
return cfg, deps, nil