mirror of https://github.com/dragonflydb/dragonfly.git
fix(bug): crash when starting replica while saving (#2618)
The bug: a crash when starting a replica while a save is in progress.
The problem: the snapshot class accessed the wrong allocator on destruction, because it was destroyed on a thread other than its shard's thread.
The fix: destroy the snapshot on the correct thread, at the point where the snapshot finishes.

Signed-off-by: adi_holden <adi@dragonflydb.io>
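The failure mode is a general one: an object whose memory comes from a per-thread (shard-local) allocator must be destroyed on the thread that owns that allocator. Below is a minimal, self-contained C++ sketch of the pattern the fix applies; ShardQueue and Snapshot are hypothetical stand-ins, not Dragonfly's actual classes.

// destroy_on_owner.cc -- hypothetical sketch, not Dragonfly code.
#include <cassert>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// Toy task queue standing in for a shard's event loop.
class ShardQueue {
 public:
  void Post(std::function<void()> fn) {
    {
      std::lock_guard lk{mu_};
      tasks_.push(std::move(fn));
    }
    cv_.notify_one();
  }

  void Run() {
    for (;;) {
      std::unique_lock lk{mu_};
      cv_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
      if (tasks_.empty())
        return;  // stop requested and queue drained
      auto fn = std::move(tasks_.front());
      tasks_.pop();
      lk.unlock();
      fn();
    }
  }

  void Stop() {
    {
      std::lock_guard lk{mu_};
      stop_ = true;
    }
    cv_.notify_one();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool stop_ = false;
};

// Object that must be destroyed on the thread that created it, e.g. because
// its memory came from that thread's allocator.
class Snapshot {
 public:
  Snapshot() : owner_(std::this_thread::get_id()) {}
  ~Snapshot() { assert(owner_ == std::this_thread::get_id()); }

 private:
  std::thread::id owner_;
};

int main() {
  ShardQueue shard;
  std::thread shard_thread([&] { shard.Run(); });

  std::shared_ptr<Snapshot> snap;
  shard.Post([&] { snap = std::make_shared<Snapshot>(); });

  // WRONG: snap.reset() here would run ~Snapshot() on the main thread.
  // RIGHT: release the last reference on the owning thread instead.
  shard.Post([&] { snap.reset(); });
  shard.Post([&] { shard.Stop(); });
  shard_thread.join();
}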
parent 5f3e9a88c9
commit 1ef8795611

5 changed files with 38 additions and 2 deletions
@@ -338,9 +338,11 @@ void SaveStagesController::CloseCb(unsigned index) {
   if (auto& snapshot = snapshots_[index].first; snapshot) {
     shared_err_ = snapshot->Close();

-    lock_guard lk{rdb_name_map_mu_};
+    unique_lock lk{rdb_name_map_mu_};
     for (const auto& k_v : snapshot->freq_map())
       rdb_name_map_[RdbTypeName(k_v.first)] += k_v.second;
+    lk.unlock();
+    snapshot.reset();
   }

   if (auto* es = EngineShard::tlocal(); use_dfs_format_ && es)
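Two things happen in this hunk: the mutex type switch and the explicit snapshot.reset(). A lock_guard can only release at scope exit, while unique_lock exposes unlock(), so the frequency-map update can run under the lock and the snapshot destructor (the expensive, thread-sensitive part) can run after the lock is released. A minimal illustration, assuming the aliases here refer to the standard lock types or fiber-aware equivalents with the same interface:

#include <mutex>

std::mutex mu;
int shared_state = 0;

void update_then_heavy_work() {
  // std::lock_guard has no unlock(); it releases only at scope exit.
  std::unique_lock lk{mu};
  ++shared_state;  // the part that actually needs the mutex
  lk.unlock();     // end the critical section early
  // ...expensive work (e.g. destroying a large object) runs unlocked...
}

int main() {
  update_then_heavy_work();
}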
@@ -49,7 +49,7 @@ class RdbSnapshot {
   error_code Close();
   size_t GetSaveBuffersSize();

-  const RdbTypeFreqMap freq_map() const {
+  const RdbTypeFreqMap& freq_map() const {
     return freq_map_;
   }

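A drive-by fix: const RdbTypeFreqMap freq_map() const returned the map by value, so every caller (including the loop in CloseCb above) copied the whole map; adding the & hands out a reference to the member instead. A small sketch of the difference, using a standard map as a stand-in for RdbTypeFreqMap:

#include <string>
#include <unordered_map>

using FreqMap = std::unordered_map<std::string, unsigned>;  // stand-in

class Holder {
 public:
  // Before: a fresh copy of the whole map on every call.
  const FreqMap freq_map_by_value() const { return freq_map_; }

  // After: a reference -- no copy, but only valid while Holder lives.
  const FreqMap& freq_map() const { return freq_map_; }

 private:
  FreqMap freq_map_{{"string", 3}, {"hash", 1}};
};

int main() {
  Holder h;
  unsigned total = 0;
  for (const auto& [type, count] : h.freq_map())  // iterates without copying
    total += count;
  return total == 4 ? 0 : 1;
}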
@@ -6,6 +6,7 @@

 #include "base/flags.h"
 #include "base/logging.h"
+#include "server/server_state.h"

 ABSL_FLAG(bool, enable_top_keys_tracking, false,
           "Enables / disables tracking of hot keys debugging feature");
@@ -103,9 +104,11 @@ DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index)
   if (ClusterConfig::IsEnabled()) {
     slots_stats.resize(ClusterConfig::kMaxSlotNum + 1);
   }
+  thread_index = ServerState::tlocal()->thread_index();
 }

 DbTable::~DbTable() {
+  DCHECK_EQ(thread_index, ServerState::tlocal()->thread_index());
 }

 void DbTable::Clear() {
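The DbTable change is the safety net: record the index of the constructing thread and DCHECK it in the destructor, so destroying a table on the wrong thread fails loudly in debug builds instead of silently corrupting another thread's allocator. The same guard written with standard facilities only (ServerState::thread_index() is Dragonfly-specific), as a sketch:

#include <cassert>
#include <thread>

// Record the constructing thread; assert the destructor runs on it.
class ThreadAffine {
 public:
  ThreadAffine() : owner_(std::this_thread::get_id()) {}
  ~ThreadAffine() { assert(std::this_thread::get_id() == owner_); }

 private:
  std::thread::id owner_;
};

int main() {
  ThreadAffine ok;  // constructed and destroyed on the same thread: fine

  // Destroying on another thread would trip the assert in a debug build:
  //   auto* p = new ThreadAffine();
  //   std::thread([p] { delete p; }).join();  // assert fires in ~ThreadAffine
}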
@@ -139,6 +139,7 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun

   TopKeys top_keys;
   DbIndex index;
+  uint32_t thread_index;

   explicit DbTable(PMR_NS::memory_resource* mr, DbIndex index);
   ~DbTable();
@@ -2031,3 +2031,33 @@ async def test_saving_replica(df_local_factory):
     assert not await is_saving()

     await disconnect_clients(c_master, *[c_replica])
+
+
+@pytest.mark.asyncio
+async def test_start_replicating_while_save(df_local_factory):
+    tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
+
+    master = df_local_factory.create(proactor_threads=4)
+    replica = df_local_factory.create(proactor_threads=4, dbfilename=f"dump_{tmp_file_name}")
+    df_local_factory.start_all([master, replica])
+
+    c_master = master.client()
+    c_replica = replica.client()
+
+    await c_replica.execute_command("DEBUG POPULATE 1000 key 4096 RAND")
+
+    async def save_replica():
+        await c_replica.execute_command("save")
+
+    async def is_saving():
+        return "saving:1" in (await c_replica.execute_command("INFO PERSISTENCE"))
+
+    save_task = asyncio.create_task(save_replica())
+    while not await is_saving():  # wait for the server to start saving
+        await asyncio.sleep(0.1)
+    await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
+    assert await is_saving()
+    await save_task
+    assert not await is_saving()
+
+    await disconnect_clients(c_master, *[c_replica])
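In short, the regression test drives the exact race from the report: it kicks off a SAVE on the replica, polls INFO PERSISTENCE until it reports saving:1, then issues REPLICAOF mid-save. Before the fix, starting replication caused the snapshot to be destroyed on the wrong thread and the server crashed; with the fix, the save is expected to run to completion (await save_task) and the saving flag to clear.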