mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-15 17:51:06 +00:00
chore: refactor snapshot expanding logic (#4003)
S3 and file expansion logic had some duplicate code. this PR refactors it before adding GCS support. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
db6504564d
commit
c2710604de
3 changed files with 101 additions and 91 deletions
|
@ -30,17 +30,19 @@ namespace dfly {
|
||||||
namespace detail {
|
namespace detail {
|
||||||
|
|
||||||
using namespace util;
|
using namespace util;
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
std::optional<std::pair<std::string, std::string>> GetBucketPath(std::string_view path) {
|
// Returns bucket_name, obj_path for an s3 path.
|
||||||
|
optional<pair<string, string>> GetBucketPath(string_view path) {
|
||||||
std::string_view clean = absl::StripPrefix(path, kS3Prefix);
|
std::string_view clean = absl::StripPrefix(path, kS3Prefix);
|
||||||
|
|
||||||
size_t pos = clean.find('/');
|
size_t pos = clean.find('/');
|
||||||
if (pos == std::string_view::npos) {
|
if (pos == string_view::npos) {
|
||||||
return std::make_pair(std::string(clean), "");
|
return make_pair(string(clean), "");
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string bucket_name{clean.substr(0, pos)};
|
string bucket_name{clean.substr(0, pos)};
|
||||||
std::string obj_path{clean.substr(pos + 1)};
|
string obj_path{clean.substr(pos + 1)};
|
||||||
return std::make_pair(std::move(bucket_name), std::move(obj_path));
|
return std::make_pair(std::move(bucket_name), std::move(obj_path));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -48,6 +50,30 @@ std::optional<std::pair<std::string, std::string>> GetBucketPath(std::string_vie
|
||||||
const int kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT;
|
const int kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
io::Result<vector<string>, GenericError> SnapshotStorage::ExpandSnapshot(const string& load_path) {
|
||||||
|
if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) {
|
||||||
|
return nonstd::make_unexpected(
|
||||||
|
GenericError(std::make_error_code(std::errc::invalid_argument), "Bad filename extension"));
|
||||||
|
}
|
||||||
|
|
||||||
|
error_code ec = CheckPath(load_path);
|
||||||
|
if (ec) {
|
||||||
|
return nonstd::make_unexpected(GenericError(ec, "File not found"));
|
||||||
|
}
|
||||||
|
|
||||||
|
vector<string> paths{{load_path}};
|
||||||
|
|
||||||
|
// Collect all other files in case we're loading dfs.
|
||||||
|
if (absl::EndsWith(load_path, "summary.dfs")) {
|
||||||
|
auto res = ExpandFromPath(load_path);
|
||||||
|
if (!res) {
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
paths.insert(paths.end(), res->begin(), res->end());
|
||||||
|
}
|
||||||
|
return paths;
|
||||||
|
}
|
||||||
|
|
||||||
FileSnapshotStorage::FileSnapshotStorage(fb2::FiberQueueThreadPool* fq_threadpool)
|
FileSnapshotStorage::FileSnapshotStorage(fb2::FiberQueueThreadPool* fq_threadpool)
|
||||||
: fq_threadpool_{fq_threadpool} {
|
: fq_threadpool_{fq_threadpool} {
|
||||||
}
|
}
|
||||||
|
@ -143,43 +169,29 @@ io::Result<std::string, GenericError> FileSnapshotStorage::LoadPath(std::string_
|
||||||
std::make_error_code(std::errc::no_such_file_or_directory), "Snapshot not found"));
|
std::make_error_code(std::errc::no_such_file_or_directory), "Snapshot not found"));
|
||||||
}
|
}
|
||||||
|
|
||||||
io::Result<std::vector<std::string>, GenericError> FileSnapshotStorage::LoadPaths(
|
io::Result<vector<string>, GenericError> FileSnapshotStorage::ExpandFromPath(const string& path) {
|
||||||
const std::string& load_path) {
|
string glob = absl::StrReplaceAll(path, {{"summary", "????"}});
|
||||||
if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) {
|
|
||||||
return nonstd::make_unexpected(
|
|
||||||
GenericError(std::make_error_code(std::errc::invalid_argument), "Bad filename extension"));
|
|
||||||
}
|
|
||||||
|
|
||||||
std::vector<std::string> paths{{load_path}};
|
|
||||||
|
|
||||||
// Collect all other files in case we're loading dfs.
|
|
||||||
if (absl::EndsWith(load_path, "summary.dfs")) {
|
|
||||||
std::string glob = absl::StrReplaceAll(load_path, {{"summary", "????"}});
|
|
||||||
io::Result<io::StatShortVec> files = io::StatFiles(glob);
|
io::Result<io::StatShortVec> files = io::StatFiles(glob);
|
||||||
|
|
||||||
if (files && files->size() == 0) {
|
if (!files || files->size() == 0) {
|
||||||
return nonstd::make_unexpected(
|
return nonstd::make_unexpected(GenericError(make_error_code(errc::no_such_file_or_directory),
|
||||||
GenericError(std::make_error_code(std::errc::no_such_file_or_directory),
|
|
||||||
"Cound not find DFS shard files"));
|
"Cound not find DFS shard files"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vector<string> paths;
|
||||||
for (auto& fstat : *files) {
|
for (auto& fstat : *files) {
|
||||||
paths.push_back(std::move(fstat.name));
|
paths.push_back(std::move(fstat.name));
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// Check all paths are valid.
|
|
||||||
for (const auto& path : paths) {
|
|
||||||
std::error_code ec;
|
|
||||||
(void)fs::canonical(path, ec);
|
|
||||||
if (ec) {
|
|
||||||
return nonstd::make_unexpected(ec);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return paths;
|
return paths;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
error_code FileSnapshotStorage::CheckPath(const string& path) {
|
||||||
|
error_code ec;
|
||||||
|
std::ignore = fs::canonical(path, ec);
|
||||||
|
return ec;
|
||||||
|
}
|
||||||
|
|
||||||
#ifdef WITH_AWS
|
#ifdef WITH_AWS
|
||||||
AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https,
|
AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https,
|
||||||
bool ec2_metadata, bool sign_payload) {
|
bool ec2_metadata, bool sign_payload) {
|
||||||
|
@ -288,54 +300,48 @@ io::Result<std::string, GenericError> AwsS3SnapshotStorage::LoadPath(std::string
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
io::Result<std::vector<std::string>, GenericError> AwsS3SnapshotStorage::LoadPaths(
|
io::Result<vector<string>, GenericError> AwsS3SnapshotStorage::ExpandFromPath(
|
||||||
const std::string& load_path) {
|
const string& load_path) {
|
||||||
if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) {
|
|
||||||
return nonstd::make_unexpected(
|
|
||||||
GenericError(std::make_error_code(std::errc::invalid_argument), "Bad filename extension"));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Find snapshot shard files if we're loading DFS.
|
|
||||||
if (absl::EndsWith(load_path, "summary.dfs")) {
|
|
||||||
fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
|
fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
|
||||||
return proactor->Await([&]() -> io::Result<std::vector<std::string>, GenericError> {
|
optional<pair<string, string>> bucket_path = GetBucketPath(load_path);
|
||||||
std::vector<std::string> paths{{load_path}};
|
|
||||||
|
|
||||||
std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(load_path);
|
|
||||||
if (!bucket_path) {
|
if (!bucket_path) {
|
||||||
return nonstd::make_unexpected(
|
return nonstd::make_unexpected(
|
||||||
GenericError{std::make_error_code(std::errc::invalid_argument), "Invalid S3 path"});
|
GenericError{std::make_error_code(std::errc::invalid_argument), "Invalid S3 path"});
|
||||||
}
|
}
|
||||||
const auto [bucket_name, obj_path] = *bucket_path;
|
const auto [bucket_name, obj_path] = *bucket_path;
|
||||||
|
|
||||||
const std::regex re(absl::StrReplaceAll(obj_path, {{"summary", "[0-9]{4}"}}));
|
const std::regex re(absl::StrReplaceAll(obj_path, {{"summary", "[0-9]{4}"}}));
|
||||||
|
|
||||||
// Limit prefix to objects in the same 'directory' as load_path.
|
// Limit prefix to objects in the same 'directory' as load_path.
|
||||||
const size_t pos = obj_path.find_last_of('/');
|
const size_t pos = obj_path.find_last_of('/');
|
||||||
const std::string prefix = (pos == std::string_view::npos) ? "" : obj_path.substr(0, pos);
|
const std::string prefix = (pos == std::string_view::npos) ? "" : obj_path.substr(0, pos);
|
||||||
|
|
||||||
|
auto paths = proactor->Await([&]() -> io::Result<vector<string>, GenericError> {
|
||||||
const io::Result<std::vector<SnapStat>, GenericError> keys = ListObjects(bucket_name, prefix);
|
const io::Result<std::vector<SnapStat>, GenericError> keys = ListObjects(bucket_name, prefix);
|
||||||
if (!keys) {
|
if (!keys) {
|
||||||
return nonstd::make_unexpected(keys.error());
|
return nonstd::make_unexpected(keys.error());
|
||||||
}
|
}
|
||||||
|
vector<string> res;
|
||||||
for (const SnapStat& key : *keys) {
|
for (const SnapStat& key : *keys) {
|
||||||
std::smatch m;
|
std::smatch m;
|
||||||
if (std::regex_match(key.name, m, re)) {
|
if (std::regex_match(key.name, m, re)) {
|
||||||
paths.push_back(std::string(kS3Prefix) + bucket_name + "/" + key.name);
|
res.push_back(std::string(kS3Prefix) + bucket_name + "/" + key.name);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (paths.size() <= 1) {
|
return res;
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!paths || paths->empty()) {
|
||||||
return nonstd::make_unexpected(
|
return nonstd::make_unexpected(
|
||||||
GenericError{std::make_error_code(std::errc::no_such_file_or_directory),
|
GenericError{std::make_error_code(std::errc::no_such_file_or_directory),
|
||||||
"Cound not find DFS snapshot shard files"});
|
"Cound not find DFS snapshot shard files"});
|
||||||
}
|
}
|
||||||
|
|
||||||
return paths;
|
return *paths;
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return std::vector<std::string>{{load_path}};
|
error_code AwsS3SnapshotStorage::CheckPath(const std::string& path) {
|
||||||
|
return {};
|
||||||
}
|
}
|
||||||
|
|
||||||
io::Result<std::vector<AwsS3SnapshotStorage::SnapStat>, GenericError>
|
io::Result<std::vector<AwsS3SnapshotStorage::SnapStat>, GenericError>
|
||||||
|
|
|
@ -48,9 +48,14 @@ class SnapshotStorage {
|
||||||
virtual io::Result<std::string, GenericError> LoadPath(std::string_view dir,
|
virtual io::Result<std::string, GenericError> LoadPath(std::string_view dir,
|
||||||
std::string_view dbfilename) = 0;
|
std::string_view dbfilename) = 0;
|
||||||
|
|
||||||
// Returns the snapshot paths given the RDB file or DFS summary file path.
|
// Searches for all the relevant snapshot files given the RDB file or DFS summary file path.
|
||||||
virtual io::Result<std::vector<std::string>, GenericError> LoadPaths(
|
io::Result<std::vector<std::string>, GenericError> ExpandSnapshot(const std::string& load_path);
|
||||||
const std::string& load_path) = 0;
|
|
||||||
|
protected:
|
||||||
|
virtual io::Result<std::vector<std::string>, GenericError> ExpandFromPath(
|
||||||
|
const std::string& path) = 0;
|
||||||
|
|
||||||
|
virtual std::error_code CheckPath(const std::string& path) = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
class FileSnapshotStorage : public SnapshotStorage {
|
class FileSnapshotStorage : public SnapshotStorage {
|
||||||
|
@ -65,10 +70,10 @@ class FileSnapshotStorage : public SnapshotStorage {
|
||||||
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
|
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
|
||||||
std::string_view dbfilename) override;
|
std::string_view dbfilename) override;
|
||||||
|
|
||||||
io::Result<std::vector<std::string>, GenericError> LoadPaths(
|
|
||||||
const std::string& load_path) override;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;
|
||||||
|
|
||||||
|
std::error_code CheckPath(const std::string& path) final;
|
||||||
util::fb2::FiberQueueThreadPool* fq_threadpool_;
|
util::fb2::FiberQueueThreadPool* fq_threadpool_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -86,10 +91,11 @@ class AwsS3SnapshotStorage : public SnapshotStorage {
|
||||||
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
|
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
|
||||||
std::string_view dbfilename) override;
|
std::string_view dbfilename) override;
|
||||||
|
|
||||||
io::Result<std::vector<std::string>, GenericError> LoadPaths(
|
|
||||||
const std::string& load_path) override;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;
|
||||||
|
|
||||||
|
std::error_code CheckPath(const std::string& path) final;
|
||||||
|
|
||||||
struct SnapStat {
|
struct SnapStat {
|
||||||
SnapStat(std::string file_name, int64_t ts)
|
SnapStat(std::string file_name, int64_t ts)
|
||||||
: name(std::move(file_name)), last_modified(std::move(ts)) {
|
: name(std::move(file_name)), last_modified(std::move(ts)) {
|
||||||
|
@ -105,8 +111,6 @@ class AwsS3SnapshotStorage : public SnapshotStorage {
|
||||||
std::shared_ptr<Aws::S3::S3Client> s3_;
|
std::shared_ptr<Aws::S3::S3Client> s3_;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Returns bucket_name, obj_path for an s3 path.
|
|
||||||
std::optional<std::pair<std::string, std::string>> GetBucketPath(std::string_view path);
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef __linux__
|
#ifdef __linux__
|
||||||
|
|
|
@ -1061,7 +1061,7 @@ std::optional<fb2::Future<GenericError>> ServerFamily::Load(string_view load_pat
|
||||||
return future;
|
return future;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto paths_result = snapshot_storage_->LoadPaths(path);
|
auto paths_result = snapshot_storage_->ExpandSnapshot(path);
|
||||||
if (!paths_result) {
|
if (!paths_result) {
|
||||||
LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format();
|
LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format();
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue