1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

fix(server): saving is not a server state (#2613)

* fix(server): saving is not a server state

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2024-02-19 17:20:48 +02:00 committed by GitHub
parent 58dda3bc4b
commit 15b3fb13b6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 145 additions and 91 deletions

View file

@ -75,8 +75,6 @@ const char* GlobalStateName(GlobalState s) {
return "ACTIVE"; return "ACTIVE";
case GlobalState::LOADING: case GlobalState::LOADING:
return "LOADING"; return "LOADING";
case GlobalState::SAVING:
return "SAVING";
case GlobalState::SHUTTING_DOWN: case GlobalState::SHUTTING_DOWN:
return "SHUTTING DOWN"; return "SHUTTING DOWN";
case GlobalState::TAKEN_OVER: case GlobalState::TAKEN_OVER:

View file

@ -154,7 +154,6 @@ struct SearchStats {
enum class GlobalState : uint8_t { enum class GlobalState : uint8_t {
ACTIVE, ACTIVE,
LOADING, LOADING,
SAVING,
SHUTTING_DOWN, SHUTTING_DOWN,
TAKEN_OVER, TAKEN_OVER,
}; };

View file

@ -9,6 +9,7 @@
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "server/detail/snapshot_storage.h"
#include "server/main_service.h" #include "server/main_service.h"
#include "server/script_mgr.h" #include "server/script_mgr.h"
#include "server/transaction.h" #include "server/transaction.h"
@ -147,40 +148,26 @@ SaveStagesController::SaveStagesController(SaveStagesInputs&& inputs)
SaveStagesController::~SaveStagesController() { SaveStagesController::~SaveStagesController() {
} }
GenericError SaveStagesController::Save() { SaveInfo SaveStagesController::Save() {
if (auto err = BuildFullPath(); err) if (auto err = BuildFullPath(); err) {
return err; shared_err_ = err;
return GetSaveInfo();
}
if (auto err = InitResources(); err) InitResources();
return err;
// The stages below report errors to shared_err_
if (use_dfs_format_) if (use_dfs_format_)
SaveDfs(); SaveDfs();
else else
SaveRdb(); SaveRdb();
is_saving_->store(true, memory_order_relaxed);
{
lock_guard lk{*save_mu_};
*save_bytes_cb_ = [this]() { return GetSaveBuffersSize(); };
}
RunStage(&SaveStagesController::SaveCb); RunStage(&SaveStagesController::SaveCb);
{
lock_guard lk{*save_mu_};
*save_bytes_cb_ = nullptr;
}
is_saving_->store(false, memory_order_relaxed);
RunStage(&SaveStagesController::CloseCb); RunStage(&SaveStagesController::CloseCb);
FinalizeFileMovement(); FinalizeFileMovement();
UpdateSaveInfo(); return GetSaveInfo();
return *shared_err_;
} }
size_t SaveStagesController::GetSaveBuffersSize() { size_t SaveStagesController::GetSaveBuffersSize() {
@ -264,14 +251,18 @@ void SaveStagesController::SaveRdb() {
trans_->ScheduleSingleHop(std::move(cb)); trans_->ScheduleSingleHop(std::move(cb));
} }
void SaveStagesController::UpdateSaveInfo() { uint32_t SaveStagesController::GetCurrentSaveDuration() {
auto seconds = (absl::Now() - start_time_) / absl::Seconds(1); return (absl::Now() - start_time_) / absl::Seconds(1);
}
SaveInfo SaveStagesController::GetSaveInfo() {
SaveInfo info;
info.save_time = absl::ToUnixSeconds(start_time_);
info.duration_sec = GetCurrentSaveDuration();
if (shared_err_) { if (shared_err_) {
lock_guard lk{*save_mu_}; info.error = *shared_err_;
last_save_info_->last_error = *shared_err_; return info;
last_save_info_->last_error_time = absl::ToUnixSeconds(start_time_);
last_save_info_->failed_duration_sec = seconds;
return;
} }
fs::path resulting_path = full_path_; fs::path resulting_path = full_path_;
@ -281,23 +272,22 @@ void SaveStagesController::UpdateSaveInfo() {
resulting_path.replace_extension(); // remove .tmp resulting_path.replace_extension(); // remove .tmp
LOG(INFO) << "Saving " << resulting_path << " finished after " LOG(INFO) << "Saving " << resulting_path << " finished after "
<< strings::HumanReadableElapsedTime(seconds); << strings::HumanReadableElapsedTime(info.duration_sec);
lock_guard lk{*save_mu_}; info.freq_map.clear();
last_save_info_->freq_map.clear();
for (const auto& k_v : rdb_name_map_) { for (const auto& k_v : rdb_name_map_) {
last_save_info_->freq_map.emplace_back(k_v); info.freq_map.emplace_back(k_v);
}
last_save_info_->save_time = absl::ToUnixSeconds(start_time_);
last_save_info_->file_name = resulting_path.generic_string();
last_save_info_->success_duration_sec = seconds;
} }
GenericError SaveStagesController::InitResources() { info.file_name = resulting_path.generic_string();
return info;
}
void SaveStagesController::InitResources() {
snapshots_.resize(use_dfs_format_ ? shard_set->size() + 1 : 1); snapshots_.resize(use_dfs_format_ ? shard_set->size() + 1 : 1);
for (auto& [snapshot, _] : snapshots_) for (auto& [snapshot, _] : snapshots_)
snapshot = make_unique<RdbSnapshot>(fq_threadpool_, snapshot_storage_.get()); snapshot = make_unique<RdbSnapshot>(fq_threadpool_, snapshot_storage_.get());
return {};
} }
// Remove .tmp extension or delete files in case of error // Remove .tmp extension or delete files in case of error

View file

@ -7,9 +7,7 @@
#include <filesystem> #include <filesystem>
#include "server/detail/snapshot_storage.h"
#include "server/rdb_save.h" #include "server/rdb_save.h"
#include "server/server_family.h"
#include "util/fibers/fiberqueue_threadpool.h" #include "util/fibers/fiberqueue_threadpool.h"
namespace dfly { namespace dfly {
@ -19,16 +17,22 @@ class Service;
namespace detail { namespace detail {
class SnapshotStorage;
struct SaveInfo {
time_t save_time = 0; // epoch time in seconds.
uint32_t duration_sec = 0;
std::string file_name;
std::vector<std::pair<std::string_view, size_t>> freq_map; // RDB_TYPE_xxx -> count mapping.
GenericError error;
};
struct SaveStagesInputs { struct SaveStagesInputs {
bool use_dfs_format_; bool use_dfs_format_;
std::string_view basename_; std::string_view basename_;
Transaction* trans_; Transaction* trans_;
Service* service_; Service* service_;
std::atomic_bool* is_saving_;
util::fb2::FiberQueueThreadPool* fq_threadpool_; util::fb2::FiberQueueThreadPool* fq_threadpool_;
LastSaveInfo* last_save_info_ ABSL_GUARDED_BY(save_mu_);
util::fb2::Mutex* save_mu_;
std::function<size_t()>* save_bytes_cb_;
std::shared_ptr<SnapshotStorage> snapshot_storage_; std::shared_ptr<SnapshotStorage> snapshot_storage_;
}; };
@ -70,7 +74,9 @@ struct SaveStagesController : public SaveStagesInputs {
~SaveStagesController(); ~SaveStagesController();
GenericError Save(); SaveInfo Save();
size_t GetSaveBuffersSize();
uint32_t GetCurrentSaveDuration();
private: private:
// In the new version (.dfs) we store a file for every shard and one more summary file. // In the new version (.dfs) we store a file for every shard and one more summary file.
@ -83,9 +89,9 @@ struct SaveStagesController : public SaveStagesInputs {
// Save a single rdb file // Save a single rdb file
void SaveRdb(); void SaveRdb();
void UpdateSaveInfo(); SaveInfo GetSaveInfo();
GenericError InitResources(); void InitResources();
// Remove .tmp extension or delete files in case of error // Remove .tmp extension or delete files in case of error
void FinalizeFileMovement(); void FinalizeFileMovement();
@ -102,8 +108,6 @@ struct SaveStagesController : public SaveStagesInputs {
void RunStage(void (SaveStagesController::*cb)(unsigned)); void RunStage(void (SaveStagesController::*cb)(unsigned));
size_t GetSaveBuffersSize();
private: private:
absl::Time start_time_; absl::Time start_time_;
std::filesystem::path full_path_; std::filesystem::path full_path_;

View file

@ -325,7 +325,7 @@ TEST_F(RdbTest, SaveFlush) {
do { do {
usleep(10); usleep(10);
} while (!service_->server_family().IsSaving()); } while (!service_->server_family().TEST_IsSaving());
Run({"flushdb"}); Run({"flushdb"});
save_fb.Join(); save_fb.Join();
@ -355,7 +355,7 @@ TEST_F(RdbTest, SaveManyDbs) {
do { do {
usleep(10); usleep(10);
} while (!service_->server_family().IsSaving()); } while (!service_->server_family().TEST_IsSaving());
pp_->at(1)->Await([&] { pp_->at(1)->Await([&] {
Run({"select", "1"}); Run({"select", "1"});

View file

@ -40,6 +40,7 @@ extern "C" {
#include "server/conn_context.h" #include "server/conn_context.h"
#include "server/debugcmd.h" #include "server/debugcmd.h"
#include "server/detail/save_stages_controller.h" #include "server/detail/save_stages_controller.h"
#include "server/detail/snapshot_storage.h"
#include "server/dflycmd.h" #include "server/dflycmd.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/error.h" #include "server/error.h"
@ -52,6 +53,7 @@ extern "C" {
#include "server/rdb_save.h" #include "server/rdb_save.h"
#include "server/script_mgr.h" #include "server/script_mgr.h"
#include "server/server_state.h" #include "server/server_state.h"
#include "server/snapshot.h"
#include "server/tiered_storage.h" #include "server/tiered_storage.h"
#include "server/transaction.h" #include "server/transaction.h"
#include "server/version.h" #include "server/version.h"
@ -1290,28 +1292,51 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
return GenericError{make_error_code(errc::operation_not_permitted), return GenericError{make_error_code(errc::operation_not_permitted),
StrCat("Can not save database in tiering mode")}; StrCat("Can not save database in tiering mode")};
} }
if (!ignore_state) { auto state = service_.GetGlobalState();
auto [new_state, success] = service_.SwitchState(GlobalState::ACTIVE, GlobalState::SAVING); // In some cases we want to create a snapshot even if server is not active, f.e in takeover
if (!success) { if (!ignore_state && (state != GlobalState::ACTIVE)) {
return GenericError{make_error_code(errc::operation_in_progress), return GenericError{make_error_code(errc::operation_in_progress),
StrCat(GlobalStateName(new_state), " - can not save database")}; StrCat(GlobalStateName(state), " - can not save database")};
}
} }
{ {
std::lock_guard lck(save_mu_); std::lock_guard lk(save_mu_);
start_save_time_ = absl::Now(); if (save_controller_) {
return GenericError{make_error_code(errc::operation_in_progress),
"SAVING - can not save database"};
} }
SaveStagesController sc{detail::SaveStagesInputs{ save_controller_ = make_unique<SaveStagesController>(detail::SaveStagesInputs{
new_version, basename, trans, &service_, &is_saving_, fq_threadpool_.get(), &last_save_info_, new_version, basename, trans, &service_, fq_threadpool_.get(), snapshot_storage_});
&save_mu_, &save_bytes_cb_, snapshot_storage_}}; }
auto res = sc.Save();
detail::SaveInfo save_info = save_controller_->Save();
{ {
std::lock_guard lck(save_mu_); std::lock_guard lk(save_mu_);
start_save_time_.reset();
if (save_info.error) {
last_save_info_.last_error = save_info.error;
last_save_info_.last_error_time = save_info.save_time;
last_save_info_.failed_duration_sec = save_info.duration_sec;
} else {
last_save_info_.save_time = save_info.save_time;
last_save_info_.success_duration_sec = save_info.duration_sec;
last_save_info_.file_name = save_info.file_name;
last_save_info_.freq_map = save_info.freq_map;
} }
if (!ignore_state) save_controller_.reset();
service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE); }
return res;
return save_info.error;
}
bool ServerFamily::TEST_IsSaving() const {
std::atomic_bool is_saving{false};
shard_set->pool()->AwaitFiberOnAll([&](auto*) {
if (SliceSnapshot::IsSnaphotInProgress())
is_saving.store(true, std::memory_order_relaxed);
});
return is_saving.load(std::memory_order_relaxed);
} }
error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) { error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) {
@ -1841,10 +1866,10 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("replication_full_sync_buffer_bytes", repl_mem.full_sync_buf_bytes_); append("replication_full_sync_buffer_bytes", repl_mem.full_sync_buf_bytes_);
} }
if (IsSaving()) { {
lock_guard lk{save_mu_}; lock_guard lk{save_mu_};
if (save_bytes_cb_) { if (save_controller_) {
append("save_buffer_bytes", save_bytes_cb_()); append("save_buffer_bytes", save_controller_->GetSaveBuffersSize());
} }
} }
} }
@ -1914,9 +1939,17 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING; size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING;
append("loading", is_loading); append("loading", is_loading);
auto curent_durration_sec = bool is_saving = false;
start_save_time_ ? (absl::Now() - *start_save_time_) / absl::Seconds(1) : 0; uint32_t curent_durration_sec = 0;
append("saving", curent_durration_sec != 0); {
lock_guard lk{save_mu_};
if (save_controller_) {
is_saving = true;
curent_durration_sec = save_controller_->GetCurrentSaveDuration();
}
}
append("saving", is_saving);
append("current_save_duration_sec", curent_durration_sec); append("current_save_duration_sec", curent_durration_sec);
for (const auto& k_v : save_info.freq_map) { for (const auto& k_v : save_info.freq_map) {

View file

@ -12,6 +12,7 @@
#include "facade/redis_parser.h" #include "facade/redis_parser.h"
#include "facade/reply_builder.h" #include "facade/reply_builder.h"
#include "server/channel_store.h" #include "server/channel_store.h"
#include "server/detail/save_stages_controller.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/replica.h" #include "server/replica.h"
#include "server/server_state.h" #include "server/server_state.h"
@ -173,9 +174,7 @@ class ServerFamily {
// future with error_code. // future with error_code.
util::fb2::Future<GenericError> Load(const std::string& file_name); util::fb2::Future<GenericError> Load(const std::string& file_name);
bool IsSaving() const { bool TEST_IsSaving() const;
return is_saving_.load(std::memory_order_relaxed);
}
void ConfigureMetrics(util::HttpListenerBase* listener); void ConfigureMetrics(util::HttpListenerBase* listener);
@ -282,13 +281,7 @@ class ServerFamily {
time_t start_time_ = 0; // in seconds, epoch time. time_t start_time_ = 0; // in seconds, epoch time.
LastSaveInfo last_save_info_ ABSL_GUARDED_BY(save_mu_); LastSaveInfo last_save_info_ ABSL_GUARDED_BY(save_mu_);
std::atomic_bool is_saving_{false}; std::unique_ptr<detail::SaveStagesController> save_controller_ ABSL_GUARDED_BY(save_mu_);
// this field duplicate SaveStagesController::start_save_time_
// TODO make SaveStagesController as member of this class
std::optional<absl::Time> start_save_time_;
// If a save operation is currently in progress, calling this function will provide information
// about the memory consumption during the save operation.
std::function<size_t()> save_bytes_cb_ = nullptr;
// Used to override save on shutdown behavior that is usually set // Used to override save on shutdown behavior that is usually set
// be --dbfilename. // be --dbfilename.

View file

@ -48,6 +48,10 @@ size_t SliceSnapshot::GetThreadLocalMemoryUsage() {
return mem; return mem;
} }
bool SliceSnapshot::IsSnaphotInProgress() {
return tl_slice_snapshots.size() > 0;
}
void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) { void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
DCHECK(!snapshot_fb_.IsJoinable()); DCHECK(!snapshot_fb_.IsJoinable());

View file

@ -61,6 +61,7 @@ class SliceSnapshot {
~SliceSnapshot(); ~SliceSnapshot();
static size_t GetThreadLocalMemoryUsage(); static size_t GetThreadLocalMemoryUsage();
static bool IsSnaphotInProgress();
// Initialize snapshot, start bucket iteration fiber, register listeners. // Initialize snapshot, start bucket iteration fiber, register listeners.
// In journal streaming mode it needs to be stopped by either Stop or Cancel. // In journal streaming mode it needs to be stopped by either Stop or Cancel.

View file

@ -1999,3 +1999,35 @@ async def test_journal_doesnt_yield_issue_2500(df_local_factory, df_seeder_facto
assert set(keys_master) == set(keys_replica) assert set(keys_master) == set(keys_replica)
await disconnect_clients(c_master, *[c_replica]) await disconnect_clients(c_master, *[c_replica])
@pytest.mark.asyncio
async def test_saving_replica(df_local_factory):
tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
master = df_local_factory.create(proactor_threads=1)
replica = df_local_factory.create(proactor_threads=1, dbfilename=f"dump_{tmp_file_name}")
df_local_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
await c_master.execute_command("DEBUG POPULATE 10000 key 4048 RAND")
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
async def save_replica():
await c_replica.execute_command("save")
async def is_saving():
return "saving:1" in (await c_replica.execute_command("INFO PERSISTENCE"))
save_task = asyncio.create_task(save_replica())
while not await is_saving(): # wait for replica start saving
asyncio.sleep(0.1)
await c_replica.execute_command("replicaof no one")
assert await is_saving()
await save_task
assert not await is_saving()
await disconnect_clients(c_master, *[c_replica])

View file

@ -125,21 +125,21 @@ async def test_dbfilenames(
@pytest.mark.slow @pytest.mark.slow
@dfly_args({**BASIC_ARGS, "dbfilename": "test-cron", "snapshot_cron": "* * * * *"}) @dfly_args({**BASIC_ARGS, "dbfilename": "test-cron", "snapshot_cron": "* * * * *"})
async def test_cron_snapshot(tmp_path: Path, async_client: aioredis.Redis): async def test_cron_snapshot(tmp_dir: Path, async_client: aioredis.Redis):
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client) await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
file = None file = None
with async_timeout.timeout(65): with async_timeout.timeout(65):
while file is None: while file is None:
await asyncio.sleep(1) await asyncio.sleep(1)
file = find_main_file(tmp_path, "test-cron-summary.dfs") file = find_main_file(tmp_dir, "test-cron-summary.dfs")
assert file is not None, os.listdir(tmp_path) assert file is not None, os.listdir(tmp_dir)
@pytest.mark.slow @pytest.mark.slow
@dfly_args({**BASIC_ARGS, "dbfilename": "test-cron-set"}) @dfly_args({**BASIC_ARGS, "dbfilename": "test-cron-set"})
async def test_set_cron_snapshot(tmp_path: Path, async_client: aioredis.Redis): async def test_set_cron_snapshot(tmp_dir: Path, async_client: aioredis.Redis):
await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client) await StaticSeeder(**LIGHTWEIGHT_SEEDER_ARGS).run(async_client)
await async_client.config_set("snapshot_cron", "* * * * *") await async_client.config_set("snapshot_cron", "* * * * *")
@ -148,7 +148,7 @@ async def test_set_cron_snapshot(tmp_path: Path, async_client: aioredis.Redis):
with async_timeout.timeout(65): with async_timeout.timeout(65):
while file is None: while file is None:
await asyncio.sleep(1) await asyncio.sleep(1)
file = find_main_file(tmp_path, "test-cron-set-summary.dfs") file = find_main_file(tmp_dir, "test-cron-set-summary.dfs")
assert file is not None assert file is not None