1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

chore: implement sequential pass whithout the overlapping traffic (#3335)

We divide the keyspace between connections in advance.
This allows easily cover chunks of a key space in a predictable manner without having overlapping traffic.
Excess traffic will just wrap around.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-07-18 06:02:02 -04:00 committed by GitHub
parent c670ffd09e
commit 37b992f27d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -72,6 +72,7 @@ class KeyGenerator {
private:
string prefix_;
uint64_t min_, max_, range_;
uint64_t seq_cursor_;
double stddev_ = 1.0 / 6;
optional<base::ZipfianGenerator> zipf_;
};
@ -176,7 +177,7 @@ class Driver {
Driver& operator=(Driver&&) = default;
void Connect(unsigned index, const tcp::endpoint& ep);
void Run(uint64_t cycle_ns);
void Run(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns);
private:
void PopRequest();
@ -208,7 +209,7 @@ class TLocalClient {
TLocalClient(const TLocalClient&) = delete;
void Connect(tcp::endpoint ep);
void Run(uint64_t cycle_ns);
void Run(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns);
ClientStats stats;
@ -222,6 +223,7 @@ KeyGenerator::KeyGenerator(uint32_t min, uint32_t max)
prefix_ = GetFlag(FLAGS_key_prefix);
CHECK_GT(range_, 0u);
seq_cursor_ = min_;
switch (dist_type) {
case NORMAL: {
uint64_t stddev = GetFlag(FLAGS_key_stddev);
@ -252,7 +254,9 @@ string KeyGenerator::operator()() {
key_suffix = zipf_->Next(bit_gen);
break;
case SEQUENTIAL:
key_suffix = min_++;
key_suffix = seq_cursor_++;
if (seq_cursor_ > max_)
seq_cursor_ = min_;
break;
}
@ -265,17 +269,14 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
CHECK(!ec) << "Could not connect to " << ep << " " << ec;
}
void Driver::Run(uint64_t cycle_ns) {
void Driver::Run(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns) {
auto receive_fb = MakeFiber([this] { ReceiveFb(); });
int64_t next_invocation = absl::GetCurrentTimeNanos();
const absl::Time start = absl::Now();
const uint32_t key_minimum = GetFlag(FLAGS_key_minimum);
const uint32_t key_maximum = GetFlag(FLAGS_key_maximum);
KeyGenerator key_gen(key_minimum, key_maximum);
KeyGenerator key_gen(key_min, key_max);
CommandGenerator cmd_gen(&key_gen);
for (unsigned i = 0; i < num_reqs_; ++i) {
int64_t now = absl::GetCurrentTimeNanos();
@ -442,10 +443,16 @@ void TLocalClient::Connect(tcp::endpoint ep) {
fb.Join();
}
void TLocalClient::Run(uint64_t cycle_ns) {
void TLocalClient::Run(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns) {
uint32_t quanta =
dist_type == SEQUENTIAL ? std::max(1UL, (key_max - key_min + 1) / drivers_.size()) : 0;
vector<fb2::Fiber> fbs(drivers_.size());
for (size_t i = 0; i < fbs.size(); ++i) {
fbs[i] = fb2::Fiber(absl::StrCat("run/", i), [&, i] { drivers_[i]->Run(cycle_ns); });
uint32 from = key_min + quanta * i;
uint32 to = quanta > 0 && i + 1 < fbs.size() ? key_min + quanta * (i + 1) - 1 : key_max;
fbs[i] = fb2::Fiber(absl::StrCat("run/", i),
[&, i, from, to] { drivers_[i]->Run(from, to, cycle_ns); });
}
for (auto& fb : fbs)
@ -539,16 +546,33 @@ int main(int argc, char* argv[]) {
client->Connect(ep);
});
const uint32_t key_minimum = GetFlag(FLAGS_key_minimum);
const uint32_t key_maximum = GetFlag(FLAGS_key_maximum);
CHECK_LE(key_minimum, key_maximum);
uint32_t thread_key_step = 0;
const uint32_t qps = GetFlag(FLAGS_qps);
const int64_t interval = qps ? 1000000000LL / qps : 0;
uint64_t num_reqs = GetFlag(FLAGS_n);
uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size();
uint64_t total_requests = num_reqs * total_conn_num;
if (dist_type == SEQUENTIAL) {
thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size());
if (total_requests > (key_maximum - key_minimum)) {
CONSOLE_INFO << "Warning: only " << key_maximum - key_minimum
<< " unique entries will be accessed with " << total_requests
<< " total requests";
}
}
CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs
<< " requests per each connection";
<< " requests per each connection, or " << total_requests << " requests overall";
if (interval) {
CONSOLE_INFO << "At a rate of " << GetFlag(FLAGS_qps)
<< " rps per connection, i.e. request every " << interval / 1000 << "us";
CONSOLE_INFO << "Overall scheduled RPS: " << qps * pp->size() * GetFlag(FLAGS_c);
CONSOLE_INFO << "Overall scheduled RPS: " << qps * total_conn_num;
} else {
CONSOLE_INFO << "Coordinated omission mode - the rate is determined by the server";
}
@ -557,7 +581,12 @@ int main(int argc, char* argv[]) {
auto watch_fb =
pp->GetNextProactor()->LaunchFiber([&] { WatchFiber(start_time, &finish, pp.get()); });
pp->AwaitFiberOnAll([&](auto* p) { client->Run(interval); });
pp->AwaitFiberOnAll([&](unsigned index, auto* p) {
uint32_t key_max = (thread_key_step > 0 && index + 1 < pp->size())
? key_minimum + (index + 1) * thread_key_step - 1
: key_maximum;
client->Run(key_minimum + index * thread_key_step, key_max, interval);
});
absl::Duration duration = absl::Now() - start_time;
finish.store(true);
watch_fb.Join();