1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Add dragonfly skeleton: listener, connection and main

This commit is contained in:
Roman Gershman 2021-11-16 10:39:38 +02:00
parent 90516b53dd
commit 2bce379341
9 changed files with 402 additions and 0 deletions

View file

@ -24,3 +24,4 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR})
include_directories(async)
add_subdirectory(async)
add_subdirectory(server)

7
server/CMakeLists.txt Normal file
View file

@ -0,0 +1,7 @@
add_executable(dragonfly dfly_main.cc)
cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib dragonfly_listener.cc dragonfly_connection.cc main_service.cc)
cxx_link(dragonfly_lib uring_fiber_lib
fibers_ext strings_lib http_server_lib)

62
server/dfly_main.cc Normal file
View file

@ -0,0 +1,62 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "base/init.h"
#include "server/main_service.h"
#include "server/dragonfly_listener.h"
#include "util/accept_server.h"
#include "util/uring/uring_pool.h"
#include "util/varz.h"
DEFINE_int32(http_port, 8080, "Http port.");
DECLARE_uint32(port);
using namespace util;
namespace dfly {
void RunEngine(ProactorPool* pool, AcceptServer* acceptor, HttpListener<>* http) {
Service service(pool);
service.Init(acceptor);
if (http) {
service.RegisterHttp(http);
}
acceptor->AddListener(FLAGS_port, new Listener{&service});
acceptor->Run();
acceptor->Wait();
service.Shutdown();
}
} // namespace dfly
int main(int argc, char* argv[]) {
MainInitGuard guard(&argc, &argv);
CHECK_GT(FLAGS_port, 0u);
uring::UringPool pp{1024};
pp.Run();
AcceptServer acceptor(&pp);
HttpListener<>* http_listener = nullptr;
if (FLAGS_http_port >= 0) {
http_listener = new HttpListener<>;
// Ownership over http_listener is moved to the acceptor.
uint16_t port = acceptor.AddListener(FLAGS_http_port, http_listener);
LOG(INFO) << "Started http service on port " << port;
}
dfly::RunEngine(&pp, &acceptor, http_listener);
pp.Stop();
return 0;
}

View file

@ -0,0 +1,110 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "server/dragonfly_connection.h"
#include <boost/fiber/operations.hpp>
#include "base/io_buf.h"
#include "base/logging.h"
#include "server/main_service.h"
#include "util/fiber_sched_algo.h"
using namespace util;
using namespace std;
namespace this_fiber = boost::this_fiber;
namespace fibers = boost::fibers;
namespace dfly {
namespace {
constexpr size_t kMinReadSize = 256;
} // namespace
struct Connection::Shutdown {
absl::flat_hash_map<ShutdownHandle, ShutdownCb> map;
ShutdownHandle next_handle = 1;
ShutdownHandle Add(ShutdownCb cb) {
map[next_handle] = move(cb);
return next_handle++;
}
void Remove(ShutdownHandle sh) {
map.erase(sh);
}
};
Connection::Connection(Service* service)
: service_(service) {
}
Connection::~Connection() {
}
void Connection::OnShutdown() {
VLOG(1) << "Connection::OnShutdown";
if (shutdown_) {
for (const auto& k_v : shutdown_->map) {
k_v.second();
}
}
}
auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle {
if (!shutdown_) {
shutdown_ = make_unique<Shutdown>();
}
return shutdown_->Add(std::move(cb));
}
void Connection::UnregisterShutdownHook(ShutdownHandle id) {
if (shutdown_) {
shutdown_->Remove(id);
if (shutdown_->map.empty())
shutdown_.reset();
}
}
void Connection::HandleRequests() {
this_fiber::properties<FiberProps>().set_name("DflyConnection");
int val = 1;
CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val)));
FiberSocketBase* peer = socket_.get();
InputLoop(peer);
VLOG(1) << "Closed connection for peer " << socket_->RemoteEndpoint();
}
void Connection::InputLoop(FiberSocketBase* peer) {
base::IoBuf io_buf{kMinReadSize};
std::error_code ec;
do {
auto buf = io_buf.AppendBuffer();
::io::Result<size_t> recv_sz = peer->Recv(buf);
if (!recv_sz) {
ec = recv_sz.error();
break;
}
io_buf.CommitWrite(*recv_sz);
ec = peer->Write(io_buf.InputBuffer());
if (ec)
break;
} while (peer->IsOpen());
if (ec && !FiberSocketBase::IsConnClosed(ec)) {
LOG(WARNING) << "Socket error " << ec;
}
}
} // namespace dfly

View file

@ -0,0 +1,40 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#pragma once
#include "util/connection.h"
namespace dfly {
class Service;
class Connection : public util::Connection {
public:
Connection(Service* service);
~Connection();
using error_code = std::error_code;
using ShutdownCb = std::function<void()>;
using ShutdownHandle = unsigned;
ShutdownHandle RegisterShutdownHook(ShutdownCb cb);
void UnregisterShutdownHook(ShutdownHandle id);
protected:
void OnShutdown() override;
private:
void HandleRequests() final;
void InputLoop(util::FiberSocketBase* peer);
void DispatchFiber(util::FiberSocketBase* peer);
Service* service_;
struct Shutdown;
std::unique_ptr<Shutdown> shutdown_;
};
} // namespace dfly

View file

@ -0,0 +1,47 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "server/dragonfly_listener.h"
#include "base/logging.h"
#include "server/dragonfly_connection.h"
#include "util/proactor_pool.h"
using namespace util;
DEFINE_uint32(conn_threads, 0, "Number of threads used for handing server connections");
namespace dfly {
Listener::Listener(Service* e) : engine_(e) {
}
Listener::~Listener() {
}
util::Connection* Listener::NewConnection(ProactorBase* proactor) {
return new Connection{engine_};
}
void Listener::PreShutdown() {
}
void Listener::PostShutdown() {
}
// We can limit number of threads handling dragonfly connections.
ProactorBase* Listener::PickConnectionProactor(LinuxSocketBase* sock) {
uint32_t id = next_id_.fetch_add(1, std::memory_order_relaxed);
uint32_t total = FLAGS_conn_threads;
util::ProactorPool* pp = pool();
if (total == 0 || total > pp->size()) {
total = pp->size();
}
return pp->at(id % total);
}
} // namespace dfly

View file

@ -0,0 +1,31 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#pragma once
#include "util/listener_interface.h"
namespace dfly {
class Service;
class Listener : public util::ListenerInterface {
public:
Listener(Service*);
~Listener();
private:
util::Connection* NewConnection(util::ProactorBase* proactor) final;
util::ProactorBase* PickConnectionProactor(util::LinuxSocketBase* sock) final;
void PreShutdown();
void PostShutdown();
Service* engine_;
std::atomic_uint32_t next_id_{0};
};
} // namespace dfly

64
server/main_service.cc Normal file
View file

@ -0,0 +1,64 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#include "server/main_service.h"
#include <boost/fiber/operations.hpp>
#include <filesystem>
#include "base/logging.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/varz.h"
DEFINE_uint32(port, 6380, "Redis port");
namespace dfly {
using namespace std;
using namespace util;
using base::VarzValue;
namespace fibers = ::boost::fibers;
namespace this_fiber = ::boost::this_fiber;
namespace {
DEFINE_VARZ(VarzMapAverage, request_latency_usec);
std::optional<VarzFunction> engine_varz;
} // namespace
Service::Service(ProactorPool* pp)
: pp_(*pp) {
CHECK(pp);
engine_varz.emplace("engine", [this] { return GetVarzStats(); });
}
Service::~Service() {
engine_varz.reset();
}
void Service::Init(util::AcceptServer* acceptor) {
request_latency_usec.Init(&pp_);
}
void Service::Shutdown() {
request_latency_usec.Shutdown();
}
void Service::RegisterHttp(HttpListenerBase* listener) {
CHECK_NOTNULL(listener);
}
VarzValue::Map Service::GetVarzStats() {
VarzValue::Map res;
res.emplace_back("keys", VarzValue::FromInt(0));
return res;
}
} // namespace dfly

40
server/main_service.h Normal file
View file

@ -0,0 +1,40 @@
// Copyright 2021, Beeri 15. All rights reserved.
// Author: Roman Gershman (romange@gmail.com)
//
#pragma once
#include "base/varz_value.h"
#include "util/http/http_handler.h"
namespace util {
class AcceptServer;
} // namespace util
namespace dfly {
class Service {
public:
using error_code = std::error_code;
explicit Service(util::ProactorPool* pp);
~Service();
void RegisterHttp(util::HttpListenerBase* listener);
void Init(util::AcceptServer* acceptor);
void Shutdown();
util::ProactorPool& proactor_pool() {
return pp_;
}
private:
base::VarzValue::Map GetVarzStats();
util::ProactorPool& pp_;
};
} // namespace dfly