mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: refactor part of s3 logic for loading a snapshot. (#4044)
Is done as preparation to share some of its code with GCS implementation. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
e103254cf9
commit
76b1f537b5
2 changed files with 62 additions and 56 deletions
|
@ -33,6 +33,7 @@ namespace detail {
|
|||
using namespace util;
|
||||
using namespace std;
|
||||
|
||||
namespace {
|
||||
inline bool IsGcsPath(string_view path) {
|
||||
return absl::StartsWith(path, kGCSPrefix);
|
||||
}
|
||||
|
@ -59,6 +60,36 @@ pair<string, string> GetBucketPath(string_view path) {
|
|||
const int kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT;
|
||||
#endif
|
||||
|
||||
} // namespace
|
||||
|
||||
string SnapshotStorage::FindMatchingFile(string_view prefix, string_view dbfilename,
|
||||
vector<SnapStat> keys) {
|
||||
std::sort(std::begin(keys), std::end(keys),
|
||||
[](const SnapStat& l, const SnapStat& r) { return l.last_modified > r.last_modified; });
|
||||
|
||||
// Create a regex to match the object keys, substituting the timestamp
|
||||
// and adding an extension if needed.
|
||||
fs::path fl_path{prefix};
|
||||
fl_path.append(dbfilename);
|
||||
SubstituteFilenamePlaceholders(&fl_path,
|
||||
{.ts = "([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})",
|
||||
.year = "([0-9]{4})",
|
||||
.month = "([0-9]{2})",
|
||||
.day = "([0-9]{2})"});
|
||||
if (!fl_path.has_extension()) {
|
||||
fl_path += "(-summary.dfs|.rdb)";
|
||||
}
|
||||
const std::regex re(fl_path.string());
|
||||
|
||||
for (const SnapStat& key : keys) {
|
||||
std::smatch m;
|
||||
if (std::regex_match(key.name, m, re)) {
|
||||
return key.name;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
io::Result<vector<string>, GenericError> SnapshotStorage::ExpandSnapshot(const string& load_path) {
|
||||
if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) {
|
||||
return nonstd::make_unexpected(
|
||||
|
@ -332,11 +363,9 @@ io::Result<std::pair<io::Sink*, uint8_t>, GenericError> AwsS3SnapshotStorage::Op
|
|||
const std::string& path) {
|
||||
fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
|
||||
return proactor->Await([&]() -> io::Result<std::pair<io::Sink*, uint8_t>, GenericError> {
|
||||
std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(path);
|
||||
if (!bucket_path) {
|
||||
return nonstd::make_unexpected(GenericError("Invalid S3 path"));
|
||||
}
|
||||
auto [bucket, key] = *bucket_path;
|
||||
pair<string, string> bucket_path = GetBucketPath(path);
|
||||
|
||||
auto [bucket, key] = bucket_path;
|
||||
io::Result<aws::S3WriteFile> file = aws::S3WriteFile::Open(bucket, key, s3_);
|
||||
if (!file) {
|
||||
return nonstd::make_unexpected(GenericError(file.error(), "Failed to open write file"));
|
||||
|
@ -361,55 +390,26 @@ io::Result<std::string, GenericError> AwsS3SnapshotStorage::LoadPath(std::string
|
|||
if (dbfilename.empty())
|
||||
return "";
|
||||
|
||||
std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(dir);
|
||||
if (!bucket_path) {
|
||||
return nonstd::make_unexpected(
|
||||
GenericError{std::make_error_code(std::errc::invalid_argument), "Invalid S3 path"});
|
||||
}
|
||||
auto [bucket_name, prefix] = *bucket_path;
|
||||
auto [bucket_name, prefix] = GetBucketPath(dir);
|
||||
|
||||
fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
|
||||
return proactor->Await(
|
||||
[&, bucket_name = bucket_name, prefix = prefix]() -> io::Result<std::string, GenericError> {
|
||||
|
||||
io::Result<std::vector<SnapStat>, GenericError> keys =
|
||||
proactor->Await([&, bucket_name = bucket_name, prefix = prefix] {
|
||||
LOG(INFO) << "Load snapshot: Searching for snapshot in S3 path: " << kS3Prefix
|
||||
<< bucket_name << "/" << prefix;
|
||||
|
||||
// Create a regex to match the object keys, substituting the timestamp
|
||||
// and adding an extension if needed.
|
||||
fs::path fl_path{prefix};
|
||||
fl_path.append(dbfilename);
|
||||
SubstituteFilenamePlaceholders(
|
||||
&fl_path, {.ts = "([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})",
|
||||
.year = "([0-9]{4})",
|
||||
.month = "([0-9]{2})",
|
||||
.day = "([0-9]{2})"});
|
||||
if (!fl_path.has_extension()) {
|
||||
fl_path += "(-summary.dfs|.rdb)";
|
||||
}
|
||||
const std::regex re(fl_path.string());
|
||||
|
||||
// Sort the keys in reverse so the first. Since the timestamp format
|
||||
// has lexicographic order, the matching snapshot file will be the latest
|
||||
// snapshot.
|
||||
io::Result<std::vector<SnapStat>, GenericError> keys = ListObjects(bucket_name, prefix);
|
||||
if (!keys) {
|
||||
return nonstd::make_unexpected(keys.error());
|
||||
}
|
||||
|
||||
std::sort(std::rbegin(*keys), std::rend(*keys), [](const SnapStat& l, const SnapStat& r) {
|
||||
return l.last_modified < r.last_modified;
|
||||
});
|
||||
|
||||
for (const SnapStat& key : *keys) {
|
||||
std::smatch m;
|
||||
if (std::regex_match(key.name, m, re)) {
|
||||
return std::string(kS3Prefix) + bucket_name + "/" + key.name;
|
||||
}
|
||||
}
|
||||
|
||||
return nonstd::make_unexpected(GenericError(
|
||||
std::make_error_code(std::errc::no_such_file_or_directory), "Snapshot not found"));
|
||||
return ListObjects(bucket_name, prefix);
|
||||
});
|
||||
if (!keys) {
|
||||
return nonstd::make_unexpected(keys.error());
|
||||
}
|
||||
|
||||
auto match_key = FindMatchingFile(prefix, dbfilename, *keys);
|
||||
if (!match_key.empty()) {
|
||||
return absl::StrCat(kS3Prefix, bucket_name, "/", match_key);
|
||||
}
|
||||
return nonstd::make_unexpected(GenericError(
|
||||
std::make_error_code(std::errc::no_such_file_or_directory), "Snapshot not found"));
|
||||
}
|
||||
|
||||
io::Result<vector<string>, GenericError> AwsS3SnapshotStorage::ExpandFromPath(
|
||||
|
|
|
@ -59,6 +59,19 @@ class SnapshotStorage {
|
|||
}
|
||||
|
||||
protected:
|
||||
struct SnapStat {
|
||||
SnapStat(std::string file_name, int64_t ts)
|
||||
: name(std::move(file_name)), last_modified(std::move(ts)) {
|
||||
}
|
||||
std::string name;
|
||||
int64_t last_modified;
|
||||
};
|
||||
|
||||
// Returns empty string if nothing is matched. vector is passed by value on purpose, as it is
|
||||
// been sorted inside.
|
||||
static std::string FindMatchingFile(std::string_view prefix, std::string_view dbfilename,
|
||||
std::vector<SnapStat> keys);
|
||||
|
||||
virtual io::Result<std::vector<std::string>, GenericError> ExpandFromPath(
|
||||
const std::string& path) = 0;
|
||||
|
||||
|
@ -134,13 +147,6 @@ class AwsS3SnapshotStorage : public SnapshotStorage {
|
|||
|
||||
std::error_code CheckPath(const std::string& path) final;
|
||||
|
||||
struct SnapStat {
|
||||
SnapStat(std::string file_name, int64_t ts)
|
||||
: name(std::move(file_name)), last_modified(std::move(ts)) {
|
||||
}
|
||||
std::string name;
|
||||
int64_t last_modified;
|
||||
};
|
||||
// List the objects in the given bucket with the given prefix. This must
|
||||
// run from a proactor.
|
||||
io::Result<std::vector<SnapStat>, GenericError> ListObjects(std::string_view bucket_name,
|
||||
|
|
Loading…
Reference in a new issue