From 00f1e3d57837b5e5dfd141ab1bc823129679aa55 Mon Sep 17 00:00:00 2001 From: Yue Li <61070669+theyueli@users.noreply.github.com> Date: Wed, 1 Nov 2023 11:11:27 -0700 Subject: [PATCH] feat(server): perform eviction upon memory pressure in cache mode (#2084) * fixes #1936 Eviction Implementation This patch provides a very simple eviction implementation for the interface mentioned above. In my opinion, the eviction algorithm approximates an LRU policy, given that normal buckets always store the most recently accessed data while stash buckets hold less active data. The algorithm first selects a small set of segments as eviction targets. Starting from the last slot of the last stash bucket in each of the segments, we walk backward and evict the key-value pairs stored in each visited slot. Eviction stops when either the target memory release goal or the maximum number of evicted key-value pairs is reached. Therefore, we can upper-bound the eviction time through two parameters, max_eviction_per_heartbeat and max_segment_to_consider, that can be set when Dragonfly starts. Note that these two parameters can be retrieved and changed by the user through the CONFIG GET and CONFIG SET commands.
--------- Signed-off-by: Yue Li <61070669+theyueli@users.noreply.github.com> --- src/core/dash.h | 22 ++- src/server/db_slice.cc | 101 ++++++++++- src/server/db_slice.h | 2 + src/server/engine_shard_set.cc | 1 - src/server/main_service.cc | 2 + src/server/server_family.cc | 1 - tools/eviction/fill_db.py | 307 +++++++++++++++++++++++++++++++++ tools/eviction/run_fill_db.sh | 10 ++ tools/eviction/stop_fill_db.sh | 2 + 9 files changed, 436 insertions(+), 12 deletions(-) create mode 100755 tools/eviction/fill_db.py create mode 100755 tools/eviction/run_fill_db.sh create mode 100755 tools/eviction/stop_fill_db.sh diff --git a/src/core/dash.h b/src/core/dash.h index 40da7ee70..d99b3a400 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -179,6 +179,15 @@ class DashTable : public detail::DashTableBase { return segment_[segment_id]; } + size_t GetSegmentCount() const { + return segment_.size(); + } + + size_t NextSeg(size_t sid) const { + size_t delta = (1u << (global_depth_ - segment_[sid]->local_depth())); + return sid + delta; + } + template uint64_t DoHash(const U& k) const { return policy_.HashFn(k); } @@ -281,11 +290,6 @@ class DashTable : public detail::DashTableBase { // the same object. IterateDistinct goes over all distinct segments in the table. template void IterateDistinct(Cb&& cb); - size_t NextSeg(size_t sid) const { - size_t delta = (1u << (global_depth_ - segment_[sid]->local_depth())); - return sid + delta; - } - auto EqPred() const { return [p = &policy_](const auto& a, const auto& b) -> bool { return p->Equal(a, b); }; } @@ -326,14 +330,18 @@ class DashTable<_Key, _Value, Policy>::Iterator { template ::type* = nullptr> Iterator(const Iterator& other) noexcept - : owner_(other.owner_), seg_id_(other.seg_id_), bucket_id_(other.bucket_id_), + : owner_(other.owner_), + seg_id_(other.seg_id_), + bucket_id_(other.bucket_id_), slot_id_(other.slot_id_) { } // Copy constructor from iterator to bucket_iterator and vice versa. 
template Iterator(const Iterator& other) noexcept - : owner_(other.owner_), seg_id_(other.seg_id_), bucket_id_(other.bucket_id_), + : owner_(other.owner_), + seg_id_(other.seg_id_), + bucket_id_(other.bucket_id_), slot_id_(IsSingleBucket ? 0 : other.slot_id_) { // if this - is a bucket_iterator - we reset slot_id to the first occupied space. if constexpr (IsSingleBucket) { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 05523a46b..7131ef2b9 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -8,6 +8,7 @@ extern "C" { #include "redis/object.h" } +#include "base/flags.h" #include "base/logging.h" #include "generic_family.h" #include "server/engine_shard_set.h" @@ -15,10 +16,22 @@ extern "C" { #include "server/server_state.h" #include "server/tiered_storage.h" +ABSL_FLAG(bool, enable_heartbeat_eviction, true, + "Enable eviction during heartbeat when memory is under pressure."); + +ABSL_FLAG(uint32_t, max_eviction_per_heartbeat, 100, + "The maximum number of key-value pairs that will be deleted in each eviction " + "when heartbeat based eviction is triggered under memory pressure."); + +ABSL_FLAG(uint32_t, max_segment_to_consider, 4, + "The maximum number of dashtable segments to scan in each eviction " + "when heartbeat based eviction is triggered under memory pressure."); + namespace dfly { using namespace std; using namespace util; +using absl::GetFlag; using facade::OpStatus; namespace { @@ -982,7 +995,7 @@ pair DbSlice::ExpireIfNeeded(const Context& cntx, string_view tmp_key; // Replicate expiry - if (auto journal = EngineShard::tlocal()->journal(); journal) { + if (auto journal = owner_->journal(); journal) { tmp_key = it->first.GetSlice(&tmp_key_buf); RecordExpiry(cntx.db_index, tmp_key); } @@ -1096,10 +1109,92 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx return result; } -// TODO: Design a better background evicting heuristic. 
+int32_t DbSlice::GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const {
+  // wraps around if we reached the end
+  return db_arr_[db_ind]->prime.NextSeg((size_t)segment_id) %
+         db_arr_[db_ind]->prime.GetSegmentCount();
+}
+
+// Heartbeat eviction step; a no-op unless cache mode is on (see the guard below). Starting from
+// a random segment, walks slots and buckets from the back and evicts keys that are neither
+// sticky nor locked until `increase_goal_bytes` are released or the per-heartbeat cap is hit.
 void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes) {
-  if (!caching_mode_)
+  DCHECK(!owner_->IsReplica());
+  if ((!caching_mode_) || !expire_allowed_ || !GetFlag(FLAGS_enable_heartbeat_eviction))
     return;
+
+  auto max_eviction_per_hb = GetFlag(FLAGS_max_eviction_per_heartbeat);
+  auto max_segment_to_consider = GetFlag(FLAGS_max_segment_to_consider);
+
+  auto time_start = absl::GetCurrentTimeNanos();
+  auto& db_table = db_arr_[db_ind];
+  int32_t num_segments = db_table->prime.GetSegmentCount();
+  int32_t num_buckets = PrimeTable::Segment_t::kTotalBuckets;
+  int32_t num_slots = PrimeTable::Segment_t::kNumSlots;
+
+  size_t used_memory_before = owner_->UsedMemory();
+  size_t used_memory_after = used_memory_before;  // stays valid if nothing is evicted
+  size_t evicted = 0;
+  string tmp;
+  int32_t starting_segment_id = rand() % num_segments;
+  vector<string> keys_to_journal;
+
+  {
+    FiberAtomicGuard guard;
+    for (int32_t slot_id = num_slots - 1; slot_id >= 0; --slot_id) {
+      for (int32_t bucket_id = num_buckets - 1; bucket_id >= 0; --bucket_id) {
+        // pick a random segment to start with in each eviction,
+        // as segment_id does not imply any recency, and random selection should be fair enough
+        int32_t segment_id = starting_segment_id;
+        for (size_t num_seg_visited = 0; num_seg_visited < max_segment_to_consider;
+             ++num_seg_visited, segment_id = GetNextSegmentForEviction(segment_id, db_ind)) {
+          const auto& bucket = db_table->prime.GetSegment(segment_id)->GetBucket(bucket_id);
+          if (bucket.IsEmpty() || !bucket.IsBusy(slot_id))
+            continue;
+
+          auto evict_it = db_table->prime.GetIterator(segment_id, bucket_id, slot_id);
+          if (evict_it->first.IsSticky())
+            continue;
+
+          // check if the key is locked by looking up transaction table.
+          auto& lt = db_table->trans_locks;
+          string_view key = evict_it->first.GetSlice(&tmp);
+          if (lt.find(KeyLockArgs::GetLockKey(key)) != lt.end())
+            continue;
+
+          if (auto journal = owner_->journal(); journal) {
+            // Copy `key`, not `tmp`: GetSlice() presumably fills `tmp` only when needed.
+            keys_to_journal.emplace_back(key);
+          }
+
+          PerformDeletion(evict_it, shard_owner(), db_table.get());
+          ++evicted;
+          used_memory_after = owner_->UsedMemory();
+          // returns when whichever condition is met first
+          if ((evicted == max_eviction_per_hb) ||
+              (used_memory_before - used_memory_after >= increase_goal_bytes))
+            goto finish;
+        }
+      }
+    }
+  }
+
+finish:
+  // Send the deletions to the replicas, skipping the entry when no key was collected.
+  // Fiber preemption could happen in this phase.
+  if (auto journal = owner_->journal(); journal && !keys_to_journal.empty()) {
+    vector<string_view> args(keys_to_journal.begin(), keys_to_journal.end());
+    ArgSlice delete_args(args.data(), args.size());
+    journal->RecordEntry(0, journal::Op::EXPIRED, db_ind, 1, make_pair("DEL", delete_args), false);
+  }
+
+  auto time_finish = absl::GetCurrentTimeNanos();
+  events_.evicted_keys += evicted;
+  DVLOG(2) << "Memory usage before eviction: " << used_memory_before;
+  DVLOG(2) << "Memory usage after eviction: " << used_memory_after;
+  DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted << "/"
+           << max_eviction_per_hb;
+  DVLOG(2) << "Eviction time (us): " << (time_finish - time_start) / 1000;
 }
 
 void DbSlice::CreateDb(DbIndex db_ind) {
diff --git a/src/server/db_slice.h b/src/server/db_slice.h
index e5bcc4152..f828e057b 100644
--- a/src/server/db_slice.h
+++ b/src/server/db_slice.h
@@ -304,6 +304,8 @@ class DbSlice {
   DeleteExpiredStats DeleteExpiredStep(const Context& cntx, unsigned count);
   void FreeMemWithEvictionStep(DbIndex db_indx, size_t increase_goal_bytes);
 
+  int32_t GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const;
+
   const DbTableArray& databases() const {
     return db_arr_;
   }
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index ef506e3a2..f54e4e1fd 100644
--- a/src/server/engine_shard_set.cc
+++
b/src/server/engine_shard_set.cc @@ -32,7 +32,6 @@ ABSL_FLAG(uint32_t, hz, 100, ABSL_FLAG(bool, cache_mode, false, "If true, the backend behaves like a cache, " "by evicting entries when getting close to maxmemory limit"); - // memory defragmented related flags ABSL_FLAG(float, mem_defrag_threshold, 0.7, "Minimum percentage of used memory relative to maxmemory cap before running " diff --git a/src/server/main_service.cc b/src/server/main_service.cc index a0ae8bb83..259b8c2a9 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -695,6 +695,8 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterMutable("masterauth"); config_registry.RegisterMutable("tcp_keepalive"); config_registry.RegisterMutable("replica_partial_sync"); + config_registry.RegisterMutable("max_eviction_per_heartbeat"); + config_registry.RegisterMutable("max_segment_to_consider"); acl::UserRegistry* reg = &user_registry_; pp_.Await([reg](uint32_t index, ProactorBase* pb) { ServerState::Init(index, reg); }); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 642096119..458a7fd28 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1488,7 +1488,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("listpack_blobs", total.listpack_blob_cnt); append("listpack_bytes", total.listpack_bytes); append("small_string_bytes", m.small_string_bytes); - append("pipeline_cache_bytes", m.conn_stats.pipeline_cmd_cache_bytes); append("dispatch_queue_bytes", m.conn_stats.dispatch_queue_bytes); append("dispatch_queue_peak_bytes", m.peak_stats.conn_dispatch_queue_bytes); diff --git a/tools/eviction/fill_db.py b/tools/eviction/fill_db.py new file mode 100755 index 000000000..5e93e7d53 --- /dev/null +++ b/tools/eviction/fill_db.py @@ -0,0 +1,307 @@ +#!/usr/bin/env python3 + +""" +This script implements facilities for assessing cache eviction. 
+Two major functions have been implemented that allow users to + 1. Populate Dragonfly with specified key and value length distributions. + 2. Measure the cache hit rate of Dragonfly with workloads that access keys using a Zipfian distribution. + +Usage: +To perform database population, simply run: + +./fill_db.py -f + +This will automatically populate the database to the point where about 2X of maxmemory (specified by Dragonfly) +of KV pairs will be inserted. By default, we always stop at 2X maxmemory, and this can be changed using the -r +option, for instance + +./fill_db.py -f -r 0.25 # population stops at 4x maxmemory + +To accelerate the population, we can use multiple processes running this script in parallel. A convenient script +has been provided in this directory: +./run_fill_db.sh 10 # use 10 processes to fill in parallel + +After the database has been populated, we can start measuring cache hit rate using the -m option: +./fill_db.py -m +Note that the measurement must be done after the population as this mode relies on reading back the complete key +space inserted during the population phase. By default, we perform 100000 set operations for calculating cache hit rate.
+This number can be changed using the -c option: +./fill_db.py -m -c 2000 +""" + + +import redis +import string +from random import choice +from random import shuffle +import numpy as np + +import asyncio +from redis import asyncio as aioredis +import os +import argparse +import re +import glob + +all_val_lens = [400, 800, 1600, 25000] +val_lens_probs = [0.003, 0.78, 0.2, 0.017] + +all_key_lens = [35, 60, 70] +key_lens_probs = [0.2, 0.06, 0.74] + +inserted_keys = [] + + +def random_str(len): + return "".join( + choice(string.ascii_letters + string.digits + string.punctuation) for i in range(len) + ) + + +def random_key(): + global all_key_lens, key_lens_probs + return random_str(np.random.choice(all_key_lens, p=key_lens_probs)) + + +def random_val(): + global all_val_lens, val_lens_probs + return random_str(np.random.choice(all_val_lens, p=val_lens_probs)) + + +def flush_keys_to_file(file_name): + global inserted_keys + with open(file_name, "a") as f: + for key in inserted_keys: + f.write(f"{key}\n") + + +def read_keys_from_file(file_name): + global inserted_keys + with open(file_name) as file: + for line in file: + inserted_keys.append(line.rstrip()) + + +def read_keys(): + global inserted_keys + inserted_keys.clear() + key_files = glob.glob("./keys_*.txt") + for key_file in key_files: + read_keys_from_file(key_file) + + +def sync_populate_db(): + r = redis.Redis(decode_responses=True) + n = 0 + while True: + r.set(random_key(), random_val()) + n += 1 + if n % 1000 == 0: + print("\r>> Number of key-value pairs inserted: {}".format(n), end="") + + +def sync_query_db(): + global inserted_keys + r = redis.Redis(decode_responses=True) + n = 0 + read_keys() + misses = 0 + hits = 0 + for key in inserted_keys: + resp = r.set(key, random_val(), nx=True) + # print(resp) + if resp: + misses += 1 + else: + hits += 1 + n += 1 + if n % 1000 == 0: + print( + "\r>> Number of key-value pairs inserted: {0}, hit: {1}, miss: {2}".format( + n, hits, misses + ), + end="", + ) + + 
+async def populate_db(ratio): + global inserted_keys + r = aioredis.Redis(decode_responses=True) + n = 0 + misses = 0 + hits = 0 + + total_key_count = 0 + while True: + # await r.set(random_key(), random_val()) + pipeline = r.pipeline(False) + for x in range(200): + k = random_key() + inserted_keys.append(k) + pipeline.set(k, random_val()) + # pipeline.set(k, random_val(), nx=True) + await pipeline.execute() + # responses = await pipeline.execute() + # for resp in responses: + # if resp: + # misses += 1 + # else: + # hits += 1 + + # key file names are in keys_xxxx.txt format + key_file_name = "keys_" + str(os.getpid()) + ".txt" + flush_keys_to_file(key_file_name) + inserted_keys.clear() + n += 200 + + if total_key_count == 0: + db_info = await r.info() + used_mem = float(db_info["used_memory"]) + max_mem = float(db_info["maxmemory"]) + redline = 0.9 + # we will know the total number of keys of the whole space + # only when we approach the maxmemory of the db + if used_mem >= max_mem * redline: + total_key_count = int(float(n) / ratio) + print( + "\n>> Determined target key count: {0}, current key count: {1}, ratio: {2}".format( + total_key_count, n, ratio + ), + end="", + ) + else: + if n >= total_key_count: + print("\n>> Target number of keys reached: {}, stopping...".format(n), end="") + break + if n % 1000 == 0: + print("\r>> Number of key-value pairs inserted: {0}".format(n), end="") + # print("\r>> Number of key-value pairs inserted: {0}, hit: {1}, miss: {2}".format(n, hits, misses), end='') + + +def rand_zipf_generator(alpha: float, upper: int, batch: int): + """ + n: The upper bound of the values to generate a zipfian distribution over + (n = 30 would generate a distribution of given alpha from values 1 to 30) + alpha: The alpha parameter to be used while creating the Zipfian distribution + num_samples: The total number of samples to generate over the Zipfian distribution + This is a generator that yields up to count values using a generator. 
+ """ + + # Calculate Zeta values from 1 to n: + tmp = np.power(np.arange(1, upper + 1), -alpha) + zeta = np.r_[0.0, np.cumsum(tmp)] + + # Store the translation map: + distMap = [x / zeta[-1] for x in zeta] + + while True: + # Generate an array of uniform 0-1 pseudo-random values: + u = np.random.random(batch) + + # bisect them with distMap + v = np.searchsorted(distMap, u) + + samples = [t - 1 for t in v] + yield samples + + +def rearrange_keys(): + """ + This function potentially provides the capability for testing different caching workloads. + for instance, if we rearrange all the keys via sorting based on the k-v memory usage, + we will generate a zipfian hotspot that prefers to access small kv pairs (or larger kv pairs) + current implementation just uses a random shuffle. + """ + global inserted_keys + shuffle(inserted_keys) + + +async def query_db_with_locality(count): + global inserted_keys + r = aioredis.Redis(decode_responses=True) + n = 0 + read_keys() + rearrange_keys() + misses = 0 + hits = 0 + pipeline_size = 200 + key_index_gen = rand_zipf_generator(1.0, len(inserted_keys), pipeline_size) + for key_indices in key_index_gen: + pipeline = r.pipeline(False) + # print(key_indices) + for key_index in key_indices: + k = inserted_keys[key_index] + pipeline.set(k, random_val(), nx=True) + + responses = await pipeline.execute() + n += pipeline_size + for resp in responses: + if resp: + misses += 1 + else: + hits += 1 + print( + "\r>> Number of ops: {0}, hit: {1}, miss: {2}, hit rate: {3:.4f}".format( + n, hits, misses, float(hits) / float(hits + misses) + ), + end="", + ) + if n >= count: + break + hit_rate = float(hits) / float(hits + misses) + print("\n>> Cache hit rate: {:.4f}".format(hit_rate)) + + +class Range(object): + def __init__(self, start, end): + self.start = start + self.end = end + + def __eq__(self, other): + return self.start <= other <= self.end + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Cache 
Benchmark", formatter_class=argparse.ArgumentDefaultsHelpFormatter + ) + + parser.add_argument( + "-f", + "--fill", + action="store_true", + help="fill database with random key-value pairs with their lengths follow some distributions", + ) + + parser.add_argument( + "-r", + "--ratio", + type=float, + default=0.5, + choices=[Range(0.0, 1.0)], + help="the ratio between in memory data size and total data size", + ) + + parser.add_argument( + "-m", + "--measure", + action="store_true", + help="measure cache hit rate by visiting the entire key space with a Zipfian distribution", + ) + + parser.add_argument( + "-c", + "--count", + type=int, + default=100000, + help="total number of operations to be performed when measuring cache hit rate", + ) + + args = parser.parse_args() + + if args.fill: + asyncio.run(populate_db(args.ratio)) + exit(0) + + if args.measure: + asyncio.run(query_db_with_locality(args.count)) diff --git a/tools/eviction/run_fill_db.sh b/tools/eviction/run_fill_db.sh new file mode 100755 index 000000000..4ccf0db3f --- /dev/null +++ b/tools/eviction/run_fill_db.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +rm ./keys_*.txt +for i in `seq 1 $1` +do + echo "launching process $i to fill.." + ./fill_db.py -f & +done + +wait diff --git a/tools/eviction/stop_fill_db.sh b/tools/eviction/stop_fill_db.sh new file mode 100755 index 000000000..cbda5c832 --- /dev/null +++ b/tools/eviction/stop_fill_db.sh @@ -0,0 +1,2 @@ +#!/bin/sh +ps -ef | grep fill_db.py | grep -v grep | awk '{print $2}' | xargs kill -9