feat(snapshot): add snapshot storage (#1827)

* feat(snapshot): add snapshot storage

* feat(snapshot): replace snapshot storage open file output variables
Andy Dunstall, 2023-09-08 17:12:13 +01:00, committed by GitHub
parent 48488c5682
commit 9d45ddd9fe
4 changed files with 128 additions and 48 deletions
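
In short: the commit moves snapshot-file opening behind a small storage interface with local-file (epoll/io_uring) and S3 implementations, and OpenFile now returns its results as a (sink, file type) pair instead of through output variables. Condensed from the header diff below:

    // The interface introduced by this commit (full definition in the header diff).
    class SnapshotStorage {
     public:
      virtual ~SnapshotStorage() = default;
      // Returns the open sink plus a FileType bitmask describing it.
      virtual io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenFile(
          const std::string& path) = 0;
    };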

@@ -151,6 +151,65 @@ GenericError ValidateFilename(const fs::path& filename, bool new_version) {
   return {};
 }
 
+FileSnapshotStorage::FileSnapshotStorage(FiberQueueThreadPool* fq_threadpool)
+    : fq_threadpool_{fq_threadpool} {
+}
+
+io::Result<std::pair<io::Sink*, uint8_t>, GenericError> FileSnapshotStorage::OpenFile(
+    const std::string& path) {
+  if (fq_threadpool_) {  // EPOLL
+    auto res = util::OpenFiberWriteFile(path, fq_threadpool_);
+    if (!res) {
+      return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for writing"));
+    }
+    return std::pair(*res, FileType::FILE);
+  } else {
+#ifdef __linux__
+    auto res = OpenLinux(path, kRdbWriteFlags, 0666);
+    if (!res) {
+      return nonstd::make_unexpected(GenericError(
+          res.error(),
+          "Couldn't open file for writing (is direct I/O supported by the file system?)"));
+    }
+    uint8_t file_type = FileType::FILE | FileType::IO_URING;
+    if (kRdbWriteFlags & O_DIRECT) {
+      file_type |= FileType::DIRECT;
+    }
+    return std::pair(new LinuxWriteWrapper(res->release()), file_type);
+#else
+    LOG(FATAL) << "Linux I/O is not supported on this platform";
+#endif
+  }
+}
+
+AwsS3SnapshotStorage::AwsS3SnapshotStorage(util::cloud::AWS* aws) : aws_{aws} {
+}
+
+io::Result<std::pair<io::Sink*, uint8_t>, GenericError> AwsS3SnapshotStorage::OpenFile(
+    const std::string& path) {
+  DCHECK(aws_);
+
+  optional<pair<string, string>> bucket_path = GetBucketPath(path);
+  if (!bucket_path) {
+    return nonstd::make_unexpected(GenericError("Invalid S3 path"));
+  }
+  auto [bucket_name, obj_path] = *bucket_path;
+
+  cloud::S3Bucket bucket(*aws_, bucket_name);
+  error_code ec = bucket.Connect(kBucketConnectMs);
+  if (ec) {
+    return nonstd::make_unexpected(GenericError(ec, "Couldn't connect to S3 bucket"));
+  }
+
+  auto res = bucket.OpenWriteFile(obj_path);
+  if (!res) {
+    return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for writing"));
+  }
+
+  return std::pair<io::Sink*, uint8_t>(*res, FileType::CLOUD);
+}
+
 string InferLoadFile(string_view dir, cloud::AWS* aws) {
   fs::path data_folder;
   string bucket_name, obj_path;
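
For orientation, a minimal sketch of how a caller consumes the (sink, file-type) pair these OpenFile implementations return; the storage pointer and path here are illustrative, not part of the diff:

    // storage is any SnapshotStorage*; this mirrors the RdbSnapshot::Start rewrite below.
    auto res = storage->OpenFile("/path/to/dump.rdb");
    if (!res) {
      return res.error();  // propagate the GenericError from the failed open
    }
    auto [sink, file_type] = *res;                     // io::Sink* plus FileType bitmask
    bool align_writes = file_type & FileType::DIRECT;  // O_DIRECT sinks need aligned writes
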
@@ -227,52 +286,19 @@ string InferLoadFile(string_view dir, cloud::AWS* aws) {
 GenericError RdbSnapshot::Start(SaveMode save_mode, const std::string& path,
                                 const RdbSaver::GlobalData& glob_data) {
-  bool is_direct = false;
   VLOG(1) << "Saving RDB " << path;
 
-  if (IsCloudPath(path)) {
-    DCHECK(aws_);
-
-    optional<pair<string, string>> bucket_path = GetBucketPath(path);
-    if (!bucket_path) {
-      return GenericError("Invalid S3 path");
-    }
-    auto [bucket_name, obj_path] = *bucket_path;
-
-    cloud::S3Bucket bucket(*aws_, bucket_name);
-    error_code ec = bucket.Connect(kBucketConnectMs);
-    if (ec) {
-      return GenericError(ec, "Couldn't connect to S3 bucket");
-    }
-    auto res = bucket.OpenWriteFile(obj_path);
-    if (!res) {
-      return GenericError(res.error(), "Couldn't open file for writing");
-    }
-    io_sink_.reset(*res);
-  } else {
-    if (fq_tp_) {  // EPOLL
-      auto res = util::OpenFiberWriteFile(path, fq_tp_);
-      if (!res)
-        return GenericError(res.error(), "Couldn't open file for writing");
-      io_sink_.reset(*res);
-    } else {
-#ifdef __linux__
-      auto res = OpenLinux(path, kRdbWriteFlags, 0666);
-      if (!res) {
-        return GenericError(
-            res.error(),
-            "Couldn't open file for writing (is direct I/O supported by the file system?)");
-      }
-      is_linux_file_ = true;
-      io_sink_.reset(new LinuxWriteWrapper(res->release()));
-      is_direct = kRdbWriteFlags & O_DIRECT;
-#else
-      LOG(FATAL) << "Linux I/O is not supported on this platform";
-#endif
-    }
-  }
+  auto res = snapshot_storage_->OpenFile(path);
+  if (!res) {
+    return res.error();
+  }
 
-  saver_.reset(new RdbSaver(io_sink_.get(), save_mode, is_direct));
+  auto [file, file_type] = *res;
+  io_sink_.reset(file);
+
+  is_linux_file_ = file_type & FileType::IO_URING;
+
+  saver_.reset(new RdbSaver(io_sink_.get(), save_mode, file_type & FileType::DIRECT));
 
   return saver_->SaveHeader(move(glob_data));
 }
@@ -431,7 +457,7 @@ GenericError SaveStagesController::InitResources() {
   snapshots_.resize(use_dfs_format_ ? shard_set->size() + 1 : 1);
   for (auto& [snapshot, _] : snapshots_)
-    snapshot = make_unique<RdbSnapshot>(fq_threadpool_, aws_->get());
+    snapshot = make_unique<RdbSnapshot>(fq_threadpool_, snapshot_storage_.get());
 
   return {};
 }

@@ -18,6 +18,46 @@ class Transaction;
 class Service;
 
 namespace detail {
 
+enum FileType : uint8_t {
+  FILE = (1u << 0),
+  CLOUD = (1u << 1),
+  IO_URING = (1u << 2),
+  DIRECT = (1u << 3),
+};
+
+class SnapshotStorage {
+ public:
+  virtual ~SnapshotStorage() = default;
+
+  // Opens the file at the given path, and returns the open file and file
+  // type, which is a bitmask of FileType.
+  virtual io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenFile(
+      const std::string& path) = 0;
+};
+
+class FileSnapshotStorage : public SnapshotStorage {
+ public:
+  FileSnapshotStorage(FiberQueueThreadPool* fq_threadpool);
+
+  io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenFile(
+      const std::string& path) override;
+
+ private:
+  util::fb2::FiberQueueThreadPool* fq_threadpool_;
+};
+
+class AwsS3SnapshotStorage : public SnapshotStorage {
+ public:
+  AwsS3SnapshotStorage(util::cloud::AWS* aws);
+
+  io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenFile(
+      const std::string& path) override;
+
+ private:
+  util::cloud::AWS* aws_;
+};
+
 struct SaveStagesInputs {
   bool use_dfs_format_;
   std::string_view basename_;
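
To make the bitmask concrete: a local io_uring file opened with O_DIRECT carries three of the flags above. The values follow from the shifts; this is an illustration, not code from the diff:

    uint8_t ft = FileType::FILE | FileType::IO_URING | FileType::DIRECT;  // 1 | 4 | 8 = 13
    bool on_cloud = ft & FileType::CLOUD;    // 0: not an S3 object
    bool is_direct = ft & FileType::DIRECT;  // nonzero: writes must be block-aligned
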
@@ -28,11 +68,13 @@ struct SaveStagesInputs {
   std::shared_ptr<LastSaveInfo>* last_save_info_;
   util::fb2::Mutex* save_mu_;
   std::unique_ptr<util::cloud::AWS>* aws_;
+  std::shared_ptr<SnapshotStorage> snapshot_storage_;
 };
 
 class RdbSnapshot {
  public:
-  RdbSnapshot(FiberQueueThreadPool* fq_tp, util::cloud::AWS* aws) : fq_tp_{fq_tp}, aws_{aws} {
+  RdbSnapshot(FiberQueueThreadPool* fq_tp, SnapshotStorage* snapshot_storage)
+      : fq_tp_{fq_tp}, snapshot_storage_{snapshot_storage} {
   }
 
   GenericError Start(SaveMode save_mode, const string& path, const RdbSaver::GlobalData& glob_data);
@@ -53,7 +95,7 @@ class RdbSnapshot {
   bool started_ = false;
   bool is_linux_file_ = false;
   util::fb2::FiberQueueThreadPool* fq_tp_ = nullptr;
-  util::cloud::AWS* aws_ = nullptr;
+  SnapshotStorage* snapshot_storage_ = nullptr;
 
   unique_ptr<io::Sink> io_sink_;
   unique_ptr<RdbSaver> saver_;

@@ -434,6 +434,11 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
     if (ec) {
       LOG(FATAL) << "Failed to initialize AWS " << ec;
     }
+    snapshot_storage_ = std::make_shared<detail::AwsS3SnapshotStorage>(aws_.get());
+  } else if (fq_threadpool_) {
+    snapshot_storage_ = std::make_shared<detail::FileSnapshotStorage>(fq_threadpool_.get());
+  } else {
+    snapshot_storage_ = std::make_shared<detail::FileSnapshotStorage>(nullptr);
   }
 
   string load_path = detail::InferLoadFile(flag_dir, aws_.get());
@@ -921,9 +926,9 @@ GenericError ServerFamily::DoSave() {
 }
 
 GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transaction* trans) {
-  SaveStagesController sc{detail::SaveStagesInputs{new_version, basename, trans, &service_,
-                                                   &is_saving_, fq_threadpool_.get(),
-                                                   &last_save_info_, &save_mu_, &aws_}};
+  SaveStagesController sc{detail::SaveStagesInputs{
+      new_version, basename, trans, &service_, &is_saving_, fq_threadpool_.get(), &last_save_info_,
+      &save_mu_, &aws_, snapshot_storage_}};
   return sc.Save();
 }

@@ -27,6 +27,12 @@ class AWS;
 
 namespace dfly {
 
+namespace detail {
+
+class SnapshotStorage;
+
+} // namespace detail
+
 std::string GetPassword();
 
 namespace journal {
@@ -242,6 +248,7 @@ class ServerFamily {
   Done schedule_done_;
   std::unique_ptr<FiberQueueThreadPool> fq_threadpool_;
   std::unique_ptr<util::cloud::AWS> aws_;
+  std::shared_ptr<detail::SnapshotStorage> snapshot_storage_;
 };
 
 } // namespace dfly