1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

feat: stabilize non-coordinated omission mode (#3407)

* feat: stabilize non-coordinated omission mode

1. Our latency/RPS computations were off because we started measuring before drivers
   started running. Now, Run/Start phases are separated, so the start time is measured more precisely
   (after the start phase)
2. Introduced progress per connection - one of my discoveries is that driver
   connections progress with differrent pace when running in coordinated omission mode.
   This can reach x5 speed differrences. Now we measure and output fastest/slowest progress.
3. Coordinated omission is great when the Server Under Test is able to sustain the required RPS.
   But if the actual RPS is lower than the one is sent than the final latencies will be infinitely high.
   We fix it by introducing self-adjusting sleep interval, so if the actual RPS is lower
   we will increase the interval to be closer to the actual RPS.

Show p99 latency and maximum pending requests per connection.

Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
Signed-off-by: Roman Gershman <romange@gmail.com>

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Signed-off-by: Roman Gershman <romange@gmail.com>
Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
This commit is contained in:
Roman Gershman 2024-07-30 11:55:43 +03:00 committed by GitHub
parent 89a48a7aa8
commit e464990643
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 181 additions and 63 deletions

View file

@ -1,13 +1,16 @@
add_library(dfly_parser_lib redis_parser.cc resp_expr.cc )
cxx_link(dfly_parser_lib base strings_lib)
add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc
memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc service_interface.cc memcache_parser.cc reply_builder.cc op_status.cc service_interface.cc
reply_capture.cc resp_expr.cc cmd_arg_parser.cc tls_error.cc) reply_capture.cc cmd_arg_parser.cc tls_error.cc)
if (DF_USE_SSL) if (DF_USE_SSL)
set(TLS_LIB tls_lib) set(TLS_LIB tls_lib)
target_compile_definitions(dfly_facade PRIVATE DFLY_USE_SSL) target_compile_definitions(dfly_facade PRIVATE DFLY_USE_SSL)
endif() endif()
cxx_link(dfly_facade base strings_lib http_server_lib fibers2 cxx_link(dfly_facade dfly_parser_lib http_server_lib fibers2
${TLS_LIB} TRDP::mimalloc TRDP::dconv) ${TLS_LIB} TRDP::mimalloc TRDP::dconv)
add_library(facade_test facade_test.cc) add_library(facade_test facade_test.cc)

View file

@ -18,7 +18,7 @@ if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
tiering/external_alloc.cc) tiering/external_alloc.cc)
add_executable(dfly_bench dfly_bench.cc) add_executable(dfly_bench dfly_bench.cc)
cxx_link(dfly_bench dfly_facade fibers2 absl::random_random) cxx_link(dfly_bench dfly_parser_lib fibers2 absl::random_random)
cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY) cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY)
cxx_test(tiering/op_manager_test dfly_test_lib LABELS DFLY) cxx_test(tiering/op_manager_test dfly_test_lib LABELS DFLY)
cxx_test(tiering/small_bins_test dfly_test_lib LABELS DFLY) cxx_test(tiering/small_bins_test dfly_test_lib LABELS DFLY)

View file

@ -41,13 +41,16 @@ ABSL_FLAG(uint64_t, seed, 42, "A seed for random data generation");
ABSL_FLAG(uint64_t, key_stddev, 0, ABSL_FLAG(uint64_t, key_stddev, 0,
"Standard deviation for non-uniform distribution, 0 chooses" "Standard deviation for non-uniform distribution, 0 chooses"
" a default value of (max-min)/6"); " a default value of (max-min)/6");
ABSL_FLAG(uint32_t, pipeline, 1, "maximum number of pending requests per connection");
ABSL_FLAG(string, ratio, "1:10", "Set:Get ratio"); ABSL_FLAG(string, ratio, "1:10", "Set:Get ratio");
ABSL_FLAG(string, command, "", "custom command with __key__ placeholder for keys"); ABSL_FLAG(string, command, "", "custom command with __key__ placeholder for keys");
ABSL_FLAG(string, P, "", "protocol can be empty (for RESP) or memcache_text"); ABSL_FLAG(string, P, "", "protocol can be empty (for RESP) or memcache_text");
ABSL_FLAG(bool, tcp_nodelay, false, "If true, set nodelay option on tcp socket");
using namespace std; using namespace std;
using namespace util; using namespace util;
using absl::GetFlag; using absl::GetFlag;
using absl::StrFormat;
using facade::RedisParser; using facade::RedisParser;
using facade::RespVec; using facade::RespVec;
using tcp = ::boost::asio::ip::tcp; using tcp = ::boost::asio::ip::tcp;
@ -162,6 +165,7 @@ struct ClientStats {
uint64_t hit_count = 0; uint64_t hit_count = 0;
uint64_t hit_opportunities = 0; uint64_t hit_opportunities = 0;
uint64_t num_errors = 0; uint64_t num_errors = 0;
unsigned num_clients = 0;
ClientStats& operator+=(const ClientStats& o) { ClientStats& operator+=(const ClientStats& o) {
hist.Merge(o.hist); hist.Merge(o.hist);
@ -169,6 +173,7 @@ struct ClientStats {
hit_count += o.hit_count; hit_count += o.hit_count;
hit_opportunities += o.hit_opportunities; hit_opportunities += o.hit_opportunities;
num_errors += o.num_errors; num_errors += o.num_errors;
num_clients += o.num_clients;
return *this; return *this;
} }
}; };
@ -186,7 +191,15 @@ class Driver {
Driver& operator=(Driver&&) = default; Driver& operator=(Driver&&) = default;
void Connect(unsigned index, const tcp::endpoint& ep); void Connect(unsigned index, const tcp::endpoint& ep);
void Run(uint64_t cycle_ns, CommandGenerator* cmd_gen); void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen);
float done() const {
return double(received_) / num_reqs_;
}
unsigned pending_length() const {
return reqs_.size();
}
private: private:
void PopRequest(); void PopRequest();
@ -198,9 +211,11 @@ class Driver {
bool might_hit; bool might_hit;
}; };
uint32_t num_reqs_; uint32_t num_reqs_, received_ = 0;
ClientStats& stats_; ClientStats& stats_;
unique_ptr<FiberSocketBase> socket_; unique_ptr<FiberSocketBase> socket_;
fb2::Fiber receive_fb_;
queue<Req> reqs_; queue<Req> reqs_;
fb2::CondVarAny cnd_; fb2::CondVarAny cnd_;
}; };
@ -218,13 +233,50 @@ class TLocalClient {
TLocalClient(const TLocalClient&) = delete; TLocalClient(const TLocalClient&) = delete;
void Connect(tcp::endpoint ep); void Connect(tcp::endpoint ep);
void Run(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns); void Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns);
void Join();
ClientStats stats; ClientStats stats;
tuple<float, float> GetMinMaxDone() const {
float min = 1, max = 0;
for (unsigned i = 0; i < drivers_.size(); ++i) {
float done = drivers_[i]->done();
max = std::max(done, max);
min = std::min(done, min);
}
return {min, max};
}
unsigned MaxPending() const {
unsigned max = 0;
for (unsigned i = 0; i < drivers_.size(); ++i) {
if (drivers_[i]->pending_length() > max) {
max = drivers_[i]->pending_length();
}
}
return max;
}
unsigned num_conns() const {
return drivers_.size();
}
void AdjustCycle();
private: private:
ProactorBase* p_; ProactorBase* p_;
vector<unique_ptr<Driver>> drivers_; vector<unique_ptr<Driver>> drivers_;
optional<KeyGenerator> key_gen_;
optional<CommandGenerator> cmd_gen_;
vector<fb2::Fiber> driver_fbs_;
uint64_t cur_cycle_ns_;
uint64_t target_cycle_;
int64_t start_time_;
}; };
KeyGenerator::KeyGenerator(uint32_t min, uint32_t max) KeyGenerator::KeyGenerator(uint32_t min, uint32_t max)
@ -276,32 +328,44 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
VLOG(2) << "Connecting " << index; VLOG(2) << "Connecting " << index;
error_code ec = socket_->Connect(ep); error_code ec = socket_->Connect(ep);
CHECK(!ec) << "Could not connect to " << ep << " " << ec; CHECK(!ec) << "Could not connect to " << ep << " " << ec;
if (GetFlag(FLAGS_tcp_nodelay)) {
int yes = 1;
CHECK_EQ(0, setsockopt(socket_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
}
receive_fb_ = MakeFiber(fb2::Launch::dispatch, [this] { ReceiveFb(); });
} }
void Driver::Run(uint64_t cycle_ns, CommandGenerator* cmd_gen) { void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
auto receive_fb = MakeFiber([this] { ReceiveFb(); }); const int64_t start = absl::GetCurrentTimeNanos();
unsigned pipeline = GetFlag(FLAGS_pipeline);
int64_t next_invocation = absl::GetCurrentTimeNanos(); stats_.num_clients++;
const absl::Time start = absl::Now();
for (unsigned i = 0; i < num_reqs_; ++i) { for (unsigned i = 0; i < num_reqs_; ++i) {
int64_t now = absl::GetCurrentTimeNanos(); int64_t now = absl::GetCurrentTimeNanos();
if (cycle_ns) { if (cycle_ns) {
int64_t sleep_ns = next_invocation - now; int64_t target_ts = start + i * (*cycle_ns);
int64_t sleep_ns = target_ts - now;
if (reqs_.size() > 10 && sleep_ns <= 0) {
sleep_ns = 10'000;
}
if (sleep_ns > 0) { if (sleep_ns > 0) {
VLOG(5) << "Sleeping for " << sleep_ns << "ns"; VLOG(5) << "Sleeping for " << sleep_ns << "ns";
ThisFiber::SleepFor(chrono::nanoseconds(sleep_ns)); // There is no point in sending more requests if they are piled up in the server.
} else { do {
ThisFiber::SleepFor(chrono::nanoseconds(sleep_ns));
} while (reqs_.size() > 10);
} else if (i % 256 == 255) {
ThisFiber::Yield();
VLOG(5) << "Behind QPS schedule"; VLOG(5) << "Behind QPS schedule";
} }
next_invocation += cycle_ns;
} else { } else {
// Coordinated omission. // Coordinated omission.
fb2::NoOpLock lk; fb2::NoOpLock lk;
cnd_.wait(lk, [this] { return reqs_.empty(); }); cnd_.wait(lk, [this, pipeline] { return reqs_.size() < pipeline; });
} }
string cmd = cmd_gen->Next(); string cmd = cmd_gen->Next();
@ -320,8 +384,9 @@ void Driver::Run(uint64_t cycle_ns, CommandGenerator* cmd_gen) {
CHECK(!ec) << ec.message(); CHECK(!ec) << ec.message();
} }
const absl::Time finish = absl::Now(); const int finish = absl::GetCurrentTimeNanos();
VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took " << finish - start VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took "
<< StrFormat("%.1fs", double(finish - start) / 1000000000)
<< ". Waiting for server processing"; << ". Waiting for server processing";
// TODO: to change to a condvar or something. // TODO: to change to a condvar or something.
@ -330,8 +395,9 @@ void Driver::Run(uint64_t cycle_ns, CommandGenerator* cmd_gen) {
} }
socket_->Shutdown(SHUT_RDWR); // breaks the receive fiber. socket_->Shutdown(SHUT_RDWR); // breaks the receive fiber.
receive_fb.Join(); receive_fb_.Join();
std::ignore = socket_->Close(); std::ignore = socket_->Close();
stats_.num_clients--;
} }
static string_view FindLine(io::Bytes buf) { static string_view FindLine(io::Bytes buf) {
@ -350,7 +416,7 @@ void Driver::PopRequest() {
uint64_t usec = (now - reqs_.front().start) / 1000; uint64_t usec = (now - reqs_.front().start) / 1000;
stats_.hist.Add(usec); stats_.hist.Add(usec);
stats_.hit_opportunities += reqs_.front().might_hit; stats_.hit_opportunities += reqs_.front().might_hit;
++received_;
reqs_.pop(); reqs_.pop();
if (reqs_.empty()) { if (reqs_.empty()) {
cnd_.notify_one(); cnd_.notify_one();
@ -452,27 +518,58 @@ void TLocalClient::Connect(tcp::endpoint ep) {
fb.Join(); fb.Join();
} }
void TLocalClient::Run(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns) { void TLocalClient::Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns) {
KeyGenerator key_gen(key_min, key_max); key_gen_.emplace(key_min, key_max);
CommandGenerator cmd_gen(&key_gen); cmd_gen_.emplace(&key_gen_.value());
vector<fb2::Fiber> fbs(drivers_.size()); driver_fbs_.resize(drivers_.size());
for (size_t i = 0; i < fbs.size(); ++i) {
fbs[i] = fb2::Fiber(absl::StrCat("run/", i), [&, i] { drivers_[i]->Run(cycle_ns, &cmd_gen); }); cur_cycle_ns_ = cycle_ns;
target_cycle_ = cycle_ns;
start_time_ = absl::GetCurrentTimeNanos();
for (size_t i = 0; i < driver_fbs_.size(); ++i) {
driver_fbs_[i] = fb2::Fiber(absl::StrCat("run/", i), [&, i] {
drivers_[i]->Run(cur_cycle_ns_ ? &cur_cycle_ns_ : nullptr, &cmd_gen_.value());
});
} }
}
for (auto& fb : fbs) void TLocalClient::Join() {
for (auto& fb : driver_fbs_)
fb.Join(); fb.Join();
VLOG(1) << "Total hits: " << stats.hit_count; VLOG(1) << "Total hits: " << stats.hit_count;
} }
void TLocalClient::AdjustCycle() {
if (cur_cycle_ns_ == 0 || stats.num_responses == 0)
return;
// We adjust sleeping cycle per thread, and it's the same for all connection in this thread.
// We compute the aggregated cycle so far based on responses, and if it
// is greater than current we increase the current cycle. Otherwise,
// we try slowly reducing the cycle back to the nominal one.
int64_t running_time = absl::GetCurrentTimeNanos() - start_time_;
int64_t real_cycle = running_time * drivers_.size() / stats.num_responses;
if (real_cycle > cur_cycle_ns_ * 1.05) {
cur_cycle_ns_ = (cur_cycle_ns_ + real_cycle) / 2;
VLOG(1) << "Increasing cycle to " << cur_cycle_ns_;
} else if (cur_cycle_ns_ > target_cycle_) {
cur_cycle_ns_ -= (cur_cycle_ns_ - target_cycle_) * 0.2;
}
}
thread_local unique_ptr<TLocalClient> client; thread_local unique_ptr<TLocalClient> client;
void WatchFiber(absl::Time start_time, atomic_bool* finish_signal, ProactorPool* pp) { void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) {
fb2::Mutex mutex; fb2::Mutex mutex;
absl::Time last_print; // initialized to epoch time. int64_t start_time = absl::GetCurrentTimeNanos();
LOG(INFO) << "Started watching";
int64_t last_print = start_time;
uint64_t num_last_resp_cnt = 0; uint64_t num_last_resp_cnt = 0;
uint64_t resp_goal = GetFlag(FLAGS_c) * pp->size() * GetFlag(FLAGS_n); uint64_t resp_goal = GetFlag(FLAGS_c) * pp->size() * GetFlag(FLAGS_n);
@ -481,33 +578,48 @@ void WatchFiber(absl::Time start_time, atomic_bool* finish_signal, ProactorPool*
// we sleep with resolution of 1s but print with lower frequency to be more responsive // we sleep with resolution of 1s but print with lower frequency to be more responsive
// when benchmark finishes. // when benchmark finishes.
ThisFiber::SleepFor(1s); ThisFiber::SleepFor(1s);
absl::Time now = absl::Now(); pp->AwaitBrief([](auto, auto*) { client->AdjustCycle(); });
if (now - last_print > absl::Seconds(5)) {
ClientStats client_stats;
pp->AwaitFiberOnAll([&](auto* p) {
unique_lock lk(mutex);
client_stats += client->stats;
lk.unlock();
});
uint64_t total_ms = (now - start_time) / absl::Milliseconds(1); int64_t now = absl::GetCurrentTimeNanos();
uint64_t period_ms = (now - last_print) / absl::Milliseconds(1); if (now - last_print < 5000'000'000LL) // 5s
uint64_t period_resp_cnt = client_stats.num_responses - num_last_resp_cnt; continue;
double done_perc = double(client_stats.num_responses) * 100 / resp_goal;
double hitrate =
client_stats.hit_opportunities > 0
? 100 * double(client_stats.hit_count) / double(client_stats.hit_opportunities)
: 0;
CONSOLE_INFO << total_ms / 1000 << "s: " << absl::StrFormat("%.1f", done_perc)
<< "% done, effective RPS(now/accumulated): "
<< period_resp_cnt * 1000 / period_ms << "/"
<< client_stats.num_responses * 1000 / total_ms
<< ", errors: " << client_stats.num_errors
<< ", hitrate: " << absl::StrFormat("%.1f", hitrate) << "%";
last_print = now; ClientStats stats;
num_last_resp_cnt = client_stats.num_responses; float done_max = 0;
} float done_min = 1;
unsigned max_pending = 0;
pp->AwaitFiberOnAll([&](auto* p) {
auto [mind, maxd] = client->GetMinMaxDone();
unsigned max_pend = client->MaxPending();
unique_lock lk(mutex);
stats += client->stats;
done_max = max(done_max, maxd);
done_min = min(done_min, mind);
max_pending = max(max_pending, max_pend);
});
uint64_t total_ms = (now - start_time) / 1'000'000;
uint64_t period_ms = (now - last_print) / 1'000'000;
uint64_t period_resp_cnt = stats.num_responses - num_last_resp_cnt;
double done_perc = double(stats.num_responses) * 100 / resp_goal;
double hitrate = stats.hit_opportunities > 0
? 100 * double(stats.hit_count) / double(stats.hit_opportunities)
: 0;
unsigned latency = stats.hist.Percentile(99);
CONSOLE_INFO << total_ms / 1000 << "s: " << StrFormat("%.1f", done_perc)
<< "% done, RPS(now/agg): " << period_resp_cnt * 1000 / period_ms << "/"
<< stats.num_responses * 1000 / total_ms << ", errs: " << stats.num_errors
<< ", hitrate: " << StrFormat("%.1f%%", hitrate)
<< ", clients: " << stats.num_clients << "\n"
<< "done_min: " << StrFormat("%.2f%%", done_min * 100)
<< ", done_max: " << StrFormat("%.2f%%", done_max * 100)
<< ", p99_lat(us): " << latency << ", max_pending: " << max_pending;
last_print = now;
num_last_resp_cnt = stats.num_responses;
} }
} }
@ -590,23 +702,26 @@ int main(int argc, char* argv[]) {
} else { } else {
CONSOLE_INFO << "Coordinated omission mode - the rate is determined by the server"; CONSOLE_INFO << "Coordinated omission mode - the rate is determined by the server";
} }
const absl::Time start_time = absl::Now();
atomic_bool finish{false};
auto watch_fb =
pp->GetNextProactor()->LaunchFiber([&] { WatchFiber(start_time, &finish, pp.get()); });
pp->AwaitFiberOnAll([&](unsigned index, auto* p) { atomic_bool finish{false};
pp->AwaitBrief([&](unsigned index, auto* p) {
uint32_t key_max = (thread_key_step > 0 && index + 1 < pp->size()) uint32_t key_max = (thread_key_step > 0 && index + 1 < pp->size())
? key_minimum + (index + 1) * thread_key_step - 1 ? key_minimum + (index + 1) * thread_key_step - 1
: key_maximum; : key_maximum;
client->Run(key_minimum + index * thread_key_step, key_max, interval); client->Start(key_minimum + index * thread_key_step, key_max, interval);
}); });
auto watch_fb = pp->GetNextProactor()->LaunchFiber([&] { WatchFiber(&finish, pp.get()); });
const absl::Time start_time = absl::Now();
// The actual run.
pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client->Join(); });
absl::Duration duration = absl::Now() - start_time; absl::Duration duration = absl::Now() - start_time;
finish.store(true); finish.store(true);
watch_fb.Join(); watch_fb.Join();
fb2::Mutex mutex; fb2::Mutex mutex;
base::Histogram hist;
LOG(INFO) << "Resetting all threads"; LOG(INFO) << "Resetting all threads";
@ -626,7 +741,7 @@ int main(int argc, char* argv[]) {
CONSOLE_INFO << "Got " << summary.num_errors << " error responses!"; CONSOLE_INFO << "Got " << summary.num_errors << " error responses!";
} }
CONSOLE_INFO << "Latency summary, all times are in usec:\n" << hist.ToString(); CONSOLE_INFO << "Latency summary, all times are in usec:\n" << summary.hist.ToString();
if (summary.hit_opportunities) { if (summary.hit_opportunities) {
CONSOLE_INFO << "----------------------------------\nHit rate: " CONSOLE_INFO << "----------------------------------\nHit rate: "
<< 100 * double(summary.hit_count) / double(summary.hit_opportunities) << "%\n"; << 100 * double(summary.hit_count) / double(summary.hit_opportunities) << "%\n";