fix: ub in RegisterOnChange and regression tests for big values (#3336)
* fix replication test flag name for big values
* fix a bug that triggers UB when RegisterOnChange is called on flows that iterate over the callbacks and preempt
* add a stress test for big value serialization

Signed-off-by: kostas <kostas@dragonflydb.io>
parent: cad62679a4
commit: 8a2d6ad1f4

3 changed files with 45 additions and 4 deletions
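Why this is UB, in a nutshell: change_cb_ is a vector of (version, callback) pairs, and the registered callbacks may preempt the fiber that iterates over them. If another flow calls RegisterOnChange inside that preemption window, the emplace_back can reallocate the vector and invalidate the iterators the suspended flow still holds. Below is a minimal, self-contained sketch of the hazard (hypothetical stand-in code, not the Dragonfly sources; the fiber preemption is simulated by a callback that registers another callback mid-iteration):

    // hazard_sketch.cc -- illustrative only; models the pre-fix behavior.
    // Build with: g++ -std=c++17 -fsanitize=address hazard_sketch.cc
    #include <cstdint>
    #include <functional>
    #include <utility>
    #include <vector>

    using ChangeCallback = std::function<void()>;

    // Stand-in for DbSlice::change_cb_: callbacks keyed by version.
    std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;

    int main() {
      // The first callback registers another one while the loop below is
      // mid-iteration -- the same effect as RegisterOnChange running on a
      // flow that preempted the iterating fiber.
      change_cb_.emplace_back(1, [] {
        change_cb_.emplace_back(2, [] {});  // may reallocate the vector
      });

      for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
        it->second();  // after this call, `it` may dangle: use-after-free
      }
      return 0;
    }

AddressSanitizer typically flags the dangling iterator on the first run; the patch closes this window by making registration wait until no iteration is in progress.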
@@ -17,6 +17,7 @@
 #include "server/server_state.h"
 #include "server/tiered_storage.h"
 #include "strings/human_readable.h"
 #include "util/fibers/fibers.h"
+#include "util/fibers/stacktrace.h"
 
 ABSL_FLAG(bool, enable_heartbeat_eviction, true,
@@ -1114,11 +1115,13 @@ void DbSlice::ExpireAllIfNeeded() {
 }
 
 uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
+  block_counter_.Wait();
   return change_cb_.emplace_back(NextVersion(), std::move(cb)).first;
 }
 
 void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) {
   FetchedItemsRestorer fetched_restorer(&fetched_items_);
+  std::unique_lock<LocalBlockingCounter> lk(block_counter_);
 
   uint64_t bucket_version = it.GetVersion();
   // change_cb_ is ordered by version.
@@ -1139,6 +1142,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
 
 //! Unregisters the callback.
 void DbSlice::UnregisterOnChange(uint64_t id) {
+  block_counter_.Wait();
   auto it = find_if(change_cb_.begin(), change_cb_.end(),
                     [id](const auto& cb) { return cb.first == id; });
   CHECK(it != change_cb_.end());
@@ -1543,7 +1547,10 @@ void DbSlice::OnCbFinish() {
 
 void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
   FetchedItemsRestorer fetched_restorer(&fetched_items_);
+  std::unique_lock<LocalBlockingCounter> lk(block_counter_);
+
   for (const auto& ccb : change_cb_) {
+    CHECK(ccb.second);
     ccb.second(id, cr);
   }
 }
@@ -13,6 +13,7 @@
 #include "server/conn_context.h"
 #include "server/table.h"
 #include "util/fibers/fibers.h"
+#include "util/fibers/synchronization.h"
 
 namespace dfly {
 
@@ -524,6 +525,36 @@ class DbSlice {
   void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const;
 
  private:
+  class LocalBlockingCounter {
+   public:
+    void lock() {
+      ++mutating;
+    }
+
+    void unlock() {
+      --mutating;
+      if (mutating == 0) {
+        cond_var.notify_one();
+      }
+    }
+
+    void Wait() {
+      util::fb2::NoOpLock noop_lk_;
+      cond_var.wait(noop_lk_, [this]() { return mutating == 0; });
+    }
+
+   private:
+    util::fb2::CondVarAny cond_var;
+    size_t mutating = 0;
+  };
+
+  // We need this because registered callbacks might yield. If RegisterOnChange
+  // is called while we are preempted iterating over the registered callbacks
+  // (say, in FlushChangeToEarlierCallbacks), we get UB: the emplace_back into
+  // the vector may resize it, invalidating the iterators still held by the
+  // preempted FlushChangeToEarlierCallbacks. LocalBlockingCounter protects
+  // against this case.
+  mutable LocalBlockingCounter block_counter_;
   ShardId shard_id_;
   uint8_t caching_mode_ : 1;
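LocalBlockingCounter is a counting gate rather than a mutex: flows that iterate over change_cb_ acquire it through std::unique_lock (which only needs lock()/unlock()), while RegisterOnChange and UnregisterOnChange call Wait() to block until no iteration is in flight. Since fibers on the same thread yield only at explicit preemption points, the real class needs no mutex of its own and can wait on util::fb2::CondVarAny with a NoOpLock. A portable, thread-based analog (illustrative names; a real mutex is added because std::condition_variable requires one):

    #include <condition_variable>
    #include <cstddef>
    #include <mutex>

    class BlockingCounter {
     public:
      // Taken (via std::unique_lock<BlockingCounter>) by flows that iterate
      // over the callback vector; never blocks, only counts.
      void lock() {
        std::lock_guard<std::mutex> g(mu_);
        ++mutating_;
      }

      void unlock() {
        std::lock_guard<std::mutex> g(mu_);
        if (--mutating_ == 0) {
          cv_.notify_all();
        }
      }

      // Called by mutators of the vector; blocks until no iteration is
      // in progress.
      void Wait() {
        std::unique_lock<std::mutex> lk(mu_);
        cv_.wait(lk, [this] { return mutating_ == 0; });
      }

     private:
      std::mutex mu_;
      std::condition_variable cv_;
      std::size_t mutating_ = 0;
    };

The asymmetry is deliberate: iterating flows never block each other (the counter simply nests), while a mutator waits its turn; in the single-threaded fiber setting, once Wait() returns, the subsequent emplace_back runs without preempting, so it cannot interleave with an iteration.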
@@ -55,6 +55,7 @@ Test full replication pipeline. Test full sync with streaming changes and stable
         pytest.param(
             8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, False, marks=M_STRESS
         ),
+        pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, True, marks=M_STRESS),
     ],
 )
 @pytest.mark.parametrize("mode", [({}), ({"cache_mode": "true"})])
@@ -67,14 +68,16 @@ async def test_replication_all(
     big_value,
     mode,
 ):
+    args = {}
     if mode:
-        mode["maxmemory"] = str(t_master * 256) + "mb"
+        args["cache_mode"] = "true"
+        args["maxmemory"] = str(t_master * 256) + "mb"
 
     if big_value:
-        mode["compression_mode"] = 0
-        mode["flush_big_entries_threshold"] = 4096
+        args["compression_mode"] = 0
+        args["serialization_max_chunk_size"] = 4096
 
-    master = df_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **mode)
+    master = df_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **args)
     replicas = [
         df_factory.create(admin_port=ADMIN_PORT + i + 1, proactor_threads=t)
         for i, t in enumerate(t_replicas)