From 76b1f537b59ce6a9705c74149f678a7ad1a86604 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 4 Nov 2024 10:13:53 +0200 Subject: [PATCH] chore: refactor part of s3 logic for loading a snapshot. (#4044) Is done as preparation to share some of its code with GCS implementation. Signed-off-by: Roman Gershman --- src/server/detail/snapshot_storage.cc | 98 +++++++++++++-------------- src/server/detail/snapshot_storage.h | 20 ++++-- 2 files changed, 62 insertions(+), 56 deletions(-) diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc index dd5bb5445..c58291125 100644 --- a/src/server/detail/snapshot_storage.cc +++ b/src/server/detail/snapshot_storage.cc @@ -33,6 +33,7 @@ namespace detail { using namespace util; using namespace std; +namespace { inline bool IsGcsPath(string_view path) { return absl::StartsWith(path, kGCSPrefix); } @@ -59,6 +60,36 @@ pair GetBucketPath(string_view path) { const int kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT; #endif +} // namespace + +string SnapshotStorage::FindMatchingFile(string_view prefix, string_view dbfilename, + vector keys) { + std::sort(std::begin(keys), std::end(keys), + [](const SnapStat& l, const SnapStat& r) { return l.last_modified > r.last_modified; }); + + // Create a regex to match the object keys, substituting the timestamp + // and adding an extension if needed. + fs::path fl_path{prefix}; + fl_path.append(dbfilename); + SubstituteFilenamePlaceholders(&fl_path, + {.ts = "([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})", + .year = "([0-9]{4})", + .month = "([0-9]{2})", + .day = "([0-9]{2})"}); + if (!fl_path.has_extension()) { + fl_path += "(-summary.dfs|.rdb)"; + } + const std::regex re(fl_path.string()); + + for (const SnapStat& key : keys) { + std::smatch m; + if (std::regex_match(key.name, m, re)) { + return key.name; + } + } + return {}; +} + io::Result, GenericError> SnapshotStorage::ExpandSnapshot(const string& load_path) { if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) { return nonstd::make_unexpected( @@ -332,11 +363,9 @@ io::Result, GenericError> AwsS3SnapshotStorage::Op const std::string& path) { fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor(); return proactor->Await([&]() -> io::Result, GenericError> { - std::optional> bucket_path = GetBucketPath(path); - if (!bucket_path) { - return nonstd::make_unexpected(GenericError("Invalid S3 path")); - } - auto [bucket, key] = *bucket_path; + pair bucket_path = GetBucketPath(path); + + auto [bucket, key] = bucket_path; io::Result file = aws::S3WriteFile::Open(bucket, key, s3_); if (!file) { return nonstd::make_unexpected(GenericError(file.error(), "Failed to open write file")); @@ -361,55 +390,26 @@ io::Result AwsS3SnapshotStorage::LoadPath(std::string if (dbfilename.empty()) return ""; - std::optional> bucket_path = GetBucketPath(dir); - if (!bucket_path) { - return nonstd::make_unexpected( - GenericError{std::make_error_code(std::errc::invalid_argument), "Invalid S3 path"}); - } - auto [bucket_name, prefix] = *bucket_path; + auto [bucket_name, prefix] = GetBucketPath(dir); fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor(); - return proactor->Await( - [&, bucket_name = bucket_name, prefix = prefix]() -> io::Result { + + io::Result, GenericError> keys = + proactor->Await([&, bucket_name = bucket_name, prefix = prefix] { LOG(INFO) << "Load snapshot: Searching for snapshot in S3 path: " << kS3Prefix << bucket_name << "/" << prefix; - - // Create a regex to match the object keys, substituting the timestamp - // and adding an extension if needed. - fs::path fl_path{prefix}; - fl_path.append(dbfilename); - SubstituteFilenamePlaceholders( - &fl_path, {.ts = "([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})", - .year = "([0-9]{4})", - .month = "([0-9]{2})", - .day = "([0-9]{2})"}); - if (!fl_path.has_extension()) { - fl_path += "(-summary.dfs|.rdb)"; - } - const std::regex re(fl_path.string()); - - // Sort the keys in reverse so the first. Since the timestamp format - // has lexicographic order, the matching snapshot file will be the latest - // snapshot. - io::Result, GenericError> keys = ListObjects(bucket_name, prefix); - if (!keys) { - return nonstd::make_unexpected(keys.error()); - } - - std::sort(std::rbegin(*keys), std::rend(*keys), [](const SnapStat& l, const SnapStat& r) { - return l.last_modified < r.last_modified; - }); - - for (const SnapStat& key : *keys) { - std::smatch m; - if (std::regex_match(key.name, m, re)) { - return std::string(kS3Prefix) + bucket_name + "/" + key.name; - } - } - - return nonstd::make_unexpected(GenericError( - std::make_error_code(std::errc::no_such_file_or_directory), "Snapshot not found")); + return ListObjects(bucket_name, prefix); }); + if (!keys) { + return nonstd::make_unexpected(keys.error()); + } + + auto match_key = FindMatchingFile(prefix, dbfilename, *keys); + if (!match_key.empty()) { + return absl::StrCat(kS3Prefix, bucket_name, "/", match_key); + } + return nonstd::make_unexpected(GenericError( + std::make_error_code(std::errc::no_such_file_or_directory), "Snapshot not found")); } io::Result, GenericError> AwsS3SnapshotStorage::ExpandFromPath( diff --git a/src/server/detail/snapshot_storage.h b/src/server/detail/snapshot_storage.h index 4fbb64626..5154cb4c3 100644 --- a/src/server/detail/snapshot_storage.h +++ b/src/server/detail/snapshot_storage.h @@ -59,6 +59,19 @@ class SnapshotStorage { } protected: + struct SnapStat { + SnapStat(std::string file_name, int64_t ts) + : name(std::move(file_name)), last_modified(std::move(ts)) { + } + std::string name; + int64_t last_modified; + }; + + // Returns empty string if nothing is matched. vector is passed by value on purpose, as it is + // been sorted inside. + static std::string FindMatchingFile(std::string_view prefix, std::string_view dbfilename, + std::vector keys); + virtual io::Result, GenericError> ExpandFromPath( const std::string& path) = 0; @@ -134,13 +147,6 @@ class AwsS3SnapshotStorage : public SnapshotStorage { std::error_code CheckPath(const std::string& path) final; - struct SnapStat { - SnapStat(std::string file_name, int64_t ts) - : name(std::move(file_name)), last_modified(std::move(ts)) { - } - std::string name; - int64_t last_modified; - }; // List the objects in the given bucket with the given prefix. This must // run from a proactor. io::Result, GenericError> ListObjects(std::string_view bucket_name,