1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] StorageV2 Integration Service Implementation (#1757)

This commit is contained in:
Adam Janikowski 2024-11-04 14:51:16 +01:00 committed by GitHub
parent 77fc964af5
commit bfd6c8e63d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 2345 additions and 1 deletion

View file

@ -13,6 +13,7 @@
- (Feature) StorageV2 Integration Service Definition
- (Feature) AWS Client
- (Feature) (Platform) Storage V1Alpha1
- (Feature) StorageV2 Integration Service Implementation
## [1.2.43](https://github.com/arangodb/kube-arangodb/tree/1.2.43) (2024-10-14)
- (Feature) ArangoRoute CRD

View file

@ -0,0 +1,76 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package definition
// KeyValuePairList is a strongly typed list of KeyValuePair entries.
type KeyValuePairList []*KeyValuePair
// GetValue returns a pointer to the value stored under key, or nil when the
// key is not present (or the list itself is nil).
func (list *KeyValuePairList) GetValue(key string) *string {
	if list == nil {
		return nil
	}

	for _, pair := range *list {
		if pair.GetKey() != key {
			continue
		}

		value := pair.GetValue()
		return &value
	}

	return nil
}
// UpsertPair stores value under key: an existing pair is updated in place,
// otherwise a new pair is appended. It reports true when a pair was inserted
// and false when one was updated (or when the list itself is nil).
func (list *KeyValuePairList) UpsertPair(key, value string) bool {
	if list == nil {
		return false
	}

	for _, pair := range *list {
		if pair.GetKey() == key {
			pair.Value = value
			return false
		}
	}

	*list = append(*list, &KeyValuePair{Key: key, Value: value})

	return true
}
// RemovePairByKey drops every entry whose key matches and replaces the list
// with the survivors. It reports whether anything was removed; a nil list is
// left untouched and reported as unmodified.
func (list *KeyValuePairList) RemovePairByKey(key string) bool {
	if list == nil {
		return false
	}

	// A fresh slice is allocated (rather than filtering in place) so callers
	// holding the previous backing array are not affected.
	kept := make(KeyValuePairList, 0, len(*list))
	for _, pair := range *list {
		if pair.GetKey() == key {
			continue
		}
		kept = append(kept, pair)
	}

	removed := len(kept) != len(*list)
	*list = kept

	return removed
}

View file

@ -0,0 +1,179 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.1
// source: integrations/shared/v1/definition/kv.proto
package definition
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
// Key Values Pairs
type KeyValuePair struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Key of the pair
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// Values of the pair
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
}
func (x *KeyValuePair) Reset() {
*x = KeyValuePair{}
if protoimpl.UnsafeEnabled {
mi := &file_integrations_shared_v1_definition_kv_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
}
func (x *KeyValuePair) String() string {
return protoimpl.X.MessageStringOf(x)
}
func (*KeyValuePair) ProtoMessage() {}
func (x *KeyValuePair) ProtoReflect() protoreflect.Message {
mi := &file_integrations_shared_v1_definition_kv_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
}
return ms
}
return mi.MessageOf(x)
}
// Deprecated: Use KeyValuePair.ProtoReflect.Descriptor instead.
func (*KeyValuePair) Descriptor() ([]byte, []int) {
return file_integrations_shared_v1_definition_kv_proto_rawDescGZIP(), []int{0}
}
func (x *KeyValuePair) GetKey() string {
if x != nil {
return x.Key
}
return ""
}
func (x *KeyValuePair) GetValue() string {
if x != nil {
return x.Value
}
return ""
}
var File_integrations_shared_v1_definition_kv_proto protoreflect.FileDescriptor
var file_integrations_shared_v1_definition_kv_proto_rawDesc = []byte{
0x0a, 0x2a, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x73,
0x68, 0x61, 0x72, 0x65, 0x64, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74,
0x69, 0x6f, 0x6e, 0x2f, 0x6b, 0x76, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x73, 0x68,
0x61, 0x72, 0x65, 0x64, 0x22, 0x36, 0x0a, 0x0c, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x75, 0x65,
0x50, 0x61, 0x69, 0x72, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28,
0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x42, 0x45, 0x5a, 0x43,
0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x61, 0x72, 0x61, 0x6e, 0x67,
0x6f, 0x64, 0x62, 0x2f, 0x6b, 0x75, 0x62, 0x65, 0x2d, 0x61, 0x72, 0x61, 0x6e, 0x67, 0x6f, 0x64,
0x62, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x67, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x73,
0x68, 0x61, 0x72, 0x65, 0x64, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x65, 0x66, 0x69, 0x6e, 0x69, 0x74,
0x69, 0x6f, 0x6e, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_integrations_shared_v1_definition_kv_proto_rawDescOnce sync.Once
file_integrations_shared_v1_definition_kv_proto_rawDescData = file_integrations_shared_v1_definition_kv_proto_rawDesc
)
func file_integrations_shared_v1_definition_kv_proto_rawDescGZIP() []byte {
file_integrations_shared_v1_definition_kv_proto_rawDescOnce.Do(func() {
file_integrations_shared_v1_definition_kv_proto_rawDescData = protoimpl.X.CompressGZIP(file_integrations_shared_v1_definition_kv_proto_rawDescData)
})
return file_integrations_shared_v1_definition_kv_proto_rawDescData
}
var file_integrations_shared_v1_definition_kv_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_integrations_shared_v1_definition_kv_proto_goTypes = []interface{}{
(*KeyValuePair)(nil), // 0: shared.KeyValuePair
}
var file_integrations_shared_v1_definition_kv_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_integrations_shared_v1_definition_kv_proto_init() }
func file_integrations_shared_v1_definition_kv_proto_init() {
if File_integrations_shared_v1_definition_kv_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_integrations_shared_v1_definition_kv_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*KeyValuePair); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_integrations_shared_v1_definition_kv_proto_rawDesc,
NumEnums: 0,
NumMessages: 1,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_integrations_shared_v1_definition_kv_proto_goTypes,
DependencyIndexes: file_integrations_shared_v1_definition_kv_proto_depIdxs,
MessageInfos: file_integrations_shared_v1_definition_kv_proto_msgTypes,
}.Build()
File_integrations_shared_v1_definition_kv_proto = out.File
file_integrations_shared_v1_definition_kv_proto_rawDesc = nil
file_integrations_shared_v1_definition_kv_proto_goTypes = nil
file_integrations_shared_v1_definition_kv_proto_depIdxs = nil
}

View file

@ -0,0 +1,34 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
syntax = "proto3";
package shared;
option go_package = "github.com/arangodb/kube-arangodb/integrations/shared/v1/definition";
// KeyValuePair represents a single string key/value entry.
message KeyValuePair {
  // Key of the pair
  string key = 1;
  // Value of the pair
  string value = 2;
}

View file

@ -0,0 +1,70 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v2
import (
pbImplStorageV2Shared "github.com/arangodb/kube-arangodb/integrations/storage/v2/shared"
pbImplStorageV2SharedS3 "github.com/arangodb/kube-arangodb/integrations/storage/v2/shared/s3"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)
// Mod mutates a Configuration, enabling functional-option style construction
// (see NewConfiguration and Configuration.With).
type Mod func(c Configuration) Configuration

// ConfigurationType selects the storage backend implementation.
type ConfigurationType string

const (
	// ConfigurationTypeS3 selects the S3 (or S3-compatible) backend.
	ConfigurationTypeS3 ConfigurationType = "s3"
)
// NewConfiguration builds a Configuration starting from the zero value and
// applying the provided modifiers in order.
func NewConfiguration(mods ...Mod) Configuration {
	return Configuration{}.With(mods...)
}
// Configuration selects and configures the storage backend for the StorageV2
// integration service.
type Configuration struct {
	// Type chooses the backend; currently only ConfigurationTypeS3 is known.
	Type ConfigurationType

	// S3 holds the backend settings used when Type is ConfigurationTypeS3.
	S3 pbImplStorageV2SharedS3.Configuration
}
// IO constructs the storage IO implementation matching the configured Type.
// An unknown (or unset) type yields an error.
func (c Configuration) IO() (pbImplStorageV2Shared.IO, error) {
	switch c.Type {
	case ConfigurationTypeS3:
		return c.S3.New()
	default:
		// Fixed typo in the error message ("Unknoen" -> "Unknown").
		return nil, errors.Errorf("Unknown Type: %s", c.Type)
	}
}
// Validate checks the configuration for correctness.
// NOTE(review): currently a no-op placeholder — it always returns nil, so an
// empty/unknown Type is only caught later by IO().
func (c Configuration) Validate() error {
	return nil
}
// With returns a copy of the configuration with every modifier applied in
// order; the receiver is left unchanged.
func (c Configuration) With(mods ...Mod) Configuration {
	out := c
	for _, apply := range mods {
		out = apply(out)
	}
	return out
}

View file

@ -0,0 +1,25 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v2
import "github.com/arangodb/kube-arangodb/pkg/logging"
// logger is the package-wide logger for the StorageV2 integration service,
// registered under a stable topic name at Info level.
var logger = logging.Global().RegisterAndGetLogger("integration-storage-v2", logging.Info)

View file

@ -0,0 +1,34 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package shared
const (
	// MaxChunkBytes contains the maximum number of bytes in a chunk (for Read/Write streaming operations)
	MaxChunkBytes = 1024 * 1024
)

// NewBuffer allocates a zero-filled byte buffer of the requested size,
// capped at MaxChunkBytes. A negative size yields an empty buffer instead of
// panicking (make panics on negative lengths).
func NewBuffer(size int) []byte {
	if size > MaxChunkBytes {
		size = MaxChunkBytes
	}
	if size < 0 {
		size = 0
	}

	return make([]byte, size)
}

View file

@ -0,0 +1,63 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package shared
import (
"context"
"io"
"time"
"github.com/arangodb/kube-arangodb/pkg/util"
)
// Writer streams data into the storage backend.
type Writer interface {
	io.Writer

	// Close finalizes the stream and returns the checksum (hex-encoded in the
	// S3 implementation), the number of bytes written, and any error.
	Close(ctx context.Context) (string, int64, error)

	// Closed reports whether the underlying stream has completed.
	Closed() bool
}
// Reader streams data out of the storage backend.
type Reader interface {
	io.Reader

	// Close finalizes the stream and returns the checksum (hex-encoded in the
	// S3 implementation), the number of bytes read, and any error.
	Close(ctx context.Context) (string, int64, error)

	// Closed reports whether the underlying stream has completed.
	Closed() bool
}
// File describes a single stored object: its key (relative to any configured
// prefix) and its metadata.
type File struct {
	Key string

	Info Info
}
// Info carries object metadata as reported by the backend.
type Info struct {
	// Size of the object in bytes.
	Size uint64

	// LastUpdatedAt is the backend's last-modified timestamp.
	LastUpdatedAt time.Time
}
// IO abstracts the blob-storage operations used by the StorageV2 integration.
type IO interface {
	// Write starts a streaming upload under key; upload errors surface on the
	// returned Writer's Close.
	Write(ctx context.Context, key string) (Writer, error)

	// Read starts a streaming download of key.
	Read(ctx context.Context, key string) (Reader, error)

	// Head fetches object metadata, returning (nil, nil) when absent.
	Head(ctx context.Context, key string) (*Info, error)

	// Delete removes the object, reporting whether a delete was performed.
	Delete(ctx context.Context, key string) (bool, error)

	// List iterates over objects under the key prefix in pages.
	List(ctx context.Context, key string) (util.NextIterator[[]File], error)
}

View file

@ -0,0 +1,57 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
pbImplStorageV2Shared "github.com/arangodb/kube-arangodb/integrations/storage/v2/shared"
awsHelper "github.com/arangodb/kube-arangodb/pkg/util/aws"
)
// Configuration holds the settings for the S3-backed storage implementation.
type Configuration struct {
	// BucketName is the S3 bucket used for all operations.
	BucketName string

	// BucketPrefix is prepended to every object key (see ios.key).
	BucketPrefix string

	// MaxListKeys optionally caps the page size of List operations.
	MaxListKeys *int64

	// Client carries the AWS session/credentials configuration.
	Client awsHelper.Config
}
// New builds the S3 IO implementation: it opens an AWS session from the
// client configuration and wires up an S3 client together with managed
// uploader and downloader helpers.
func (c Configuration) New() (pbImplStorageV2Shared.IO, error) {
	prov, err := c.Client.GetAWSSession()
	if err != nil {
		return nil, err
	}

	storageClient := s3.New(prov, aws.NewConfig().WithRegion(c.Client.GetRegion()))

	return &ios{
		config: c,
		client: storageClient,
		uploader: s3manager.NewUploaderWithClient(storageClient),
		// Concurrency 1 forces sequential part downloads so the stream can be
		// piped through the strictly in-order offsetWriter.
		downloader: s3manager.NewDownloaderWithClient(storageClient, func(downloader *s3manager.Downloader) {
			downloader.Concurrency = 1
		}),
	}, nil
}

View file

@ -0,0 +1,44 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"context"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/arangodb/kube-arangodb/pkg/util"
)
// Delete removes the object stored under key. It returns true when the delete
// request succeeded, false with nil error when the object was reported as
// missing, and false with the error otherwise.
// NOTE(review): S3 DeleteObject typically succeeds even for nonexistent keys,
// so the NotFound branch may rarely trigger in practice — confirm against the
// targeted S3-compatible backends.
func (i *ios) Delete(ctx context.Context, key string) (bool, error) {
	_, err := i.client.DeleteObjectWithContext(ctx, &s3.DeleteObjectInput{
		Key: util.NewType(i.key(key)),
		Bucket: util.NewType(i.config.BucketName),
	})
	if err != nil {
		if IsAWSNotFoundError(err) {
			return false, nil
		}

		return false, err
	}

	return true, nil
}

View file

@ -0,0 +1,49 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)
// IsAWSNotFoundError reports whether err (or, for an awserr.RequestFailure,
// its original cause) represents a missing S3 object ("NoSuchKey"/"NotFound").
func IsAWSNotFoundError(err error) bool {
	if err == nil {
		return false
	}

	var awsErr awserr.Error
	if errors.As(err, &awsErr) {
		code := awsErr.Code()
		if code == s3.ErrCodeNoSuchKey || code == "NotFound" {
			return true
		}
	}

	var reqErr awserr.RequestFailure
	if errors.As(err, &reqErr) {
		// Unwrap one level and re-check the underlying cause.
		return IsAWSNotFoundError(reqErr.OrigErr())
	}

	return false
}

View file

@ -0,0 +1,48 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"context"
"github.com/aws/aws-sdk-go/service/s3"
pbImplStorageV2Shared "github.com/arangodb/kube-arangodb/integrations/storage/v2/shared"
"github.com/arangodb/kube-arangodb/pkg/util"
)
// Head fetches object metadata for key. A missing object yields (nil, nil);
// any other error is returned as-is.
func (i *ios) Head(ctx context.Context, key string) (*pbImplStorageV2Shared.Info, error) {
	obj, err := i.client.HeadObjectWithContext(ctx, &s3.HeadObjectInput{
		Bucket: util.NewType(i.config.BucketName),
		Key: util.NewType(i.key(key)),
	})
	if err != nil {
		if IsAWSNotFoundError(err) {
			return nil, nil
		}

		return nil, err
	}

	return &pbImplStorageV2Shared.Info{
		// Nil pointers from the SDK fall back to zero values.
		Size: uint64(util.TypeOrDefault(obj.ContentLength)),
		LastUpdatedAt: util.TypeOrDefault(obj.LastModified),
	}, nil
}

View file

@ -0,0 +1,66 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
pbImplStorageV2Shared "github.com/arangodb/kube-arangodb/integrations/storage/v2/shared"
"github.com/arangodb/kube-arangodb/pkg/util"
)
// ios implements pbImplStorageV2Shared.IO on top of the AWS S3 API.
type ios struct {
	config Configuration

	client s3iface.S3API

	uploader *s3manager.Uploader

	downloader *s3manager.Downloader
}
// key maps a caller-supplied key to the full object key by prepending the
// configured bucket prefix.
func (i *ios) key(key string) string {
	return fmt.Sprintf("%s%s", i.config.BucketPrefix, key)
}
// Write starts an asynchronous upload under key; upload errors surface on the
// returned Writer's Close, not here.
func (i *ios) Write(ctx context.Context, key string) (pbImplStorageV2Shared.Writer, error) {
	w := newWriter(i)

	w.start(ctx, &s3manager.UploadInput{
		Bucket: util.NewType(i.config.BucketName),
		Key: util.NewType(i.key(key)),
	})

	return w, nil
}
// Read starts an asynchronous download of key; download errors surface via
// the returned Reader's Read/Close, not here.
func (i *ios) Read(ctx context.Context, key string) (pbImplStorageV2Shared.Reader, error) {
	r := newReader(i)

	r.start(ctx, &s3.GetObjectInput{
		Bucket: util.NewType(i.config.BucketName),
		Key: util.NewType(i.key(key)),
	})

	return r, nil
}

View file

@ -0,0 +1,121 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"context"
"fmt"
"io"
"testing"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/uuid"
pbImplStorageV2Shared "github.com/arangodb/kube-arangodb/integrations/storage/v2/shared"
"github.com/arangodb/kube-arangodb/pkg/util"
awsHelper "github.com/arangodb/kube-arangodb/pkg/util/aws"
)
const (
TestAwsProfile util.EnvironmentVariable = "TEST_AWS_PROFILE"
TestAwsRole util.EnvironmentVariable = "TEST_AWS_ROLE"
TestAWSBucket util.EnvironmentVariable = "TEST_AWS_BUCKET"
)
// getClient builds an S3-backed IO client from the TEST_AWS_PROFILE,
// TEST_AWS_ROLE (optional) and TEST_AWS_BUCKET environment variables; the
// test is skipped when the mandatory variables are absent. Each run isolates
// its objects under a random "test/<uuid>/" bucket prefix.
func getClient(t *testing.T) pbImplStorageV2Shared.IO {
	v, ok := TestAwsProfile.Lookup()
	if !ok {
		t.Skipf("Client does not exists")
	}

	b, ok := TestAWSBucket.Lookup()
	if !ok {
		t.Skipf("Bucket does not exists")
	}

	var c awsHelper.Config
	c.Region = "eu-central-1"
	c.Provider.Config = awsHelper.ProviderConfig{
		Profile: v,
	}

	// Optionally impersonate a role for the duration of the test.
	r, ok := TestAwsRole.Lookup()
	if ok {
		c.Provider.Impersonate = awsHelper.ProviderImpersonate{
			Role: r,
			Name: "Test",
		}
	}

	var cfg Configuration
	cfg.Client = c
	cfg.BucketName = b
	cfg.BucketPrefix = fmt.Sprintf("test/%s/", uuid.NewUUID())

	z, err := cfg.New()
	require.NoError(t, err)

	return z
}
// Test performs a full write/read round trip of 64 MiB of zero bytes against
// the bucket selected by getClient, asserting that sizes and checksums match.
func Test(t *testing.T) {
	w := getClient(t)

	// make already zero-fills the slice; the original explicit loop setting
	// each byte to 0 was redundant and has been removed.
	data := make([]byte, 1024*1024*64)

	ctx, c := context.WithCancel(context.Background())
	defer c()

	q, err := w.Write(ctx, "test.data")
	require.NoError(t, err)

	_, err = util.WriteAll(q, data)
	require.NoError(t, err)

	checksum, size, err := q.Close(context.Background())
	require.NoError(t, err)

	t.Logf("Write Checksum: %s", checksum)
	require.EqualValues(t, 1024*1024*64, size)

	r, err := w.Read(context.Background(), "test.data")
	require.NoError(t, err)

	data, err = io.ReadAll(r)
	require.NoError(t, err)

	echecksum, esize, err := r.Close(context.Background())
	require.NoError(t, err)

	require.EqualValues(t, 1024*1024*64, esize)
	require.Len(t, data, 1024*1024*64)

	t.Logf("Read Checksum: %s", echecksum)
	require.EqualValues(t, echecksum, checksum)
}

View file

@ -0,0 +1,101 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"context"
"io"
"strings"
"sync"
"github.com/aws/aws-sdk-go/service/s3"
pbImplStorageV2Shared "github.com/arangodb/kube-arangodb/integrations/storage/v2/shared"
"github.com/arangodb/kube-arangodb/pkg/util"
)
// List returns a paged iterator over objects whose keys start with the given
// prefix (relative to the configured bucket prefix). The context argument is
// intentionally ignored here; each Next call takes its own context.
func (i *ios) List(_ context.Context, key string) (util.NextIterator[[]pbImplStorageV2Shared.File], error) {
	return &listIterator{
		parent: i,
		key: key,
	}, nil
}
// listIterator pages through ListObjects results, using the last key seen on
// a page as the continuation marker for the next request.
type listIterator struct {
	// lock serializes Next calls.
	lock sync.Mutex

	parent *ios
	key string

	// next is the ListObjects marker (last absolute key of the previous page).
	next *string
	// done is set once a non-truncated page has been returned.
	done bool
}
// Next returns the next page of objects, or io.EOF once iteration has
// completed. Keys in the result are reported relative to the requested
// prefix. Page size is bounded by config.MaxListKeys when set.
func (l *listIterator) Next(ctx context.Context) ([]pbImplStorageV2Shared.File, error) {
	l.lock.Lock()
	defer l.lock.Unlock()

	if l.done {
		return nil, io.EOF
	}

	resp, err := l.parent.client.ListObjectsWithContext(ctx, &s3.ListObjectsInput{
		Bucket: util.NewType(l.parent.config.BucketName),
		Prefix: util.NewType(l.parent.key(l.key)),
		MaxKeys: l.parent.config.MaxListKeys,
		Marker: l.next,
	})
	if err != nil {
		return nil, err
	}

	results := make([]pbImplStorageV2Shared.File, 0, len(resp.Contents))

	for _, obj := range resp.Contents {
		// Defensive: skip malformed entries from the SDK.
		if obj == nil {
			continue
		}

		if obj.Key == nil {
			continue
		}

		var info pbImplStorageV2Shared.Info

		info.Size = uint64(util.TypeOrDefault(obj.Size))
		info.LastUpdatedAt = util.TypeOrDefault(obj.LastModified)

		results = append(results, pbImplStorageV2Shared.File{
			// Strip the absolute prefix so callers see relative keys.
			Key: strings.TrimPrefix(*obj.Key, l.parent.key(l.key)),
			Info: info,
		})

		// Advance the marker to the last (absolute) key of this page.
		l.next = util.NewType(*obj.Key)
	}

	// A non-truncated response means this was the final page.
	if !util.OptionalType(resp.IsTruncated, false) {
		l.done = true
	}

	return results, nil
}

View file

@ -0,0 +1,151 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"context"
"crypto/sha256"
"errors"
"fmt"
"hash"
"io"
"os"
"sync"
"github.com/aws/aws-sdk-go/service/s3"
)
// newReader builds a reader wired to an io.Pipe: the background download
// writes into pw while the consumer reads from pr. The SHA-256 checksum is
// accumulated as data is read.
func newReader(parent *ios) *reader {
	pr, pw := io.Pipe()

	return &reader{
		parent: parent,
		closed: make(chan string),
		pr: pr,
		pw: pw,
		checksum: sha256.New(),
	}
}
// reader streams an S3 object through an in-memory pipe, tracking the byte
// count and a SHA-256 checksum of everything read.
type reader struct {
	// lock serializes Read/Close; closeLock guards the closed-channel state.
	lock, closeLock sync.Mutex

	parent *ios

	// closed is closed once the background download goroutine finishes.
	closed chan string

	// err holds the download (or pipe-close) error, surfaced via Close.
	err error

	// bytes counts bytes delivered to the consumer.
	bytes int64

	checksum hash.Hash

	pr io.Reader
	pw io.WriteCloser
}
// Read consumes bytes from the download pipe, updating the running byte count
// and checksum. On EOF, an incomplete stream (download still running) is
// mapped to io.ErrUnexpectedEOF, and a missing object to os.ErrNotExist;
// other download errors are deferred to Close.
func (w *reader) Read(p []byte) (n int, err error) {
	w.lock.Lock()
	defer w.lock.Unlock()

	n, err = w.pr.Read(p)
	if err == nil {
		w.bytes += int64(n)
		w.checksum.Write(p[:n])
		return n, nil
	}

	if errors.Is(err, io.EOF) {
		// EOF before the download goroutine finished means the pipe was cut
		// short rather than cleanly completed.
		if !w.done() {
			return 0, io.ErrUnexpectedEOF
		}

		if IsAWSNotFoundError(w.err) {
			return 0, os.ErrNotExist
		}
	}

	return n, err
}
// Closed reports whether the background download has finished.
func (w *reader) Closed() bool {
	return w.done()
}
// Close returns the hex-encoded SHA-256 checksum and the total number of
// bytes read. It fails with io.ErrNoProgress while the download is still
// running, and with the stored download error when one occurred.
// NOTE(review): ctx is currently unused.
func (w *reader) Close(ctx context.Context) (string, int64, error) {
	w.lock.Lock()
	defer w.lock.Unlock()

	if !w.done() {
		return "", 0, io.ErrNoProgress
	}

	if err := w.err; err != nil {
		return "", 0, err
	}

	return fmt.Sprintf("%02x", w.checksum.Sum(nil)), w.bytes, nil
}
// done reports, without blocking, whether the closed channel has been closed
// (i.e. the download goroutine has finished). closeLock keeps this consistent
// with the teardown in _start.
func (w *reader) done() bool {
	w.closeLock.Lock()
	defer w.closeLock.Unlock()

	select {
	case <-w.closed:
		return true
	default:
		return false
	}
}
// start launches the download in a background goroutine.
func (w *reader) start(ctx context.Context, input *s3.GetObjectInput) {
	go w._start(ctx, input)
}
// _start runs the download, streaming it through the pipe via a strictly
// in-order offsetWriter. On return it closes the write side of the pipe,
// records any error for Close, drains leftover pipe data, and closes the
// closed channel to signal completion.
func (w *reader) _start(ctx context.Context, input *s3.GetObjectInput) {
	defer func() {
		w.closeLock.Lock()
		defer w.closeLock.Unlock()
		defer close(w.closed)

		if err := w.pw.Close(); err != nil {
			// Record the pipe-close error only when no download error has
			// been stored yet. The original check was inverted (err != nil),
			// which overwrote the download error and dropped the close error
			// in the success case.
			if w.err == nil {
				w.err = err
			}
		}

		// Drain anything left in the pipe so a blocked writer can finish.
		buff := make([]byte, 128)
		for {
			if _, err := w.pr.Read(buff); err != nil {
				return
			}
		}
	}()

	if _, err := w.parent.downloader.DownloadWithContext(ctx, wrapWithOffsetWriter(w.pw), input); err != nil {
		w.err = err
	}
}

View file

@ -0,0 +1,57 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"io"
"sync"
)
func wrapWithOffsetWriter(in io.Writer) io.WriterAt {
return &offsetWriter{
offset: 0,
out: in,
}
}
type offsetWriter struct {
lock sync.Mutex
offset int64
out io.Writer
}
func (o *offsetWriter) WriteAt(p []byte, off int64) (n int, err error) {
o.lock.Lock()
defer o.lock.Unlock()
if o.offset != off {
return 0, io.ErrUnexpectedEOF
}
n, err = o.out.Write(p)
if err != nil {
return 0, err
}
o.offset += int64(n)
return n, nil
}

View file

@ -0,0 +1,142 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package s3
import (
"context"
"crypto/sha256"
"fmt"
"hash"
"io"
"sync"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
)
// newWriter builds a writer wired to an io.Pipe: the consumer writes into pw
// while the background upload reads from pr. The SHA-256 checksum is
// accumulated as data is written.
func newWriter(parent *ios) *writer {
	pr, pw := io.Pipe()

	return &writer{
		parent: parent,
		closed: make(chan string),
		pr: pr,
		pw: pw,
		checksum: sha256.New(),
	}
}
// writer streams data to S3 through an in-memory pipe, tracking the byte
// count and a SHA-256 checksum of everything written.
type writer struct {
	// lock serializes Write/Close.
	lock sync.Mutex

	parent *ios

	// closed is closed once the background upload goroutine finishes.
	closed chan string

	// err holds the upload error, surfaced via Write/Close.
	err error

	// bytes counts bytes accepted from the caller.
	bytes int64

	checksum hash.Hash

	pr io.Reader
	pw io.WriteCloser
}
// Closed reports whether the background upload has finished.
func (w *writer) Closed() bool {
	return w.done()
}
// Write feeds p into the upload pipe, updating the byte count and checksum on
// success.
// NOTE(review): once the upload goroutine has finished, this returns
// (0, w.err) — which is (0, nil) when the upload succeeded; callers relying
// on strict io.Writer semantics could loop on that. Confirm intended.
func (w *writer) Write(p []byte) (n int, err error) {
	w.lock.Lock()
	defer w.lock.Unlock()

	if w.done() {
		return 0, w.err
	}

	n, err = w.pw.Write(p)
	if err != nil {
		return 0, err
	}

	if n > 0 {
		w.bytes += int64(n)
		w.checksum.Write(p[:n])
	}

	return n, nil
}
// done reports, without blocking, whether the upload goroutine has finished
// (the closed channel is closed exactly once by _start).
func (w *writer) done() bool {
	select {
	case <-w.closed:
		return true
	default:
		return false
	}
}
// Close finalizes the upload: it closes the write side of the pipe (if the
// upload has not already terminated), waits for the upload goroutine to
// finish, and returns the hex-encoded SHA-256 checksum and the total byte
// count on success, or the upload error.
//
// NOTE(review): ctx is currently unused here; cancellation is governed by
// the context passed to start — confirm whether Close should honor ctx.
func (w *writer) Close(ctx context.Context) (string, int64, error) {
	w.lock.Lock()
	defer w.lock.Unlock()
	if !w.done() {
		if err := w.pw.Close(); err != nil {
			return "", 0, err
		}
		// Wait for the upload goroutine to terminate; w.err is published
		// before w.closed is closed.
		<-w.closed
	}
	if w.err != nil {
		return "", 0, w.err
	}
	return fmt.Sprintf("%02x", w.checksum.Sum(nil)), w.bytes, nil
}
// start launches the actual S3 upload in a background goroutine;
// completion is observed via the closed channel (see _start).
func (w *writer) start(ctx context.Context, input *s3manager.UploadInput) {
	go w._start(ctx, input)
}
// _start runs the S3 upload using the pipe's read side as the body and
// must be executed on its own goroutine (see start). It signals completion
// by closing w.closed.
func (w *writer) _start(ctx context.Context, input *s3manager.UploadInput) {
	defer close(w.closed)
	defer func() {
		// Clean the channel
		// Drain any bytes still buffered in the pipe so a producer blocked
		// in Write is released; defers run LIFO, so this drain completes
		// before w.closed is closed above.
		buff := make([]byte, 128)
		for {
			_, err := w.pr.Read(buff)
			if err != nil {
				return
			}
		}
	}()
	input.Body = w.pr
	_, err := w.parent.uploader.UploadWithContext(ctx, input)
	if err != nil {
		// Written without the lock: readers only observe w.err after
		// w.closed is closed, which happens after this assignment.
		w.err = err
		return
	}
}

View file

@ -0,0 +1,279 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v2
import (
"context"
"io"
"os"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
pbStorageV2 "github.com/arangodb/kube-arangodb/integrations/storage/v2/definition"
pbImplStorageV2Shared "github.com/arangodb/kube-arangodb/integrations/storage/v2/shared"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
"github.com/arangodb/kube-arangodb/pkg/util/svc"
)
// Compile-time checks that implementation satisfies both the generated
// gRPC server interface and the service handler interface.
var _ pbStorageV2.StorageV2Server = &implementation{}
var _ svc.Handler = &implementation{}
// New creates a StorageV2 gRPC service handler for the given configuration.
//
// Fix: returning newInternal's results directly boxed a nil
// *implementation into a non-nil svc.Handler interface on the error path
// (the classic typed-nil trap); return a literal nil interface instead.
func New(cfg Configuration) (svc.Handler, error) {
	impl, err := newInternal(cfg)
	if err != nil {
		return nil, err
	}
	return impl, nil
}
// newInternal validates the configuration and constructs the service
// implementation with its storage backend.
func newInternal(c Configuration) (*implementation, error) {
	if err := c.Validate(); err != nil {
		return nil, errors.Wrapf(err, "Invalid config")
	}

	// Named storageIO rather than io: the original local shadowed the
	// stdlib io package imported by this file.
	storageIO, err := c.IO()
	if err != nil {
		return nil, err
	}

	return &implementation{
		io: storageIO,
	}, nil
}
// implementation serves the StorageV2 gRPC API on top of a configured
// storage backend.
type implementation struct {
	// io is the storage backend used for all object operations.
	io pbImplStorageV2Shared.IO

	pbStorageV2.UnimplementedStorageV2Server
}
// Name returns the service name as defined by the StorageV2 API package.
func (i *implementation) Name() string {
	return pbStorageV2.Name
}
// Register attaches the StorageV2 service to the given gRPC server.
func (i *implementation) Register(registrar *grpc.Server) {
	pbStorageV2.RegisterStorageV2Server(registrar, i)
}
// Health always reports Healthy; the service carries no degradable state.
func (i *implementation) Health() svc.HealthState {
	return svc.Healthy
}
// WriteObject receives a client stream of chunks, writes them to the
// backend under the path carried by the first message, and on EOF closes
// the backend writer and replies with the checksum and byte count.
//
// Fix: the first Recv's error was only inspected for EOF/Canceled; any
// other transport error was silently dropped and the nil message fell
// through to the "path missing" branch, masking the real failure. Such
// errors are now returned directly.
func (i *implementation) WriteObject(server pbStorageV2.StorageV2_WriteObjectServer) error {
	ctx, c := context.WithCancel(server.Context())
	defer c()

	log := logger.Str("func", "WriteObject")

	msg, err := server.Recv()
	if err == io.EOF || errors.IsGRPCCode(err, codes.Canceled) {
		// Stream ended before a single message arrived.
		return io.ErrUnexpectedEOF
	}
	if err != nil {
		return err
	}

	path := msg.GetPath().GetPath()
	if path == "" {
		log.Debug("path missing")
		return status.Error(codes.InvalidArgument, "path missing")
	}

	wd, err := i.io.Write(ctx, path)
	if err != nil {
		return err
	}
	// NOTE(review): wd is not closed on the error returns below; depending
	// on the backend this may strand its upload goroutine — confirm whether
	// an abort/cleanup path is needed.

	if _, err := util.WriteAll(wd, msg.GetChunk()); err != nil {
		return err
	}

	for {
		msg, err := server.Recv()
		if errors.IsGRPCCode(err, codes.Canceled) {
			c()
			return io.ErrUnexpectedEOF
		}
		if errors.Is(err, io.EOF) {
			// Client finished sending: finalize the backend write and
			// return the aggregate result.
			checksum, bytes, err := wd.Close(ctx)
			if err != nil {
				return err
			}
			if err := server.SendAndClose(&pbStorageV2.StorageV2WriteObjectResponse{
				Bytes:    bytes,
				Checksum: checksum,
			}); err != nil {
				log.Err(err).Debug("Failed to send WriteObjectControl message")
				return err
			}
			return nil
		}
		if err != nil {
			return err
		}

		// The path may be repeated on follow-up messages, but must match.
		if msg.GetPath() != nil {
			if path != msg.GetPath().GetPath() {
				log.Debug("path changed")
				return status.Error(codes.InvalidArgument, "path changed")
			}
		}

		if _, err := util.WriteAll(wd, msg.GetChunk()); err != nil {
			return err
		}
	}
}
// ReadObject streams the object at the requested path to the client in
// chunks of at most MaxChunkBytes.
//
// Fix: the io.Reader contract allows Read to return n > 0 together with
// io.EOF; the original inspected the error first and silently dropped that
// final chunk. Bytes are now sent before the error is considered.
func (i *implementation) ReadObject(req *pbStorageV2.StorageV2ReadObjectRequest, server pbStorageV2.StorageV2_ReadObjectServer) error {
	log := logger.Str("func", "ReadObject").Str("path", req.GetPath().GetPath())
	ctx := server.Context()

	path := req.GetPath().GetPath()
	if path == "" {
		return status.Errorf(codes.InvalidArgument, "path missing")
	}

	rd, err := i.io.Read(ctx, path)
	if err != nil {
		return err
	}

	buff := pbImplStorageV2Shared.NewBuffer(pbImplStorageV2Shared.MaxChunkBytes)

	for {
		n, err := rd.Read(buff)

		if n > 0 {
			// Send chunk to caller
			if sendErr := server.Send(&pbStorageV2.StorageV2ReadObjectResponse{
				Chunk: buff[:n],
			}); sendErr != nil {
				log.Err(sendErr).Debug("Failed to send ReadObjectChunk")
				return sendErr
			}
		}

		if err != nil {
			if errors.Is(err, io.EOF) {
				return nil
			}
			if errors.Is(err, os.ErrNotExist) {
				return status.Errorf(codes.NotFound, "file not found")
			}
			return err
		}
	}
}
// HeadObject returns size and last-update metadata for the object at the
// requested path, or a NotFound status when the backend reports no object.
func (i *implementation) HeadObject(ctx context.Context, req *pbStorageV2.StorageV2HeadObjectRequest) (*pbStorageV2.StorageV2HeadObjectResponse, error) {
	log := logger.Str("func", "HeadObject").Str("path", req.GetPath().GetPath())

	// Check request fields
	path := req.GetPath().GetPath()
	if path == "" {
		return nil, status.Error(codes.InvalidArgument, "path missing")
	}

	objectInfo, err := i.io.Head(ctx, path)
	if err != nil {
		log.Err(err).Debug("getObjectInfo failed")
		return nil, err
	}

	if objectInfo == nil {
		return nil, status.Error(codes.NotFound, path)
	}

	response := &pbStorageV2.StorageV2HeadObjectResponse{
		Info: &pbStorageV2.StorageV2ObjectInfo{
			Size:        objectInfo.Size,
			LastUpdated: timestamppb.New(objectInfo.LastUpdatedAt),
		},
	}
	return response, nil
}
// DeleteObject removes the object at the requested path, returning a
// NotFound status when the backend reports nothing was deleted.
func (i *implementation) DeleteObject(ctx context.Context, req *pbStorageV2.StorageV2DeleteObjectRequest) (*pbStorageV2.StorageV2DeleteObjectResponse, error) {
	log := logger.Str("func", "DeleteObject").Str("path", req.GetPath().GetPath())

	// Check request fields
	path := req.GetPath().GetPath()
	if path == "" {
		return nil, status.Error(codes.InvalidArgument, "path missing")
	}

	deleted, err := i.io.Delete(ctx, path)
	if err != nil {
		log.Err(err).Debug("deleteObject failed")
		return nil, err
	}

	if !deleted {
		return nil, status.Error(codes.NotFound, "Object Not Found")
	}

	return &pbStorageV2.StorageV2DeleteObjectResponse{}, nil
}
// ListObjects streams pages of object metadata under the requested path
// prefix until the backend iterator is exhausted (signaled by io.EOF).
//
// Fix: the log field previously said "ReadObject" (copy-paste), which
// mislabeled every log line emitted by this RPC.
func (i *implementation) ListObjects(req *pbStorageV2.StorageV2ListObjectsRequest, server pbStorageV2.StorageV2_ListObjectsServer) error {
	log := logger.Str("func", "ListObjects").Str("path", req.GetPath().GetPath())
	ctx := server.Context()

	path := req.GetPath().GetPath()
	if path == "" {
		return status.Errorf(codes.InvalidArgument, "path missing")
	}

	lister, err := i.io.List(ctx, path)
	if err != nil {
		log.Err(err).Debug("listObjects failed")
		return err
	}

	for {
		files, err := lister.Next(ctx)
		if err != nil {
			if errors.Is(err, io.EOF) {
				// Iterator exhausted; listing complete.
				return nil
			}
			log.Err(err).Debug("listObjects failed")
			return err
		}

		// Translate the backend page into API objects.
		ret := make([]*pbStorageV2.StorageV2Object, len(files))
		for id := range files {
			ret[id] = &pbStorageV2.StorageV2Object{
				Path: &pbStorageV2.StorageV2Path{
					Path: files[id].Key,
				},
				Info: &pbStorageV2.StorageV2ObjectInfo{
					Size:        files[id].Info.Size,
					LastUpdated: timestamppb.New(files[id].Info.LastUpdatedAt),
				},
			}
		}

		if err := server.Send(&pbStorageV2.StorageV2ListObjectsResponse{
			Files: ret,
		}); err != nil {
			log.Err(err).Debug("listObjects failed")
			return err
		}
	}
}

View file

@ -0,0 +1,487 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v2
import (
"bytes"
"context"
"crypto/rand"
"fmt"
"io"
"sort"
"strings"
"testing"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"k8s.io/apimachinery/pkg/util/uuid"
pbStorageV2 "github.com/arangodb/kube-arangodb/integrations/storage/v2/definition"
"github.com/arangodb/kube-arangodb/pkg/util"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)
// listAllFilesHelper drains the ListObjects stream for the given prefix
// and returns every object reported by the server.
func listAllFilesHelper(t *testing.T, ctx context.Context, h pbStorageV2.StorageV2Client, prefix string) []*pbStorageV2.StorageV2Object {
	stream, err := h.ListObjects(ctx, &pbStorageV2.StorageV2ListObjectsRequest{
		Path: &pbStorageV2.StorageV2Path{
			Path: prefix,
		},
	})
	require.NoError(t, err)

	var collected []*pbStorageV2.StorageV2Object
	for {
		page, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			break
		}
		require.NoError(t, err)
		collected = append(collected, page.GetFiles()...)
	}

	return collected
}
// Test_List exercises listing behavior with a small page size (32 keys).
func Test_List(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := Client(t, ctx, func(c Configuration) Configuration {
		c.S3.MaxListKeys = util.NewType[int64](32)
		return c
	})

	testFileListing(t, ctx, client)
}
// Test_Flow_16 runs the full object lifecycle with a 16-byte payload.
func Test_Flow_16(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := Client(t, ctx, func(c Configuration) Configuration {
		c.S3.MaxListKeys = util.NewType[int64](32)
		return c
	})

	testS3BucketFileHandling(t, ctx, client, 16)
}
// Test_Flow_1024 runs the full object lifecycle with a 1 KiB payload.
func Test_Flow_1024(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := Client(t, ctx, func(c Configuration) Configuration {
		c.S3.MaxListKeys = util.NewType[int64](32)
		return c
	})

	testS3BucketFileHandling(t, ctx, client, 1024)
}
// Test_Flow_1048576 runs the full object lifecycle with a 1 MiB payload.
func Test_Flow_1048576(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := Client(t, ctx, func(c Configuration) Configuration {
		c.S3.MaxListKeys = util.NewType[int64](32)
		return c
	})

	testS3BucketFileHandling(t, ctx, client, 1024*1024)
}
// Test_Flow_4194304 runs the full object lifecycle with a 4 MiB payload.
func Test_Flow_4194304(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	client := Client(t, ctx, func(c Configuration) Configuration {
		c.S3.MaxListKeys = util.NewType[int64](32)
		return c
	})

	testS3BucketFileHandling(t, ctx, client, 4*1024*1024)
}
// testFileListing uploads 256 objects (128 flat "fileNNNN" names plus 128
// nested "pathNNNN/file" names) under a random prefix, verifies HeadObject
// and ListObjects behavior for the full prefix, a nested prefix, and a
// non-existent prefix, and finally deletes everything.
func testFileListing(t *testing.T, ctx context.Context, h pbStorageV2.StorageV2Client) {
	prefix := fmt.Sprintf("%s/", uuid.NewUUID())

	t.Run("List", func(t *testing.T) {
		var files []string

		t.Run("RenderFileNames", func(t *testing.T) {
			for i := 0; i < 128; i++ {
				files = append(files, fmt.Sprintf("%sfile%04d", prefix, i))
				files = append(files, fmt.Sprintf("%spath%04d/file", prefix, i))
			}
		})

		// Listing order is compared against lexicographically sorted names.
		sort.Strings(files)

		t.Logf("Files: %d", len(files))

		// All objects share one random 1 KiB payload.
		data := make([]byte, 1024)
		n, err := rand.Read(data)
		require.NoError(t, err)
		require.EqualValues(t, 1024, n)

		checksum := util.SHA256(data)

		t.Run("UploadAll", func(t *testing.T) {
			// Upload all files over 32 concurrent client streams.
			util.ParallelProcess(func(in string) {
				wr, err := h.WriteObject(ctx)
				require.NoError(t, err)

				buff := make([]byte, 1024)

				cf := bytes.NewReader(data)

				for {
					n, err := cf.Read(buff)
					if err != nil {
						if errors.Is(err, io.EOF) {
							break
						}
						require.NoError(t, err)
					}

					require.NoError(t, wr.Send(&pbStorageV2.StorageV2WriteObjectRequest{
						Path: &pbStorageV2.StorageV2Path{
							Path: in,
						},
						Chunk: buff[:n],
					}))
				}

				// The server must report the payload's checksum and size.
				ds, err := wr.CloseAndRecv()
				require.NoError(t, err)
				require.NotNil(t, ds)

				require.EqualValues(t, checksum, ds.GetChecksum())
				require.EqualValues(t, len(data), ds.GetBytes())
			}, 32, files)
		})

		t.Run("CheckAll", func(t *testing.T) {
			// Every uploaded object must be visible via HeadObject.
			util.ParallelProcess(func(in string) {
				wr, err := h.HeadObject(ctx, &pbStorageV2.StorageV2HeadObjectRequest{
					Path: &pbStorageV2.StorageV2Path{Path: in},
				})
				require.NoError(t, err)
				require.NotNil(t, wr)

				require.EqualValues(t, len(data), wr.GetInfo().GetSize())
			}, 32, files)
		})

		t.Run("List", func(t *testing.T) {
			revcFiles := listAllFilesHelper(t, ctx, h, prefix)
			require.Len(t, revcFiles, len(files))

			// Listing returns paths relative to the requested prefix.
			for id := range files {
				require.EqualValues(t, strings.TrimPrefix(files[id], prefix), revcFiles[id].GetPath().GetPath())
				require.EqualValues(t, revcFiles[id].GetInfo().GetSize(), len(data))
			}
		})

		t.Run("ListSubFolder", func(t *testing.T) {
			// A nested prefix only exposes its own single file.
			revcFiles := listAllFilesHelper(t, ctx, h, fmt.Sprintf("%spath0000/", prefix))
			require.Len(t, revcFiles, 1)

			require.EqualValues(t, "file", revcFiles[0].GetPath().GetPath())
			require.EqualValues(t, len(data), revcFiles[0].GetInfo().GetSize())
		})

		t.Run("ListMisSubFolder", func(t *testing.T) {
			// A non-existent prefix lists nothing.
			revcFiles := listAllFilesHelper(t, ctx, h, fmt.Sprintf("%snon-existent/", prefix))
			require.Len(t, revcFiles, 0)
		})

		t.Run("DeleteAll", func(t *testing.T) {
			util.ParallelProcess(func(in string) {
				wr, err := h.DeleteObject(ctx, &pbStorageV2.StorageV2DeleteObjectRequest{
					Path: &pbStorageV2.StorageV2Path{Path: in},
				})
				require.NoError(t, err)
				require.NotNil(t, wr)
			}, 32, files)
		})
	})
}
// testS3BucketFileHandling exercises the complete object lifecycle with a
// payload of the given size: head-before-write (NotFound), write, head,
// read-back with checksum verification, overwrite with different content,
// a second object, delete, and the behavior of reads/deletes against a
// deleted object.
func testS3BucketFileHandling(t *testing.T, ctx context.Context, h pbStorageV2.StorageV2Client, size int) {
	t.Run(fmt.Sprintf("Size:%d", size), func(t *testing.T) {
		prefix := fmt.Sprintf("%s/", uuid.NewUUID())
		name := fmt.Sprintf("%stest.local", prefix)
		nameTwo := fmt.Sprintf("%stest.local.two", prefix)

		t.Logf("File: %s", name)

		// Two distinct random payloads of the same size; their checksums
		// must differ so that the overwrite below is observable.
		dataOne := make([]byte, size)
		n, err := rand.Read(dataOne)
		require.NoError(t, err)
		require.EqualValues(t, size, n)

		checksumOne := util.SHA256(dataOne)

		dataTwo := make([]byte, size)
		n, err = rand.Read(dataTwo)
		require.NoError(t, err)
		require.EqualValues(t, size, n)

		checksumTwo := util.SHA256(dataTwo)

		t.Logf("Checksum One: %s", checksumOne)
		t.Logf("Checksum Two: %s", checksumTwo)
		require.NotEqual(t, checksumTwo, checksumOne)

		t.Run("Check if object exists", func(t *testing.T) {
			// Before any write, HeadObject must report NotFound.
			resp, err := h.HeadObject(ctx, &pbStorageV2.StorageV2HeadObjectRequest{
				Path: &pbStorageV2.StorageV2Path{Path: name},
			})
			require.EqualValues(t, codes.NotFound, errors.GRPCCode(err))
			require.Nil(t, resp)
		})

		t.Run("Send Object", func(t *testing.T) {
			// Stream dataOne in 1 KiB chunks.
			wr, err := h.WriteObject(ctx)
			require.NoError(t, err)

			buff := make([]byte, 1024)

			cf := bytes.NewReader(dataOne)

			for {
				n, err := cf.Read(buff)
				if err != nil {
					if errors.Is(err, io.EOF) {
						break
					}
					require.NoError(t, err)
				}

				require.NoError(t, wr.Send(&pbStorageV2.StorageV2WriteObjectRequest{
					Path: &pbStorageV2.StorageV2Path{
						Path: name,
					},
					Chunk: buff[:n],
				}))
			}

			ds, err := wr.CloseAndRecv()
			require.NoError(t, err)
			require.NotNil(t, ds)

			require.EqualValues(t, checksumOne, ds.GetChecksum())
			require.EqualValues(t, len(dataOne), ds.GetBytes())
		})

		t.Run("Re-Check if object exists", func(t *testing.T) {
			resp, err := h.HeadObject(ctx, &pbStorageV2.StorageV2HeadObjectRequest{
				Path: &pbStorageV2.StorageV2Path{Path: name},
			})
			require.EqualValues(t, codes.OK, errors.GRPCCode(err))
			require.NotNil(t, resp)

			require.EqualValues(t, len(dataOne), resp.GetInfo().GetSize())
		})

		t.Run("Download Object", func(t *testing.T) {
			// Read the object back and verify size and checksum.
			wr, err := h.ReadObject(ctx, &pbStorageV2.StorageV2ReadObjectRequest{
				Path: &pbStorageV2.StorageV2Path{Path: name},
			})
			require.NoError(t, err)

			data := bytes.NewBuffer(nil)

			for {
				resp, err := wr.Recv()
				if errors.Is(err, io.EOF) {
					break
				}
				require.NoError(t, err)

				_, err = util.WriteAll(data, resp.GetChunk())
				require.NoError(t, err)
			}

			pdata := data.Bytes()
			require.Len(t, pdata, size)

			pchecksum := util.SHA256(pdata)
			require.EqualValues(t, checksumOne, pchecksum)
		})

		t.Run("Re-Send Object", func(t *testing.T) {
			// Overwrite the same path with dataTwo.
			wr, err := h.WriteObject(ctx)
			require.NoError(t, err)

			buff := make([]byte, 1024)

			cf := bytes.NewReader(dataTwo)

			for {
				n, err := cf.Read(buff)
				if err != nil {
					if errors.Is(err, io.EOF) {
						break
					}
					require.NoError(t, err)
				}

				require.NoError(t, wr.Send(&pbStorageV2.StorageV2WriteObjectRequest{
					Path: &pbStorageV2.StorageV2Path{
						Path: name,
					},
					Chunk: buff[:n],
				}))
			}

			ds, err := wr.CloseAndRecv()
			require.NoError(t, err)
			require.NotNil(t, ds)

			require.EqualValues(t, checksumTwo, ds.GetChecksum())
			require.EqualValues(t, len(dataTwo), ds.GetBytes())
		})

		t.Run("List Objects", func(t *testing.T) {
			revcFiles := listAllFilesHelper(t, ctx, h, prefix)
			t.Logf("Size: %d", len(revcFiles))
		})

		t.Run("Send Second Object", func(t *testing.T) {
			// Write dataOne under a second, independent path.
			wr, err := h.WriteObject(ctx)
			require.NoError(t, err)

			buff := make([]byte, 1024)

			cf := bytes.NewReader(dataOne)

			for {
				n, err := cf.Read(buff)
				if err != nil {
					if errors.Is(err, io.EOF) {
						break
					}
					require.NoError(t, err)
				}

				require.NoError(t, wr.Send(&pbStorageV2.StorageV2WriteObjectRequest{
					Path: &pbStorageV2.StorageV2Path{
						Path: nameTwo,
					},
					Chunk: buff[:n],
				}))
			}

			ds, err := wr.CloseAndRecv()
			require.NoError(t, err)
			require.NotNil(t, ds)

			require.EqualValues(t, checksumOne, ds.GetChecksum())
			require.EqualValues(t, len(dataOne), ds.GetBytes())
		})

		t.Run("Re-Download Object", func(t *testing.T) {
			// After the overwrite, the object must carry dataTwo.
			wr, err := h.ReadObject(ctx, &pbStorageV2.StorageV2ReadObjectRequest{
				Path: &pbStorageV2.StorageV2Path{Path: name},
			})
			require.NoError(t, err)

			data := bytes.NewBuffer(nil)

			for {
				resp, err := wr.Recv()
				if errors.Is(err, io.EOF) {
					break
				}
				require.NoError(t, err)

				_, err = util.WriteAll(data, resp.GetChunk())
				require.NoError(t, err)
			}

			pdata := data.Bytes()
			require.Len(t, pdata, size)

			pchecksum := util.SHA256(pdata)
			require.EqualValues(t, checksumTwo, pchecksum)
		})

		t.Run("Delete Object", func(t *testing.T) {
			wr, err := h.DeleteObject(ctx, &pbStorageV2.StorageV2DeleteObjectRequest{
				Path: &pbStorageV2.StorageV2Path{Path: name},
			})
			require.NoError(t, err)
			require.NotNil(t, wr)
		})

		t.Run("Delete Second Object", func(t *testing.T) {
			wr, err := h.DeleteObject(ctx, &pbStorageV2.StorageV2DeleteObjectRequest{
				Path: &pbStorageV2.StorageV2Path{Path: nameTwo},
			})
			require.NoError(t, err)
			require.NotNil(t, wr)
		})

		t.Run("Re-Check if deleted object exists", func(t *testing.T) {
			resp, err := h.HeadObject(ctx, &pbStorageV2.StorageV2HeadObjectRequest{
				Path: &pbStorageV2.StorageV2Path{Path: name},
			})
			require.EqualValues(t, codes.NotFound, errors.GRPCCode(err))
			require.Nil(t, resp)
		})

		t.Run("Download Deleted Object", func(t *testing.T) {
			// Opening the stream succeeds; NotFound surfaces on first Recv.
			wr, err := h.ReadObject(ctx, &pbStorageV2.StorageV2ReadObjectRequest{
				Path: &pbStorageV2.StorageV2Path{Path: name},
			})
			require.NoError(t, err)

			resp, err := wr.Recv()
			require.EqualValues(t, codes.NotFound, errors.GRPCCode(err))
			require.Nil(t, resp)
		})

		t.Run("Delete Deleted Object", func(t *testing.T) {
			// NOTE(review): this asserts NoError for a second delete of the
			// same path, while DeleteObject returns NotFound when nothing is
			// deleted — behavior here presumably relies on the backend still
			// reporting a deletion; confirm against the S3 IO implementation.
			wr, err := h.DeleteObject(ctx, &pbStorageV2.StorageV2DeleteObjectRequest{
				Path: &pbStorageV2.StorageV2Path{Path: name},
			})
			require.NoError(t, err)
			require.NotNil(t, wr)
		})
	})
}

View file

@ -0,0 +1,110 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package v2
import (
"context"
_ "embed"
"fmt"
"testing"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/uuid"
pbStorageV2 "github.com/arangodb/kube-arangodb/integrations/storage/v2/definition"
pbImplStorageV2SharedS3 "github.com/arangodb/kube-arangodb/integrations/storage/v2/shared/s3"
"github.com/arangodb/kube-arangodb/pkg/logging"
"github.com/arangodb/kube-arangodb/pkg/util"
awsHelper "github.com/arangodb/kube-arangodb/pkg/util/aws"
"github.com/arangodb/kube-arangodb/pkg/util/svc"
"github.com/arangodb/kube-arangodb/pkg/util/tests/tgrpc"
)
const (
	// TestAwsProfile selects the AWS credentials profile used by the tests;
	// tests are skipped when it is unset.
	TestAwsProfile util.EnvironmentVariable = "TEST_AWS_PROFILE"
	// TestAwsRole optionally names an AWS role to impersonate.
	TestAwsRole util.EnvironmentVariable = "TEST_AWS_ROLE"
	// TestAWSBucket names the S3 bucket used by the tests; tests are skipped
	// when it is unset.
	TestAWSBucket util.EnvironmentVariable = "TEST_AWS_BUCKET"
)
// getClient builds an S3-backed service Configuration from the TEST_AWS_*
// environment variables, skipping the test when profile or bucket are not
// configured. Each call uses a fresh random bucket prefix.
func getClient(t *testing.T, mods ...Mod) Configuration {
	profile, ok := TestAwsProfile.Lookup()
	if !ok {
		t.Skipf("Client does not exists")
	}

	bucket, ok := TestAWSBucket.Lookup()
	if !ok {
		t.Skipf("Bucket does not exists")
	}

	var awsCfg awsHelper.Config
	awsCfg.Region = "eu-central-1"
	awsCfg.Provider.Config = awsHelper.ProviderConfig{
		Profile: profile,
	}
	awsCfg.Provider.Type = awsHelper.ProviderTypeConfig

	// Role impersonation is optional.
	if role, ok := TestAwsRole.Lookup(); ok {
		awsCfg.Provider.Impersonate = awsHelper.ProviderImpersonate{
			Impersonate: true,
			Role:        role,
			Name:        "Test",
		}
	}

	var s3Cfg pbImplStorageV2SharedS3.Configuration
	s3Cfg.Client = awsCfg
	s3Cfg.BucketName = bucket
	s3Cfg.BucketPrefix = fmt.Sprintf("test/%s/", uuid.NewUUID())

	var cfg Configuration
	cfg.Type = ConfigurationTypeS3
	cfg.S3 = s3Cfg

	return cfg.With(mods...)
}
// init raises all logging topics to Debug so that test failures come with
// full operator logs.
func init() {
	logging.Global().ApplyLogLevels(map[string]logging.Level{
		logging.TopicAll: logging.Debug,
	})
}
// Handler constructs a StorageV2 service handler from the environment-based
// test configuration, applying the given modifiers.
func Handler(t *testing.T, mods ...Mod) svc.Handler {
	h, err := New(getClient(t).With(mods...))
	require.NoError(t, err)

	return h
}
// Client starts a local gRPC service on an ephemeral loopback port and
// returns a connected StorageV2 client for it.
func Client(t *testing.T, ctx context.Context, mods ...Mod) pbStorageV2.StorageV2Client {
	service := svc.NewService(svc.Configuration{
		Address: "127.0.0.1:0",
	}, Handler(t, mods...))

	running := service.Start(ctx)

	return tgrpc.NewGRPCClient(t, ctx, pbStorageV2.NewStorageV2Client, running.Address())
}

View file

@ -51,6 +51,43 @@ func CauseWithNil(err error) error {
}
}
func ExtractCause[T error](err error) (T, bool) {
var d T
if err == nil {
return d, true
}
var v T
if errors.As(err, &v) {
return v, true
}
if err := CauseWithNil(err); err != nil {
return ExtractCause[T](err)
}
return d, false
}
// ExtractCauseHelper walks err's cause chain (via CauseWithNil) and returns
// the first value the extractor recognizes. A nil input yields the zero
// value and true, mirroring the original contract.
func ExtractCauseHelper[T error](err error, extr func(err error) (T, bool)) (T, bool) {
	var zero T
	if err == nil {
		return zero, true
	}

	for current := err; current != nil; current = CauseWithNil(current) {
		if v, ok := extr(current); ok {
			return v, true
		}
	}

	return zero, false
}
// New creates a new error with the given message, delegating to the
// underlying errors implementation.
func New(message string) error {
	return errors.New(message)
}

74
pkg/util/errors/grpc.go Normal file
View file

@ -0,0 +1,74 @@
//
// DISCLAIMER
//
// Copyright 2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package errors
import (
"errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// grpcError matches errors carrying a gRPC status, as produced by the
// google.golang.org/grpc/status package.
type grpcError interface {
	error

	GRPCStatus() *status.Status
}
// GRPCStatus extracts the gRPC status from err, unwrapping cause chains.
// The boolean reports whether a genuine gRPC status was found; otherwise a
// synthesized status is returned (codes.Unknown for unrecognized errors,
// codes.OK for nil).
//
// Fix: for a nil error, ExtractCauseHelper reports ok=true with a typed-nil
// grpcError, and calling GRPCStatus on that nil interface panicked. Nil is
// now handled explicitly.
func GRPCStatus(err error) (*status.Status, bool) {
	if err == nil {
		return status.New(codes.OK, ""), false
	}

	v, ok := ExtractCauseHelper[grpcError](err, func(err error) (grpcError, bool) {
		var gs grpcError
		if errors.As(err, &gs) {
			return gs, true
		}

		return nil, false
	})

	if !ok {
		return status.New(codes.Unknown, err.Error()), false
	}

	return v.GRPCStatus(), true
}
// GRPCCode maps err to its gRPC status code: nil maps to OK and errors
// without a recognizable gRPC status map to Unknown.
func GRPCCode(err error) codes.Code {
	if err == nil {
		return codes.OK
	}

	st, ok := GRPCStatus(err)
	if !ok {
		return codes.Unknown
	}

	return st.Code()
}
// IsGRPCCode reports whether err maps (via GRPCCode) to any of the
// expected gRPC codes.
//
// Fix: the variadic parameter was named `codes`, shadowing the imported
// codes package inside the function body; renamed to `expected` (parameter
// names are not part of a Go call signature, so callers are unaffected).
func IsGRPCCode(err error, expected ...codes.Code) bool {
	actual := GRPCCode(err)

	for _, code := range expected {
		if code == actual {
			return true
		}
	}

	return false
}

View file

@ -1,7 +1,7 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
// Copyright 2016-2024 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -22,6 +22,40 @@ package util
import "sync"
// ParallelProcess invokes caller for every element of in, distributing the
// work across the given number of goroutines, and blocks until all
// elements have been processed.
func ParallelProcess[T any](caller func(in T), threads int, in []T) {
	input := ParallelInput(in)

	var wg sync.WaitGroup
	wg.Add(threads)

	for worker := 0; worker < threads; worker++ {
		go func() {
			defer wg.Done()

			for item := range input {
				caller(item)
			}
		}()
	}

	wg.Wait()
}
// ParallelInput exposes the elements of in through an unbuffered channel
// fed by a background goroutine; the channel is closed once all elements
// have been sent.
func ParallelInput[T any](in []T) <-chan T {
	out := make(chan T)

	go func() {
		defer close(out)

		for _, item := range in {
			out <- item
		}
	}()

	return out
}
// RunParallel runs actions parallelly throttling them to the given maximum number.
func RunParallel(max int, actions ...func() error) error {
c, close := ParallelThread(max)

View file

@ -21,6 +21,7 @@
package util
import (
"context"
"encoding/json"
"reflect"
)
@ -197,3 +198,7 @@ func InitOptional[T any](in *T, ok bool) *T {
var z T
return &z
}
// NextIterator is a generic pull-style iterator: Next returns the next
// element or an error; callers in this repository treat io.EOF as the
// exhaustion signal.
type NextIterator[T any] interface {
	Next(ctx context.Context) (T, error)
}