diff --git a/pkg/csi/csi_ops.go b/pkg/csi/csi_ops.go index eb7a0eb..63c3368 100644 --- a/pkg/csi/csi_ops.go +++ b/pkg/csi/csi_ops.go @@ -6,10 +6,12 @@ import ( "net/http" "net/url" "strings" + "time" "github.com/kanisterio/kanister/pkg/kube" kankube "github.com/kanisterio/kanister/pkg/kube" kansnapshot "github.com/kanisterio/kanister/pkg/kube/snapshot" + "github.com/kanisterio/kanister/pkg/poll" "github.com/kastenhq/kubestr/pkg/common" "github.com/kastenhq/kubestr/pkg/csi/types" snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" @@ -19,6 +21,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -27,6 +30,13 @@ import ( "k8s.io/client-go/transport/spdy" ) +const ( + defaultReadyWaitTimeout = 2 * time.Minute + + PVCKind = "PersistentVolumeClaim" + PodKind ="Pod" +) + //go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_argument_validator.go -package=mocks . ArgumentValidator type ArgumentValidator interface { //Rename @@ -83,11 +93,13 @@ func (o *validateOperations) ValidateVolumeSnapshotClass(ctx context.Context, vo type ApplicationCreator interface { CreatePVC(ctx context.Context, args *types.CreatePVCArgs) (*v1.PersistentVolumeClaim, error) CreatePod(ctx context.Context, args *types.CreatePodArgs) (*v1.Pod, error) + WaitForPVCReady(ctx context.Context, namespace string, pvcName string) error WaitForPodReady(ctx context.Context, namespace string, podName string) error } type applicationCreate struct { kubeCli kubernetes.Interface + k8sObjectReadyTimeout time.Duration } func (c *applicationCreate) CreatePVC(ctx context.Context, args *types.CreatePVCArgs) (*v1.PersistentVolumeClaim, error) { @@ -187,14 +199,92 @@ func (c *applicationCreate) CreatePod(ctx context.Context, args *types.CreatePod return podRes, nil } +func (c *applicationCreate) WaitForPVCReady(ctx context.Context, namespace, name string) error { + if c.kubeCli == nil { + return fmt.Errorf("kubeCli not initialized") + } + + err := c.waitForPVCReady(ctx, namespace, name) + if err != nil { + eventErr := c.getErrorFromEvents(ctx, namespace, name, PVCKind) + if eventErr != nil { + return errors.Wrapf(eventErr, "had issues creating PVC") + } + } + return err +} + +func (c *applicationCreate) waitForPVCReady(ctx context.Context, namespace string, name string) error { + pvcReadyTimeout := c.k8sObjectReadyTimeout + if pvcReadyTimeout == 0 { + pvcReadyTimeout = defaultReadyWaitTimeout + } + + timeoutCtx, waitCancel := context.WithTimeout(ctx, pvcReadyTimeout) + defer waitCancel() + return poll.Wait(timeoutCtx, func(ctx context.Context) (bool, error) { + pvc, err := c.kubeCli.CoreV1().PersistentVolumeClaims(namespace).Get(timeoutCtx, name, metav1.GetOptions{}) + if err != nil { + return false, errors.Wrapf(err, "Could not find PVC") + } + + if pvc.Status.Phase == v1.ClaimLost { + return false, fmt.Errorf("failed to create a PVC, ClaimLost") + } + + return pvc.Status.Phase == v1.ClaimBound, nil + }) +} + func (c *applicationCreate) WaitForPodReady(ctx context.Context, namespace string, podName string) error { if c.kubeCli == nil { return fmt.Errorf("kubeCli not initialized") } - err := kankube.WaitForPodReady(ctx, c.kubeCli, namespace, podName) + err := c.waitForPodReady(ctx, namespace, podName) + if err != nil { + eventErr := c.getErrorFromEvents(ctx, namespace, podName, PodKind) + if eventErr != nil { + return errors.Wrapf(eventErr, "had issues creating Pod") + } + } return err } +func (c *applicationCreate) waitForPodReady(ctx context.Context, namespace string, podName string) error { + podReadyTimeout := c.k8sObjectReadyTimeout + if podReadyTimeout == 0 { + podReadyTimeout = defaultReadyWaitTimeout + } + + timeoutCtx, waitCancel := context.WithTimeout(ctx, podReadyTimeout) + defer waitCancel() + err := kankube.WaitForPodReady(timeoutCtx, c.kubeCli, namespace, podName) + return err +} + +func (c *applicationCreate) getErrorFromEvents(ctx context.Context, namespace, name, kind string) error { + fieldSelectors := fields.Set{ + "involvedObject.kind": kind, + "involvedObject.name": name, + }.AsSelector().String() + listOptions := metav1.ListOptions{ + TypeMeta: metav1.TypeMeta{Kind: kind}, + FieldSelector: fieldSelectors, + } + + events, eventErr := c.kubeCli.CoreV1().Events(namespace).List(ctx, listOptions) + if eventErr != nil { + return errors.Wrapf(eventErr, "failed to retreieve events for %s of kind: %s", name, kind) + } + + for _, event := range events.Items { + if event.Type == v1.EventTypeWarning { + return fmt.Errorf(event.Message) + } + } + return nil +} + //go:generate go run github.com/golang/mock/mockgen -destination=mocks/mock_snapshot_creator.go -package=mocks . SnapshotCreator type SnapshotCreator interface { NewSnapshotter() (kansnapshot.Snapshotter, error) diff --git a/pkg/csi/csi_ops_test.go b/pkg/csi/csi_ops_test.go index 82b2279..92a0ae0 100644 --- a/pkg/csi/csi_ops_test.go +++ b/pkg/csi/csi_ops_test.go @@ -4,10 +4,12 @@ import ( "context" "errors" "fmt" + "strings" kansnapshot "github.com/kanisterio/kanister/pkg/kube/snapshot" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1" + pkgerrors "github.com/pkg/errors" "github.com/kastenhq/kubestr/pkg/common" "github.com/kastenhq/kubestr/pkg/csi/types" snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" @@ -462,6 +464,7 @@ func (s *CSITestSuite) TestCreatePVC(c *C) { func (s *CSITestSuite) TestCreatePod(c *C) { ctx := context.Background() for _, tc := range []struct { + description string cli kubernetes.Interface args *types.CreatePodArgs failCreates bool @@ -469,6 +472,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) { podChecker Checker }{ { + description: "pod with container image and runAsUser 1000 created", cli: fake.NewSimpleClientset(), args: &types.CreatePodArgs{ GenerateName: "name", @@ -482,17 +486,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) { podChecker: NotNil, }, { - cli: fake.NewSimpleClientset(), - args: &types.CreatePodArgs{ - GenerateName: "name", - PVCName: "pvcname", - Namespace: "ns", - Command: []string{"somecommand"}, - }, - errChecker: IsNil, - podChecker: NotNil, - }, - { + description: "Pod creation error on kubeCli", cli: fake.NewSimpleClientset(), args: &types.CreatePodArgs{ GenerateName: "name", @@ -505,6 +499,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) { podChecker: NotNil, }, { + description: "Pod generate name arg not set", cli: fake.NewSimpleClientset(), args: &types.CreatePodArgs{ GenerateName: "", @@ -516,6 +511,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) { podChecker: IsNil, }, { + description: "PVC name not set error", cli: fake.NewSimpleClientset(), args: &types.CreatePodArgs{ GenerateName: "name", @@ -527,6 +523,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) { podChecker: IsNil, }, { + description: "default namespace pod is created", cli: fake.NewSimpleClientset(), args: &types.CreatePodArgs{ GenerateName: "name", @@ -538,6 +535,7 @@ func (s *CSITestSuite) TestCreatePod(c *C) { podChecker: IsNil, }, { + description: "ns namespace pod is created", cli: fake.NewSimpleClientset(), args: &types.CreatePodArgs{ GenerateName: "name", @@ -549,12 +547,14 @@ func (s *CSITestSuite) TestCreatePod(c *C) { podChecker: NotNil, }, { + description: "kubeCli not initialized", cli: nil, args: &types.CreatePodArgs{}, errChecker: NotNil, podChecker: IsNil, }, } { + fmt.Println("test:", tc.description) creator := &applicationCreate{kubeCli: tc.cli} if tc.failCreates { creator.kubeCli.(*fake.Clientset).Fake.PrependReactor("create", "pods", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { @@ -598,7 +598,6 @@ func (s *CSITestSuite) TestCreatePod(c *C) { } else { c.Check(pod.Spec.SecurityContext, IsNil) } - } } } @@ -1160,3 +1159,91 @@ func (s *CSITestSuite) TestDeleteSnapshot(c *C) { c.Check(err, tc.errChecker) } } + +func (s *CSITestSuite) TestWaitForPVCReady(c *C) { + ctx := context.Background() + const ns = "ns" + const pvc = "pvc" + boundPVC := s.getPVC(ns, pvc, v1.ClaimBound) + claimLostPVC := s.getPVC(ns, pvc, v1.ClaimLost) + stuckPVC := s.getPVC(ns, pvc, "") + normalGetFunc := func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return + } + deadlineExceededGetFunc := func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, nil, pkgerrors.Wrapf(context.DeadlineExceeded, "some wrapped error") + } + + warningEvent := v1.Event{ + Type: v1.EventTypeWarning, + Message: "waiting for a volume to be created, either by external provisioner \"ceph.com/rbd\" or manually created by system administrator", + } + for _, tc := range []struct { + description string + cli kubernetes.Interface + pvcGetFunc func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) + eventsList []v1.Event + errChecker Checker + errString string + }{ + { + description: "Happy path", + cli: fake.NewSimpleClientset(boundPVC), + pvcGetFunc: normalGetFunc, + errChecker: IsNil, + }, + { + description: "Missing PVC", + cli: fake.NewSimpleClientset(), + pvcGetFunc: normalGetFunc, + errChecker: NotNil, + errString: "Could not find PVC", + }, + { + description: "PVC ClaimLost", + cli: fake.NewSimpleClientset(claimLostPVC), + pvcGetFunc: normalGetFunc, + errChecker: NotNil, + errString: "ClaimLost", + }, + { + description: "context.DeadlineExceeded but no event warnings", + cli: fake.NewSimpleClientset(stuckPVC), + pvcGetFunc: deadlineExceededGetFunc, + errChecker: NotNil, + errString: context.DeadlineExceeded.Error(), + }, + { + description: "context.DeadlineExceeded, unable to provision PVC", + cli: fake.NewSimpleClientset(stuckPVC), + pvcGetFunc: deadlineExceededGetFunc, + eventsList: []v1.Event{warningEvent}, + errChecker: NotNil, + errString: warningEvent.Message, + }, + }{ + fmt.Println("test:", tc.description) + creator := &applicationCreate{kubeCli: tc.cli} + creator.kubeCli.(*fake.Clientset).PrependReactor("get", "persistentvolumeclaims", tc.pvcGetFunc) + creator.kubeCli.(*fake.Clientset).PrependReactor("list", "events", func(action k8stesting.Action) (handled bool, ret runtime.Object, err error) { + return true, &v1.EventList{Items: tc.eventsList}, nil + }) + err := creator.WaitForPVCReady(ctx, ns, pvc) + c.Check(err, tc.errChecker) + if err != nil { + c.Assert(strings.Contains(err.Error(), tc.errString), Equals, true) + } + } +} + +func (s *CSITestSuite) getPVC(ns, pvc string, phase v1.PersistentVolumeClaimPhase) *v1.PersistentVolumeClaim { + return &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: pvc, + Namespace: ns, + }, + Status: v1.PersistentVolumeClaimStatus { + Phase: phase, + }, + } +} \ No newline at end of file diff --git a/pkg/csi/mocks/mock_application_creator.go b/pkg/csi/mocks/mock_application_creator.go index adf785f..2ea6684 100644 --- a/pkg/csi/mocks/mock_application_creator.go +++ b/pkg/csi/mocks/mock_application_creator.go @@ -79,3 +79,16 @@ func (mr *MockApplicationCreatorMockRecorder) WaitForPodReady(arg0, arg1, arg2 i mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForPodReady", reflect.TypeOf((*MockApplicationCreator)(nil).WaitForPodReady), arg0, arg1, arg2) } + +func (m *MockApplicationCreator) WaitForPVCReady(ctx context.Context, namespace string, pvcName string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WaitForPVCReady", ctx, namespace, pvcName) + err, _ := ret[0].(error) + return err +} + +// WaitForPodReady indicates an expected call of WaitForPVCReady. +func (mr *MockApplicationCreatorMockRecorder) WaitForPVCReady(ctx, namespace, pvcName interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitForPVCReady", reflect.TypeOf((*MockApplicationCreator)(nil).WaitForPVCReady), ctx, namespace, pvcName) +} \ No newline at end of file diff --git a/pkg/csi/snapshot_restore.go b/pkg/csi/snapshot_restore.go index 2d9ea02..ae4765e 100644 --- a/pkg/csi/snapshot_restore.go +++ b/pkg/csi/snapshot_restore.go @@ -33,6 +33,12 @@ type SnapshotRestoreRunner struct { } func (r *SnapshotRestoreRunner) RunSnapshotRestore(ctx context.Context, args *types.CSISnapshotRestoreArgs) (*types.CSISnapshotRestoreResults, error) { + if r.KubeCli == nil || r.DynCli == nil { + return &types.CSISnapshotRestoreResults{}, fmt.Errorf("cli uninitialized") + } + if args == nil { + return &types.CSISnapshotRestoreResults{}, fmt.Errorf("snapshot args not specified") + } r.srSteps = &snapshotRestoreSteps{ validateOps: &validateOperations{ kubeCli: r.KubeCli, @@ -43,6 +49,7 @@ func (r *SnapshotRestoreRunner) RunSnapshotRestore(ctx context.Context, args *ty }, createAppOps: &applicationCreate{ kubeCli: r.KubeCli, + k8sObjectReadyTimeout: args.K8sObjectReadyTimeout, }, dataValidatorOps: &validateData{ kubeCli: r.KubeCli, @@ -103,7 +110,8 @@ func (r *SnapshotRestoreRunner) RunSnapshotRestoreHelper(ctx context.Context, ar if args.Cleanup { fmt.Println("Cleaning up resources") - r.srSteps.Cleanup(ctx, results) + // don't let Cancelled/DeadlineExceeded context affect cleanup + r.srSteps.Cleanup(context.Background(), results) } return results, err @@ -183,6 +191,11 @@ func (s *snapshotRestoreSteps) CreateApplication(ctx context.Context, args *type if err != nil { return nil, pvc, errors.Wrap(err, "Failed to create POD") } + + if err = s.createAppOps.WaitForPVCReady(ctx, args.Namespace, pvc.Name); err != nil { + return pod, pvc, errors.Wrap(err, "PVC failed to become ready") + } + if err = s.createAppOps.WaitForPodReady(ctx, args.Namespace, pod.Name); err != nil { return pod, pvc, errors.Wrap(err, "Pod failed to become ready") } @@ -261,6 +274,11 @@ func (s *snapshotRestoreSteps) RestoreApplication(ctx context.Context, args *typ if err != nil { return nil, pvc, errors.Wrap(err, "Failed to create restored Pod") } + + if err = s.createAppOps.WaitForPVCReady(ctx, args.Namespace, pvc.Name); err != nil { + return pod, pvc, errors.Wrap(err, "PVC failed to become ready") + } + if err = s.createAppOps.WaitForPodReady(ctx, args.Namespace, pod.Name); err != nil { return pod, pvc, errors.Wrap(err, "Pod failed to become ready") } @@ -274,31 +292,31 @@ func (s *snapshotRestoreSteps) Cleanup(ctx context.Context, results *types.CSISn if results.OriginalPVC != nil { err := s.cleanerOps.DeletePVC(ctx, results.OriginalPVC.Name, results.OriginalPVC.Namespace) if err != nil { - fmt.Printf("Error deleteing PVC (%s) - (%v)\n", results.OriginalPVC.Name, err) + fmt.Printf("Error deleting original PVC (%s) - (%v)\n", results.OriginalPVC.Name, err) } } if results.OriginalPod != nil { err := s.cleanerOps.DeletePod(ctx, results.OriginalPod.Name, results.OriginalPod.Namespace) if err != nil { - fmt.Printf("Error deleteing Pod (%s) - (%v)\n", results.OriginalPod.Name, err) + fmt.Printf("Error deleting original Pod (%s) - (%v)\n", results.OriginalPod.Name, err) } } if results.ClonedPVC != nil { err := s.cleanerOps.DeletePVC(ctx, results.ClonedPVC.Name, results.ClonedPVC.Namespace) if err != nil { - fmt.Printf("Error deleteing PVC (%s) - (%v)\n", results.ClonedPVC.Name, err) + fmt.Printf("Error deleting cloned PVC (%s) - (%v)\n", results.ClonedPVC.Name, err) } } if results.ClonedPod != nil { err := s.cleanerOps.DeletePod(ctx, results.ClonedPod.Name, results.ClonedPod.Namespace) if err != nil { - fmt.Printf("Error deleteing Pod (%s) - (%v)\n", results.ClonedPod.Name, err) + fmt.Printf("Error deleting cloned Pod (%s) - (%v)\n", results.ClonedPod.Name, err) } } if results.Snapshot != nil { err := s.cleanerOps.DeleteSnapshot(ctx, results.Snapshot.Name, results.Snapshot.Namespace, s.SnapshotGroupVersion) if err != nil { - fmt.Printf("Error deleteing Snapshot (%s) - (%v)\n", results.Snapshot.Name, err) + fmt.Printf("Error deleting Snapshot (%s) - (%v)\n", results.Snapshot.Name, err) } } } diff --git a/pkg/csi/snapshot_restore_steps_test.go b/pkg/csi/snapshot_restore_steps_test.go index b6629e7..b7d3440 100644 --- a/pkg/csi/snapshot_restore_steps_test.go +++ b/pkg/csi/snapshot_restore_steps_test.go @@ -242,6 +242,7 @@ func (s *CSITestSuite) TestCreateApplication(c *C) { Name: "pod1", }, }, nil), + f.createAppOps.EXPECT().WaitForPVCReady(gomock.Any(), "ns", "pvc1").Return(nil), f.createAppOps.EXPECT().WaitForPodReady(gomock.Any(), "ns", "pod1").Return(nil), ) }, @@ -282,6 +283,7 @@ func (s *CSITestSuite) TestCreateApplication(c *C) { Name: "pod1", }, }, nil), + f.createAppOps.EXPECT().WaitForPVCReady(gomock.Any(), "ns", "pvc1").Return(nil), f.createAppOps.EXPECT().WaitForPodReady(gomock.Any(), "ns", "pod1").Return(fmt.Errorf("pod ready error")), ) }, @@ -328,6 +330,47 @@ func (s *CSITestSuite) TestCreateApplication(c *C) { podChecker: IsNil, pvcChecker: IsNil, }, + { // PVC times out provisioning + args: &types.CSISnapshotRestoreArgs{ + StorageClass: "sc", + Namespace: "ns", + RunAsUser: 100, + ContainerImage: "image", + }, + genString: "some string", + prepare: func(f *fields) { + gomock.InOrder( + f.createAppOps.EXPECT().CreatePVC(gomock.Any(), &types.CreatePVCArgs{ + GenerateName: originalPVCGenerateName, + StorageClass: "sc", + Namespace: "ns", + }).Return(&v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pvc1", + }, + }, nil), + f.createAppOps.EXPECT().CreatePod(gomock.Any(), &types.CreatePodArgs{ + GenerateName: originalPodGenerateName, + PVCName: "pvc1", + Namespace: "ns", + Command: []string{"/bin/sh"}, + ContainerArgs: []string{"-c", "echo 'some string' >> /data/out.txt; sync; tail -f /dev/null"}, + RunAsUser: 100, + ContainerImage: "image", + MountPath: "/data", + }).Return(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod1", + }, + }, nil), + f.createAppOps.EXPECT().WaitForPVCReady(gomock.Any(), "ns", "pvc1").Return(fmt.Errorf("rate: Wait(n=1) would exceed context deadline")), + f.createAppOps.EXPECT().WaitForPodReady(gomock.Any(), "ns", "pvc1").Times(0), + ) + }, + errChecker: NotNil, + podChecker: NotNil, + pvcChecker: NotNil, + }, } { ctrl := gomock.NewController(c) defer ctrl.Finish() @@ -587,6 +630,7 @@ func (s *CSITestSuite) TestRestoreApplication(c *C) { Name: "pod1", }, }, nil), + f.createAppOps.EXPECT().WaitForPVCReady(gomock.Any(), "ns", "pvc1").Return(nil), f.createAppOps.EXPECT().WaitForPodReady(gomock.Any(), "ns", "pod1").Return(nil), ) }, @@ -640,6 +684,7 @@ func (s *CSITestSuite) TestRestoreApplication(c *C) { Name: "pod1", }, }, nil), + f.createAppOps.EXPECT().WaitForPVCReady(gomock.Any(), "ns", "pvc1").Return(nil), f.createAppOps.EXPECT().WaitForPodReady(gomock.Any(), "ns", "pod1").Return(fmt.Errorf("pod ready error")), ) }, diff --git a/pkg/csi/types/csi_types.go b/pkg/csi/types/csi_types.go index 96d79ca..f1ab925 100644 --- a/pkg/csi/types/csi_types.go +++ b/pkg/csi/types/csi_types.go @@ -3,6 +3,7 @@ package types import ( "bytes" "fmt" + "time" snapv1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" v1 "k8s.io/api/core/v1" @@ -18,6 +19,7 @@ type CSISnapshotRestoreArgs struct { ContainerImage string Cleanup bool SkipCFSCheck bool + K8sObjectReadyTimeout time.Duration } func (a *CSISnapshotRestoreArgs) Validate() error {