1
0
Fork 0
mirror of https://github.com/arangodb/kube-arangodb.git synced 2024-12-14 11:57:37 +00:00

[Feature] Async Requests Support (#981)

This commit is contained in:
Adam Janikowski 2022-05-08 23:23:48 +03:00 committed by GitHub
parent aad986fed8
commit 3191f5ef43
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 726 additions and 9 deletions

View file

@ -7,6 +7,7 @@
- (Feature) Recursive OwnerReference discovery
- (Maintenance) Add check make targets
- (Feature) Create support for local variables in actions.
- (Feature) Support for asynchronous ArangoD resquests.
## [1.2.11](https://github.com/arangodb/kube-arangodb/tree/1.2.11) (2022-04-30)
- (Bugfix) Orphan PVC are not removed

View file

@ -150,7 +150,7 @@ ifdef VERBOSE
TESTVERBOSEOPTIONS := -v
endif
EXCLUDE_DIRS := tests vendor .gobuild deps tools
EXCLUDE_DIRS := vendor .gobuild deps tools
SOURCES_QUERY := find ./ -type f -name '*.go' $(foreach EXCLUDE_DIR,$(EXCLUDE_DIRS), ! -path "*/$(EXCLUDE_DIR)/*")
SOURCES := $(shell $(SOURCES_QUERY))
DASHBOARDSOURCES := $(shell find $(DASHBOARDDIR)/src -name '*.js') $(DASHBOARDDIR)/package.json

View file

@ -0,0 +1,102 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package conn
import (
"context"
"net/http"
"path"
"github.com/arangodb/go-driver"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
"github.com/arangodb/kube-arangodb/pkg/util/errors"
)
func NewAsyncConnection(c driver.Connection) driver.Connection {
return async{
connectionPass: connectionPass{
c: c,
wrap: asyncConnectionWrap,
},
}
}
func asyncConnectionWrap(c driver.Connection) (driver.Connection, error) {
return NewAsyncConnection(c), nil
}
type async struct {
connectionPass
}
func (a async) isAsyncIDSet(ctx context.Context) (string, bool) {
if ctx != nil {
if q := ctx.Value(asyncOperatorContextKey); q != nil {
if v, ok := q.(string); ok {
return v, true
}
}
}
return "", false
}
func (a async) Do(ctx context.Context, req driver.Request) (driver.Response, error) {
if id, ok := a.isAsyncIDSet(ctx); ok {
// We have ID Set, request should be done to fetch job id
req, err := a.c.NewRequest(http.MethodPut, path.Join("/_api/job", id))
if err != nil {
return nil, err
}
resp, err := a.c.Do(ctx, req)
if err != nil {
return nil, err
}
switch resp.StatusCode() {
case http.StatusNotFound:
return nil, newAsyncErrorNotFound(id)
case http.StatusNoContent:
return nil, newAsyncJobInProgress(id)
default:
return resp, nil
}
} else {
req.SetHeader(constants.ArangoHeaderAsyncKey, constants.ArangoHeaderAsyncValue)
resp, err := a.c.Do(ctx, req)
if err != nil {
return nil, err
}
switch resp.StatusCode() {
case http.StatusAccepted:
if v := resp.Header(constants.ArangoHeaderAsyncIDKey); len(v) == 0 {
return nil, errors.Newf("Missing async key response")
} else {
return nil, newAsyncJobInProgress(v)
}
default:
return nil, resp.CheckStatus(http.StatusAccepted)
}
}
}

View file

@ -0,0 +1,75 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package conn
import "fmt"
func IsAsyncErrorNotFound(err error) bool {
if err == nil {
return false
}
if _, ok := err.(asyncErrorNotFound); ok {
return true
}
return false
}
func newAsyncErrorNotFound(id string) error {
return asyncErrorNotFound{
jobID: id,
}
}
type asyncErrorNotFound struct {
jobID string
}
func (a asyncErrorNotFound) Error() string {
return fmt.Sprintf("Job with ID %s not found", a.jobID)
}
func IsAsyncJobInProgress(err error) (string, bool) {
if err == nil {
return "", false
}
if v, ok := err.(asyncJobInProgress); ok {
return v.jobID, true
}
return "", false
}
func newAsyncJobInProgress(id string) error {
return asyncJobInProgress{
jobID: id,
}
}
type asyncJobInProgress struct {
jobID string
}
func (a asyncJobInProgress) Error() string {
return fmt.Sprintf("Job with ID %s in progress", a.jobID)
}

View file

@ -0,0 +1,78 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package conn
import (
"context"
"net/http"
"testing"
"github.com/arangodb/go-driver"
"github.com/arangodb/kube-arangodb/pkg/util/tests"
"github.com/stretchr/testify/require"
)
func Test_Async(t *testing.T) {
s := tests.NewServer(t)
c := s.NewConnection()
c = NewAsyncConnection(c)
client, err := driver.NewClient(driver.ClientConfig{
Connection: c,
})
require.NoError(t, err)
a := tests.NewAsyncHandler(t, s, http.MethodGet, "/_api/version", http.StatusOK, driver.VersionInfo{
Server: "foo",
Version: "",
License: "",
Details: nil,
})
a.Start()
_, err = client.Version(context.Background())
require.Error(t, err)
id, ok := IsAsyncJobInProgress(err)
require.True(t, ok)
require.Equal(t, a.ID(), id)
a.InProgress()
ctx := WithAsyncID(context.TODO(), a.ID())
_, err = client.Version(ctx)
require.Error(t, err)
id, ok = IsAsyncJobInProgress(err)
require.True(t, ok)
require.Equal(t, a.ID(), id)
a.Done()
v, err := client.Version(ctx)
require.NoError(t, err)
require.Equal(t, v.Server, "foo")
defer s.Stop()
}

View file

@ -0,0 +1,72 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package conn
import (
"context"
"github.com/arangodb/go-driver"
)
type connectionWrap func(c driver.Connection) (driver.Connection, error)
var _ driver.Connection = connectionPass{}
type connectionPass struct {
c driver.Connection
wrap connectionWrap
}
func (c connectionPass) NewRequest(method, path string) (driver.Request, error) {
return c.c.NewRequest(method, path)
}
func (c connectionPass) Do(ctx context.Context, req driver.Request) (driver.Response, error) {
return c.c.Do(ctx, req)
}
func (c connectionPass) Unmarshal(data driver.RawObject, result interface{}) error {
return c.c.Unmarshal(data, result)
}
func (c connectionPass) Endpoints() []string {
return c.c.Endpoints()
}
func (c connectionPass) UpdateEndpoints(endpoints []string) error {
return c.c.UpdateEndpoints(endpoints)
}
func (c connectionPass) SetAuthentication(authentication driver.Authentication) (driver.Connection, error) {
newC, err := c.c.SetAuthentication(authentication)
if err != nil {
return nil, err
}
if f := c.wrap; f != nil {
return f(newC)
}
return newC, nil
}
func (c connectionPass) Protocols() driver.ProtocolSet {
return c.c.Protocols()
}

View file

@ -0,0 +1,37 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package conn
import "context"
type ContextKey string
const (
asyncOperatorContextKey ContextKey = "operator-async-id"
)
func WithAsyncID(ctx context.Context, id string) context.Context {
if ctx == nil {
ctx = context.Background()
}
return context.WithValue(ctx, asyncOperatorContextKey, id)
}

View file

@ -0,0 +1,27 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package constants
const (
ArangoHeaderAsyncIDKey = "x-arango-async-id"
ArangoHeaderAsyncKey = "x-arango-async"
ArangoHeaderAsyncValue = "store"
)

View file

@ -21,13 +21,14 @@
package tests
import (
"context"
"testing"
api "github.com/arangodb/kube-arangodb/pkg/apis/deployment/v1"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/stretchr/testify/require"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"testing"
"context"
"github.com/stretchr/testify/require"
)
func NewArangoDeployment(name string) *api.ArangoDeployment {

View file

@ -21,13 +21,14 @@
package tests
import (
"testing"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"
"context"
"github.com/stretchr/testify/require"
"testing"
"github.com/arangodb/kube-arangodb/pkg/deployment/resources/inspector"
inspectorInterface "github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector"
"github.com/arangodb/kube-arangodb/pkg/util/k8sutil/inspector/throttle"
"github.com/arangodb/kube-arangodb/pkg/util/kclient"
"github.com/stretchr/testify/require"
)
const FakeNamespace = "fake"

209
pkg/util/tests/server.go Normal file
View file

@ -0,0 +1,209 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package tests
import (
"encoding/json"
"fmt"
"net"
"net/http"
"sync"
"testing"
"github.com/arangodb/go-driver"
httpdriver "github.com/arangodb/go-driver/http"
"github.com/stretchr/testify/require"
)
func NewServer(t *testing.T) Server {
s := &server{
t: t,
stop: make(chan struct{}),
stopped: make(chan struct{}),
started: make(chan struct{}),
done: make(chan struct{}),
}
go s.run()
<-s.started
return s
}
type Server interface {
NewConnection() driver.Connection
NewClient() driver.Client
Handle(f http.HandlerFunc)
Addr() string
Stop()
}
type server struct {
lock sync.Mutex
t *testing.T
handlers []http.HandlerFunc
port int
stop, stopped, started, done chan struct{}
}
func (s *server) NewClient() driver.Client {
c, err := driver.NewClient(driver.ClientConfig{
Connection: s.NewConnection(),
})
require.NoError(s.t, err)
return c
}
func (s *server) NewConnection() driver.Connection {
c, err := httpdriver.NewConnection(httpdriver.ConnectionConfig{
Endpoints: []string{
s.Addr(),
},
ContentType: driver.ContentTypeJSON,
})
require.NoError(s.t, err)
return c
}
func (s *server) Handle(f http.HandlerFunc) {
s.lock.Lock()
defer s.lock.Unlock()
s.handlers = append(s.handlers, f)
}
func (s *server) Addr() string {
return fmt.Sprintf("http://127.0.0.1:%d", s.port)
}
func (s *server) Stop() {
s.lock.Lock()
defer s.lock.Unlock()
close(s.stop)
<-s.done
if q := len(s.handlers); q > 0 {
require.Failf(s.t, "Pending messages", "Count %d", q)
}
}
func (s *server) run() {
defer close(s.done)
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(s.t, err)
s.port = listener.Addr().(*net.TCPAddr).Port
m := http.NewServeMux()
m.HandleFunc("/", s.handle)
server := http.Server{
Handler: m,
}
var serverErr error
go func() {
defer close(s.stopped)
close(s.started)
go func() {
<-s.stop
require.NoError(s.t, server.Close())
}()
serverErr = server.Serve(listener)
}()
<-s.stopped
if serverErr != http.ErrServerClosed {
require.NoError(s.t, serverErr)
}
}
func (s *server) handle(writer http.ResponseWriter, request *http.Request) {
s.lock.Lock()
defer s.lock.Unlock()
var handler http.HandlerFunc
switch len(s.handlers) {
case 0:
require.Fail(s.t, "No pending messages")
case 1:
handler = s.handlers[0]
s.handlers = nil
default:
handler = s.handlers[0]
s.handlers = s.handlers[1:]
}
handler(writer, request)
}
func NewSimpleHandler(t *testing.T, method string, path string, resp func(t *testing.T) (int, interface{})) http.HandlerFunc {
return NewCustomRequestHandler(t, method, path, nil, nil, resp)
}
func NewCustomRequestHandler(t *testing.T, method string, path string, reqVerify func(t *testing.T, r *http.Request), respHeaders func(t *testing.T) map[string]string, resp func(t *testing.T) (int, interface{})) http.HandlerFunc {
return func(writer http.ResponseWriter, request *http.Request) {
require.Equal(t, method, request.Method)
require.Equal(t, path, request.RequestURI)
if reqVerify != nil {
reqVerify(t, request)
}
code, r := resp(t)
writer.Header().Add("content-type", "application/json")
if respHeaders != nil {
h := respHeaders(t)
for k, v := range h {
writer.Header().Add(k, v)
}
}
writer.WriteHeader(code)
if r != nil {
d, err := json.Marshal(r)
require.NoError(t, err)
s, err := writer.Write(d)
require.NoError(t, err)
require.Equal(t, len(d), s)
}
}
}

View file

@ -0,0 +1,114 @@
//
// DISCLAIMER
//
// Copyright 2016-2022 ArangoDB GmbH, Cologne, Germany
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Copyright holder is ArangoDB GmbH, Cologne, Germany
//
package tests
import (
"fmt"
"net/http"
"testing"
"github.com/arangodb/kube-arangodb/pkg/util/constants"
"github.com/dchest/uniuri"
"github.com/stretchr/testify/require"
)
func NewAsyncHandler(t *testing.T, s Server, method string, path string, retCode int, ret interface{}) AsyncHandler {
return &asyncHandler{
t: t,
s: s,
ret: ret,
retCode: retCode,
method: method,
path: path,
id: uniuri.NewLen(32),
}
}
type AsyncHandler interface {
ID() string
Start()
InProgress()
Missing()
Done()
}
type asyncHandler struct {
t *testing.T
s Server
ret interface{}
retCode int
method, path string
id string
}
func (a *asyncHandler) Missing() {
p := fmt.Sprintf("/_api/job/%s", a.id)
a.s.Handle(NewCustomRequestHandler(a.t, http.MethodPut, p, func(t *testing.T, r *http.Request) {
v := r.Header.Get(constants.ArangoHeaderAsyncKey)
require.Equal(t, "", v)
}, nil, func(t *testing.T) (int, interface{}) {
return http.StatusNotFound, nil
}))
}
func (a *asyncHandler) Start() {
a.s.Handle(NewCustomRequestHandler(a.t, a.method, a.path, func(t *testing.T, r *http.Request) {
v := r.Header.Get(constants.ArangoHeaderAsyncKey)
require.Equal(t, constants.ArangoHeaderAsyncValue, v)
}, func(t *testing.T) map[string]string {
return map[string]string{
constants.ArangoHeaderAsyncIDKey: a.id,
}
}, func(t *testing.T) (int, interface{}) {
return http.StatusAccepted, nil
}))
}
func (a *asyncHandler) InProgress() {
p := fmt.Sprintf("/_api/job/%s", a.id)
a.s.Handle(NewCustomRequestHandler(a.t, http.MethodPut, p, func(t *testing.T, r *http.Request) {
v := r.Header.Get(constants.ArangoHeaderAsyncKey)
require.Equal(t, "", v)
}, nil, func(t *testing.T) (int, interface{}) {
return http.StatusNoContent, nil
}))
}
func (a *asyncHandler) Done() {
p := fmt.Sprintf("/_api/job/%s", a.id)
a.s.Handle(NewCustomRequestHandler(a.t, http.MethodPut, p, func(t *testing.T, r *http.Request) {
v := r.Header.Get(constants.ArangoHeaderAsyncKey)
require.Equal(t, "", v)
}, nil, func(t *testing.T) (int, interface{}) {
return a.retCode, a.ret
}))
}
func (a *asyncHandler) ID() string {
return a.id
}