diff --git a/src/core/dash.h b/src/core/dash.h
index 40da7ee70..d99b3a400 100644
--- a/src/core/dash.h
+++ b/src/core/dash.h
@@ -179,6 +179,15 @@ class DashTable : public detail::DashTableBase {
     return segment_[segment_id];
   }
 
+  size_t GetSegmentCount() const {
+    return segment_.size();
+  }
+
+  size_t NextSeg(size_t sid) const {
+    size_t delta = (1u << (global_depth_ - segment_[sid]->local_depth()));
+    return sid + delta;
+  }
+
   template <typename U> uint64_t DoHash(const U& k) const {
     return policy_.HashFn(k);
   }
@@ -281,11 +290,6 @@ class DashTable : public detail::DashTableBase {
   // the same object. IterateDistinct goes over all distinct segments in the table.
   template <typename Cb> void IterateDistinct(Cb&& cb);
 
-  size_t NextSeg(size_t sid) const {
-    size_t delta = (1u << (global_depth_ - segment_[sid]->local_depth()));
-    return sid + delta;
-  }
-
   auto EqPred() const {
     return [p = &policy_](const auto& a, const auto& b) -> bool { return p->Equal(a, b); };
   }
@@ -326,14 +330,18 @@ class DashTable<_Key, _Value, Policy>::Iterator {
   template <bool TIsConst = IsConst, bool TIsSingleB,
             typename std::enable_if<TIsConst>::type* = nullptr>
   Iterator(const Iterator<!TIsConst, TIsSingleB>& other) noexcept
-      : owner_(other.owner_), seg_id_(other.seg_id_), bucket_id_(other.bucket_id_),
+      : owner_(other.owner_),
+        seg_id_(other.seg_id_),
+        bucket_id_(other.bucket_id_),
         slot_id_(other.slot_id_) {
   }
 
   // Copy constructor from iterator to bucket_iterator and vice versa.
   template <bool TIsSingleB>
   Iterator(const Iterator<IsConst, TIsSingleB>& other) noexcept
-      : owner_(other.owner_), seg_id_(other.seg_id_), bucket_id_(other.bucket_id_),
+      : owner_(other.owner_),
+        seg_id_(other.seg_id_),
+        bucket_id_(other.bucket_id_),
         slot_id_(IsSingleBucket ? 0 : other.slot_id_) {
     // if this - is a bucket_iterator - we reset slot_id to the first occupied space.
     if constexpr (IsSingleBucket) {
diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index 05523a46b..7131ef2b9 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -8,6 +8,7 @@ extern "C" {
 #include "redis/object.h"
 }
 
+#include "base/flags.h"
 #include "base/logging.h"
 #include "generic_family.h"
 #include "server/engine_shard_set.h"
@@ -15,10 +16,22 @@ extern "C" {
 #include "server/server_state.h"
 #include "server/tiered_storage.h"
 
+ABSL_FLAG(bool, enable_heartbeat_eviction, true,
+          "Enable eviction during heartbeat when memory is under pressure.");
+
+ABSL_FLAG(uint32_t, max_eviction_per_heartbeat, 100,
+          "The maximum number of key-value pairs that will be deleted in each eviction "
+          "when heartbeat based eviction is triggered under memory pressure.");
+
+ABSL_FLAG(uint32_t, max_segment_to_consider, 4,
+          "The maximum number of dashtable segments to scan in each eviction "
+          "when heartbeat based eviction is triggered under memory pressure.");
+
 namespace dfly {
 
 using namespace std;
 using namespace util;
+using absl::GetFlag;
 using facade::OpStatus;
 
 namespace {
@@ -982,7 +995,7 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(const Context& cntx,
   string_view tmp_key;
 
   // Replicate expiry
-  if (auto journal = EngineShard::tlocal()->journal(); journal) {
+  if (auto journal = owner_->journal(); journal) {
     tmp_key = it->first.GetSlice(&tmp_key_buf);
     RecordExpiry(cntx.db_index, tmp_key);
   }
@@ -1096,10 +1109,92 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx
   return result;
 }
 
-// TODO: Design a better background evicting heuristic.
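+// Returns the id of the next segment to visit during heartbeat eviction. NextSeg skips
+// directory entries that alias the same physical segment; the modulo wraps the scan back
+// to the beginning of the table once the end is reached.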
+int32_t DbSlice::GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const {
+  // wraps around if we reached the end
+  return db_arr_[db_ind]->prime.NextSeg((size_t)segment_id) %
+         db_arr_[db_ind]->prime.GetSegmentCount();
+}
+
 void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes) {
-  if (!caching_mode_)
+  DCHECK(!owner_->IsReplica());
+  if ((!caching_mode_) || !expire_allowed_ || !GetFlag(FLAGS_enable_heartbeat_eviction))
     return;
+
+  auto max_eviction_per_hb = GetFlag(FLAGS_max_eviction_per_heartbeat);
+  auto max_segment_to_consider = GetFlag(FLAGS_max_segment_to_consider);
+
+  auto time_start = absl::GetCurrentTimeNanos();
+  auto& db_table = db_arr_[db_ind];
+  int32_t num_segments = db_table->prime.GetSegmentCount();
+  int32_t num_buckets = PrimeTable::Segment_t::kTotalBuckets;
+  int32_t num_slots = PrimeTable::Segment_t::kNumSlots;
+
+  size_t used_memory_before = owner_->UsedMemory();
+  size_t used_memory_after = used_memory_before;  // updated after each deletion below
+  size_t evicted = 0;
+  string tmp;
+  int32_t starting_segment_id = rand() % num_segments;
+  vector<string> keys_to_journal;
+
+  {
+    FiberAtomicGuard guard;
+    for (int32_t slot_id = num_slots - 1; slot_id >= 0; --slot_id) {
+      for (int32_t bucket_id = num_buckets - 1; bucket_id >= 0; --bucket_id) {
+        // pick a random segment to start with in each eviction,
+        // as segment_id does not imply any recency, and random selection should be fair enough.
+        int32_t segment_id = starting_segment_id;
+        for (size_t num_seg_visited = 0; num_seg_visited < max_segment_to_consider;
+             ++num_seg_visited, segment_id = GetNextSegmentForEviction(segment_id, db_ind)) {
+          const auto& bucket = db_table->prime.GetSegment(segment_id)->GetBucket(bucket_id);
+          if (bucket.IsEmpty())
+            continue;
+
+          if (!bucket.IsBusy(slot_id))
+            continue;
+
+          auto evict_it = db_table->prime.GetIterator(segment_id, bucket_id, slot_id);
+          if (evict_it->first.IsSticky())
+            continue;
+
+          // check if the key is locked by looking up the transaction lock table.
+          auto& lt = db_table->trans_locks;
+          string_view key = evict_it->first.GetSlice(&tmp);
+          if (lt.find(KeyLockArgs::GetLockKey(key)) != lt.end())
+            continue;
+
+          if (auto journal = owner_->journal(); journal) {
+            keys_to_journal.push_back(string(key));  // copy: `key` may point into the table
+          }
+
+          PerformDeletion(evict_it, shard_owner(), db_table.get());
+          ++evicted;
+
+          used_memory_after = owner_->UsedMemory();
+          // stop when either condition is met, whichever comes first
+          if ((evicted == max_eviction_per_hb) ||
+              (used_memory_before - used_memory_after >= increase_goal_bytes))
+            goto finish;
+        }
+      }
+    }
+  }
+
+finish:
+  // send the deletion to the replicas.
+  // fiber preemption could happen in this phase.
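+  // Keys were only collected inside the FiberAtomicGuard section above; the journal write is
+  // done here because it may preempt the fiber.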
+  if (auto journal = owner_->journal(); journal && !keys_to_journal.empty()) {
+    vector<string_view> args(keys_to_journal.begin(), keys_to_journal.end());
+    ArgSlice delete_args(args.data(), args.size());
+    journal->RecordEntry(0, journal::Op::EXPIRED, db_ind, 1, make_pair("DEL", delete_args), false);
+  }
+
+  auto time_finish = absl::GetCurrentTimeNanos();
+  events_.evicted_keys += evicted;
+  DVLOG(2) << "Memory usage before eviction: " << used_memory_before;
+  DVLOG(2) << "Memory usage after eviction: " << used_memory_after;
+  DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted << "/"
+           << max_eviction_per_hb;
+  DVLOG(2) << "Eviction time (us): " << (time_finish - time_start) / 1000;
 }
 
 void DbSlice::CreateDb(DbIndex db_ind) {
diff --git a/src/server/db_slice.h b/src/server/db_slice.h
index e5bcc4152..f828e057b 100644
--- a/src/server/db_slice.h
+++ b/src/server/db_slice.h
@@ -304,6 +304,8 @@ class DbSlice {
   DeleteExpiredStats DeleteExpiredStep(const Context& cntx, unsigned count);
   void FreeMemWithEvictionStep(DbIndex db_indx, size_t increase_goal_bytes);
 
+  int32_t GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const;
+
   const DbTableArray& databases() const {
     return db_arr_;
   }
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index ef506e3a2..f54e4e1fd 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -32,7 +32,6 @@ ABSL_FLAG(uint32_t, hz, 100,
 ABSL_FLAG(bool, cache_mode, false,
           "If true, the backend behaves like a cache, "
           "by evicting entries when getting close to maxmemory limit");
-
 // memory defragmented related flags
 ABSL_FLAG(float, mem_defrag_threshold, 0.7,
           "Minimum percentage of used memory relative to maxmemory cap before running "
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index a0ae8bb83..259b8c2a9 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -695,6 +695,8 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
   config_registry.RegisterMutable("masterauth");
   config_registry.RegisterMutable("tcp_keepalive");
   config_registry.RegisterMutable("replica_partial_sync");
+  config_registry.RegisterMutable("max_eviction_per_heartbeat");
+  config_registry.RegisterMutable("max_segment_to_consider");
 
   acl::UserRegistry* reg = &user_registry_;
   pp_.Await([reg](uint32_t index, ProactorBase* pb) { ServerState::Init(index, reg); });
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 642096119..458a7fd28 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1488,7 +1488,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     append("listpack_blobs", total.listpack_blob_cnt);
     append("listpack_bytes", total.listpack_bytes);
     append("small_string_bytes", m.small_string_bytes);
-    append("pipeline_cache_bytes", m.conn_stats.pipeline_cmd_cache_bytes);
     append("dispatch_queue_bytes", m.conn_stats.dispatch_queue_bytes);
     append("dispatch_queue_peak_bytes", m.peak_stats.conn_dispatch_queue_bytes);
diff --git a/tools/eviction/fill_db.py b/tools/eviction/fill_db.py
new file mode 100755
index 000000000..5e93e7d53
--- /dev/null
+++ b/tools/eviction/fill_db.py
@@ -0,0 +1,307 @@
+#!/usr/bin/env python3
+
+"""
+This script implements facilities for assessing cache eviction.
+It provides two major functions:
+    1. Populate Dragonfly with keys and values whose lengths follow specified distributions.
+    2. Measure the cache hit rate of Dragonfly under a workload that accesses keys with a Zipfian distribution.
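+
+The key and value length distributions are controlled by the module-level lists
+all_key_lens / key_lens_probs and all_val_lens / val_lens_probs defined below.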
+
+Usage:
+To perform database population, simply run:
+
+./fill_db.py -f
+
+This will automatically populate the database until roughly 2x maxmemory (as configured in Dragonfly)
+worth of key-value pairs has been inserted. By default population stops at 2x maxmemory; this can be
+changed with the -r option, for instance:
+
+./fill_db.py -f -r 0.25  # population stops at 4x maxmemory
+
+To accelerate the population, multiple processes can run this script in parallel. A convenience script
+is provided in this directory:
+./run_fill_db.sh 10  # use 10 processes to fill in parallel
+
+After the database has been populated, the cache hit rate can be measured with the -m option:
+./fill_db.py -m
+Note that the measurement must be done after the population, as this mode relies on reading back the
+complete key space inserted during the population phase. By default, 100000 SET operations are
+performed to calculate the cache hit rate. This number can be changed with the -c option:
+./fill_db.py -m -c 2000
+"""
+
+
+import redis
+import string
+from random import choice
+from random import shuffle
+import numpy as np
+
+import asyncio
+from redis import asyncio as aioredis
+import os
+import argparse
+import re
+import glob
+
+all_val_lens = [400, 800, 1600, 25000]
+val_lens_probs = [0.003, 0.78, 0.2, 0.017]
+
+all_key_lens = [35, 60, 70]
+key_lens_probs = [0.2, 0.06, 0.74]
+
+inserted_keys = []
+
+
+def random_str(length):
+    return "".join(
+        choice(string.ascii_letters + string.digits + string.punctuation) for i in range(length)
+    )
+
+
+def random_key():
+    global all_key_lens, key_lens_probs
+    return random_str(np.random.choice(all_key_lens, p=key_lens_probs))
+
+
+def random_val():
+    global all_val_lens, val_lens_probs
+    return random_str(np.random.choice(all_val_lens, p=val_lens_probs))
+
+
+def flush_keys_to_file(file_name):
+    global inserted_keys
+    with open(file_name, "a") as f:
+        for key in inserted_keys:
+            f.write(f"{key}\n")
+
+
+def read_keys_from_file(file_name):
+    global inserted_keys
+    with open(file_name) as file:
+        for line in file:
+            inserted_keys.append(line.rstrip())
+
+
+def read_keys():
+    global inserted_keys
+    inserted_keys.clear()
+    key_files = glob.glob("./keys_*.txt")
+    for key_file in key_files:
+        read_keys_from_file(key_file)
+
+
+def sync_populate_db():
+    r = redis.Redis(decode_responses=True)
+    n = 0
+    while True:
+        r.set(random_key(), random_val())
+        n += 1
+        if n % 1000 == 0:
+            print("\r>> Number of key-value pairs inserted: {}".format(n), end="")
+
+
+def sync_query_db():
+    global inserted_keys
+    r = redis.Redis(decode_responses=True)
+    n = 0
+    read_keys()
+    misses = 0
+    hits = 0
+    for key in inserted_keys:
+        resp = r.set(key, random_val(), nx=True)
+        # print(resp)
+        if resp:
+            misses += 1
+        else:
+            hits += 1
+        n += 1
+        if n % 1000 == 0:
+            print(
+                "\r>> Number of key-value pairs inserted: {0}, hit: {1}, miss: {2}".format(
+                    n, hits, misses
+                ),
+                end="",
+            )
+
+
+async def populate_db(ratio):
+    global inserted_keys
+    r = aioredis.Redis(decode_responses=True)
+    n = 0
+    misses = 0
+    hits = 0
+
+    total_key_count = 0
+    while True:
+        # await r.set(random_key(), random_val())
+        pipeline = r.pipeline(False)
+        for x in range(200):
+            k = random_key()
+            inserted_keys.append(k)
+            pipeline.set(k, random_val())
+            # pipeline.set(k, random_val(), nx=True)
+        await pipeline.execute()
+        # responses = await pipeline.execute()
+        # for resp in responses:
+        #     if resp:
+        #         misses += 1
+        #     else:
+        #         hits += 1
+
+        # key file names are in keys_xxxx.txt format
+        key_file_name = "keys_" + str(os.getpid()) + ".txt"
+        flush_keys_to_file(key_file_name)
+        inserted_keys.clear()
+        n += 200
+
+        if total_key_count == 0:
+            db_info = await r.info()
+            used_mem = float(db_info["used_memory"])
+            max_mem = float(db_info["maxmemory"])
+            redline = 0.9
+            # we will know the total number of keys of the whole key space
+            # only once we approach the maxmemory of the db
+            if used_mem >= max_mem * redline:
+                total_key_count = int(float(n) / ratio)
+                print(
+                    "\n>> Determined target key count: {0}, current key count: {1}, ratio: {2}".format(
+                        total_key_count, n, ratio
+                    ),
+                    end="",
+                )
+        else:
+            if n >= total_key_count:
+                print("\n>> Target number of keys reached: {}, stopping...".format(n), end="")
+                break
+        if n % 1000 == 0:
+            print("\r>> Number of key-value pairs inserted: {0}".format(n), end="")
+            # print("\r>> Number of key-value pairs inserted: {0}, hit: {1}, miss: {2}".format(n, hits, misses), end='')
+
+
+def rand_zipf_generator(alpha: float, upper: int, batch: int):
+    """
+    alpha: The alpha parameter used to create the Zipfian distribution.
+    upper: The upper bound of the values to generate the distribution over
+           (upper = 30 generates a distribution of the given alpha over the values 1 to 30).
+    batch: The number of samples yielded per iteration.
+    This is an infinite generator: each iteration yields a list of `batch` samples,
+    returned as zero-based indices in [0, upper - 1].
+    """
+
+    # Calculate Zeta values from 1 to upper:
+    tmp = np.power(np.arange(1, upper + 1), -alpha)
+    zeta = np.r_[0.0, np.cumsum(tmp)]
+
+    # Store the translation map (normalized cumulative distribution):
+    distMap = [x / zeta[-1] for x in zeta]
+
+    while True:
+        # Generate an array of uniform 0-1 pseudo-random values:
+        u = np.random.random(batch)
+
+        # bisect them with distMap
+        v = np.searchsorted(distMap, u)
+
+        samples = [t - 1 for t in v]
+        yield samples
+
+
+def rearrange_keys():
+    """
+    This function provides a hook for testing different caching workloads.
+    For instance, if we rearranged the keys by sorting them based on the memory usage of
+    their k-v pairs, the Zipfian hotspot would prefer small (or large) k-v pairs.
+    The current implementation just uses a random shuffle.
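+    For example, sorting the keys by length (inserted_keys.sort(key=len)) would bias the
+    Zipfian hotspot toward short keys.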
+    """
+    global inserted_keys
+    shuffle(inserted_keys)
+
+
+async def query_db_with_locality(count):
+    global inserted_keys
+    r = aioredis.Redis(decode_responses=True)
+    n = 0
+    read_keys()
+    rearrange_keys()
+    misses = 0
+    hits = 0
+    pipeline_size = 200
+    key_index_gen = rand_zipf_generator(1.0, len(inserted_keys), pipeline_size)
+    for key_indices in key_index_gen:
+        pipeline = r.pipeline(False)
+        # print(key_indices)
+        for key_index in key_indices:
+            k = inserted_keys[key_index]
+            pipeline.set(k, random_val(), nx=True)
+
+        responses = await pipeline.execute()
+        n += pipeline_size
+        for resp in responses:
+            if resp:
+                misses += 1
+            else:
+                hits += 1
+        print(
+            "\r>> Number of ops: {0}, hit: {1}, miss: {2}, hit rate: {3:.4f}".format(
+                n, hits, misses, float(hits) / float(hits + misses)
+            ),
+            end="",
+        )
+        if n >= count:
+            break
+    hit_rate = float(hits) / float(hits + misses)
+    print("\n>> Cache hit rate: {:.4f}".format(hit_rate))
+
+
+class Range(object):
+    def __init__(self, start, end):
+        self.start = start
+        self.end = end
+
+    def __eq__(self, other):
+        return self.start <= other <= self.end
+
+
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser(
+        description="Cache Benchmark", formatter_class=argparse.ArgumentDefaultsHelpFormatter
+    )
+
+    parser.add_argument(
+        "-f",
+        "--fill",
+        action="store_true",
+        help="fill the database with random key-value pairs whose lengths follow preset distributions",
+    )
+
+    parser.add_argument(
+        "-r",
+        "--ratio",
+        type=float,
+        default=0.5,
+        choices=[Range(0.0, 1.0)],
+        help="the ratio between in-memory data size and total data size",
+    )
+
+    parser.add_argument(
+        "-m",
+        "--measure",
+        action="store_true",
+        help="measure cache hit rate by visiting the entire key space with a Zipfian distribution",
+    )
+
+    parser.add_argument(
+        "-c",
+        "--count",
+        type=int,
+        default=100000,
+        help="total number of operations to be performed when measuring cache hit rate",
+    )
+
+    args = parser.parse_args()
+
+    if args.fill:
+        asyncio.run(populate_db(args.ratio))
+        exit(0)
+
+    if args.measure:
+        asyncio.run(query_db_with_locality(args.count))
diff --git a/tools/eviction/run_fill_db.sh b/tools/eviction/run_fill_db.sh
new file mode 100755
index 000000000..4ccf0db3f
--- /dev/null
+++ b/tools/eviction/run_fill_db.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+
+rm -f ./keys_*.txt
+for i in `seq 1 $1`
+do
+    echo "launching process $i to fill.."
+    ./fill_db.py -f &
+done
+
+wait
diff --git a/tools/eviction/stop_fill_db.sh b/tools/eviction/stop_fill_db.sh
new file mode 100755
index 000000000..cbda5c832
--- /dev/null
+++ b/tools/eviction/stop_fill_db.sh
@@ -0,0 +1,2 @@
+#!/bin/sh
+ps -ef | grep fill_db.py | grep -v grep | awk '{print $2}' | xargs kill -9