1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

Add connection context that will serve us as an interface between command handler and the service

This commit is contained in:
Roman Gershman 2021-11-17 16:38:32 +02:00
parent ec78c8a2af
commit 38478cf069
6 changed files with 207 additions and 9 deletions

View file

@ -3,7 +3,7 @@ cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib db_slice.cc dragonfly_listener.cc dragonfly_connection.cc add_library(dragonfly_lib db_slice.cc dragonfly_listener.cc dragonfly_connection.cc
main_service.cc engine_shard_set.cc main_service.cc engine_shard_set.cc
redis_parser.cc resp_expr.cc) redis_parser.cc resp_expr.cc reply_builder.cc)
cxx_link(dragonfly_lib uring_fiber_lib cxx_link(dragonfly_lib uring_fiber_lib
fibers_ext strings_lib http_server_lib) fibers_ext strings_lib http_server_lib)

28
server/conn_context.h Normal file
View file

@ -0,0 +1,28 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#pragma once
#include "server/reply_builder.h"
namespace dfly {
class Connection;
class EngineShardSet;
class ConnectionContext : public ReplyBuilder {
public:
ConnectionContext(::io::Sink* stream, Connection* owner) : ReplyBuilder(stream), owner_(owner) {
}
// TODO: to introduce proper accessors.
EngineShardSet* shard_set = nullptr;
Connection* owner() { return owner_;}
private:
Connection* owner_;
};
} // namespace dfly

View file

@ -10,6 +10,7 @@
#include "base/logging.h" #include "base/logging.h"
#include "server/main_service.h" #include "server/main_service.h"
#include "server/redis_parser.h" #include "server/redis_parser.h"
#include "server/conn_context.h"
#include "util/fiber_sched_algo.h" #include "util/fiber_sched_algo.h"
using namespace util; using namespace util;
@ -79,6 +80,9 @@ void Connection::HandleRequests() {
CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val))); CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val)));
FiberSocketBase* peer = socket_.get(); FiberSocketBase* peer = socket_.get();
cc_.reset(new ConnectionContext(peer, this));
cc_->shard_set = &service_->shard_set();
InputLoop(peer); InputLoop(peer);
VLOG(1) << "Closed connection for peer " << socket_->RemoteEndpoint(); VLOG(1) << "Closed connection for peer " << socket_->RemoteEndpoint();
@ -100,7 +104,7 @@ void Connection::InputLoop(FiberSocketBase* peer) {
} }
io_buf.CommitWrite(*recv_sz); io_buf.CommitWrite(*recv_sz);
status = ParseRedis(&io_buf, peer); status = ParseRedis(&io_buf);
if (status == NEED_MORE) { if (status == NEED_MORE) {
status = OK; status = OK;
} else if (status != OK) { } else if (status != OK) {
@ -113,12 +117,12 @@ void Connection::InputLoop(FiberSocketBase* peer) {
} }
} }
auto Connection::ParseRedis(base::IoBuf* io_buf, util::FiberSocketBase* peer) -> ParserStatus { auto Connection::ParseRedis(base::IoBuf* io_buf) -> ParserStatus {
RespVec args; RespVec args;
uint32_t consumed = 0; uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK; RedisParser::Result result = RedisParser::OK;
error_code ec;
do { do {
result = redis_parser_->Parse(io_buf->InputBuffer(), &consumed, &args); result = redis_parser_->Parse(io_buf->InputBuffer(), &consumed, &args);
@ -131,15 +135,15 @@ auto Connection::ParseRedis(base::IoBuf* io_buf, util::FiberSocketBase* peer) ->
CHECK_EQ(RespExpr::STRING, first.type); // TODO CHECK_EQ(RespExpr::STRING, first.type); // TODO
string_view sv = ToSV(first.GetBuf()); string_view sv = ToSV(first.GetBuf());
if (sv == "PING") { if (sv == "PING") {
ec = peer->Write(io::Buffer("PONG\r\n")); cc_->SendSimpleString("PONG");
} else if (sv == "SET") { } else if (sv == "SET") {
CHECK_EQ(3u, args.size()); CHECK_EQ(3u, args.size());
service_->Set(ToSV(args[1].GetBuf()), ToSV(args[2].GetBuf())); service_->Set(ToSV(args[1].GetBuf()), ToSV(args[2].GetBuf()));
ec = peer->Write(io::Buffer("OK\r\n")); cc_->SendOk();
} }
} }
io_buf->ConsumeInput(consumed); io_buf->ConsumeInput(consumed);
} while (RedisParser::OK == result && !ec); } while (RedisParser::OK == result && !cc_->ec());
parser_error_ = result; parser_error_ = result;
if (result == RedisParser::OK) if (result == RedisParser::OK)

View file

@ -12,6 +12,7 @@ namespace dfly {
class Service; class Service;
class RedisParser; class RedisParser;
class ConnectionContext;
class Connection : public util::Connection { class Connection : public util::Connection {
public: public:
@ -35,11 +36,14 @@ class Connection : public util::Connection {
void InputLoop(util::FiberSocketBase* peer); void InputLoop(util::FiberSocketBase* peer);
ParserStatus ParseRedis(base::IoBuf* buf, util::FiberSocketBase* peer); ParserStatus ParseRedis(base::IoBuf* buf);
std::unique_ptr<RedisParser> redis_parser_; std::unique_ptr<RedisParser> redis_parser_;
std::unique_ptr<ConnectionContext> cc_;
Service* service_; Service* service_;
unsigned parser_error_ = 0; unsigned parser_error_ = 0;
struct Shutdown; struct Shutdown;
std::unique_ptr<Shutdown> shutdown_; std::unique_ptr<Shutdown> shutdown_;
}; };

99
server/reply_builder.cc Normal file
View file

@ -0,0 +1,99 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "server/reply_builder.h"
#include <absl/strings/numbers.h>
#include <absl/strings/str_cat.h>
#include "base/logging.h"
using namespace std;
using absl::StrAppend;
namespace dfly {
namespace {
inline iovec constexpr IoVec(std::string_view s) {
iovec r{const_cast<char*>(s.data()), s.size()};
return r;
}
constexpr char kCRLF[] = "\r\n";
constexpr char kErrPref[] = "-ERR ";
constexpr char kSimplePref[] = "+";
} // namespace
RespSerializer::RespSerializer(io::Sink* stream) : sink_(stream) {
}
void RespSerializer::Send(const iovec* v, uint32_t len) {
error_code ec = sink_->Write(v, len);
if (ec) {
ec_ = ec;
}
}
void RespSerializer::SendDirect(std::string_view raw) {
iovec v = {IoVec(raw)};
Send(&v, 1);
}
void ReplyBuilder::SendBulkString(std::string_view str) {
char tmp[absl::numbers_internal::kFastToBufferSize + 3];
tmp[0] = '$'; // Format length
char* next = absl::numbers_internal::FastIntToBuffer(uint32_t(str.size()), tmp + 1);
*next++ = '\r';
*next++ = '\n';
std::string_view lenpref{tmp, size_t(next - tmp)};
// 3 parts: length, string and CRLF.
iovec v[3] = {IoVec(lenpref), IoVec(str), IoVec(kCRLF)};
return Send(v, ABSL_ARRAYSIZE(v));
}
void ReplyBuilder::SendError(std::string_view str) {
if (str[0] == '-') {
iovec v[] = {IoVec(str), IoVec(kCRLF)};
return Send(v, ABSL_ARRAYSIZE(v));
} else {
iovec v[] = {IoVec(kErrPref), IoVec(str), IoVec(kCRLF)};
return Send(v, ABSL_ARRAYSIZE(v));
}
}
void ReplyBuilder::SendError(OpStatus status) {
switch (status) {
case OpStatus::OK:
SendOk();
break;
case OpStatus::KEY_NOTFOUND:
SendError("no such key");
break;
default:
LOG(ERROR) << "Unsupported status " << status;
SendError("Internal error");
break;
}
}
void ReplyBuilder::SendNull() {
constexpr char kNullStr[] = "$-1\r\n";
iovec v[] = {IoVec(kNullStr)};
Send(v, ABSL_ARRAYSIZE(v));
}
void ReplyBuilder::SendSimpleString(std::string_view str) {
iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
} // namespace dfly

63
server/reply_builder.h Normal file
View file

@ -0,0 +1,63 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include <string_view>
#include <optional>
#include "io/sync_stream_interface.h"
#include "server/op_status.h"
namespace dfly {
class RespSerializer {
public:
explicit RespSerializer(::io::Sink* sink);
std::error_code ec() const {
return ec_;
}
void CloseConnection() {
if (!ec_)
ec_ = std::make_error_code(std::errc::connection_aborted);
}
//! Sends a string as is without any formatting. raw should be RESP-encoded.
void SendDirect(std::string_view str);
::io::Sink* sink() { return sink_; }
protected:
void Send(const iovec* v, uint32_t len);
::io::Sink* sink_;
private:
std::error_code ec_;
};
class ReplyBuilder : public RespSerializer {
public:
explicit ReplyBuilder(::io::Sink* stream) : RespSerializer(stream) {
}
/// aka "$6\r\nfoobar\r\n"
void SendBulkString(std::string_view str);
void SendNull();
void SendOk() {
return SendSimpleString("OK");
}
void SendError(std::string_view str);
void SendError(OpStatus status);
//! See https://redis.io/topics/protocol
void SendSimpleString(std::string_view str);
private:
};
} // namespace dfly