1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: update helio dependency (#984)

Also remove direct references for boost fibers from the code.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-03-24 18:04:05 +03:00 committed by GitHub
parent 39174f398a
commit 12abe0bc12
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 86 additions and 110 deletions

2
helio

@ -1 +1 @@
Subproject commit 124e97a30e854e0305277e8d80b1b575612181d0
Subproject commit 245f9af5e738f3c571414755921cd910222c0d67

View file

@ -4,7 +4,6 @@
#pragma once
#include <boost/fiber/mutex.hpp>
#include <functional>
#include <string_view>

View file

@ -8,8 +8,6 @@
#include <absl/strings/match.h>
#include <mimalloc.h>
#include <boost/fiber/operations.hpp>
#include "base/flags.h"
#include "base/logging.h"
#include "facade/conn_context.h"
@ -44,8 +42,6 @@ using namespace util;
using namespace std;
using nonstd::make_unexpected;
namespace fibers = boost::fibers;
namespace facade {
namespace {
@ -552,7 +548,7 @@ io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
void Connection::ConnectionFlow(FiberSocketBase* peer) {
stats_ = service_->GetThreadLocalConnectionStats();
auto dispatch_fb = fibers::fiber(fibers::launch::dispatch, [&] { DispatchFiber(peer); });
auto dispatch_fb = MakeFiber(fibers_ext::Launch::dispatch, [&] { DispatchFiber(peer); });
++stats_->num_conns;
++stats_->conn_received_cnt;
@ -589,7 +585,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
VLOG(1) << "Before dispatch_fb.join()";
dispatch_fb.join();
dispatch_fb.Join();
VLOG(1) << "After dispatch_fb.join()";
service_->OnClose(cc_.get());

View file

@ -9,7 +9,6 @@
#include <absl/types/span.h>
#include <atomic>
#include <boost/fiber/mutex.hpp>
#include <string_view>
#include <vector>
@ -216,7 +215,7 @@ template <typename T> struct AggregateValue {
}
private:
::boost::fibers::mutex mu_{};
util::fibers_ext::Mutex mu_{};
T current_{};
};
@ -320,7 +319,7 @@ class Context : protected Cancellation {
private:
GenericError err_;
::boost::fibers::mutex mu_;
util::fibers_ext::Mutex mu_;
ErrHandler err_handler_;
::util::fibers_ext::Fiber err_handler_fb_;

View file

@ -8,15 +8,13 @@ extern "C" {
#include "redis/object.h"
}
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/operations.hpp>
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/journal/journal.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "util/fiber_sched_algo.h"
#include "util/fibers/fiber.h"
#include "util/proactor_base.h"
namespace dfly {
@ -506,7 +504,7 @@ void DbSlice::FlushDb(DbIndex db_ind) {
mi_heap_collect(ServerState::tlocal()->data_heap(), true);
};
boost::fibers::fiber(std::move(cb)).detach();
util::MakeFiber(std::move(cb)).Detach();
return;
}
@ -526,11 +524,11 @@ void DbSlice::FlushDb(DbIndex db_ind) {
}
}
boost::fibers::fiber([all_dbs = std::move(all_dbs)]() mutable {
MakeFiber([all_dbs = std::move(all_dbs)]() mutable {
for (auto& db : all_dbs) {
db.reset();
}
}).detach();
}).Detach();
}
void DbSlice::AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at) {

View file

@ -7,7 +7,6 @@
#include <absl/random/random.h>
#include <absl/strings/str_cat.h>
#include <boost/fiber/operations.hpp>
#include <filesystem>
#include "base/flags.h"
@ -33,7 +32,6 @@ namespace dfly {
using namespace util;
using boost::intrusive_ptr;
using boost::fibers::fiber;
using namespace facade;
namespace fs = std::filesystem;
using absl::GetFlag;
@ -419,7 +417,7 @@ void DebugCmd::Inspect(string_view key) {
}
void DebugCmd::Watched() {
boost::fibers::mutex mu;
util::fibers_ext::Mutex mu;
vector<string> watched_keys;
vector<string> awaked_trans;

View file

@ -19,7 +19,6 @@
#include "server/server_family.h"
#include "server/server_state.h"
#include "server/transaction.h"
using namespace std;
ABSL_DECLARE_FLAG(string, dir);
@ -27,7 +26,8 @@ ABSL_DECLARE_FLAG(string, dir);
namespace dfly {
using namespace facade;
using namespace std;
using namespace util;
using util::ProactorBase;
namespace {
@ -383,7 +383,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard);
}
flow->full_sync_fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
flow->full_sync_fb = fibers_ext::Fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
return OpStatus::OK;
}
@ -465,7 +465,7 @@ uint32_t DflyCmd::CreateSyncSession(ConnectionContext* cntx) {
// Spawn external fiber to allow destructing the context from outside
// and return from the handler immediately.
::boost::fibers::fiber{&DflyCmd::StopReplication, this, sync_id}.detach();
util::MakeFiber(&DflyCmd::StopReplication, this, sync_id).Detach();
};
string address = cntx->owner()->RemoteEndpointAddress();

View file

@ -7,8 +7,6 @@
#include <absl/container/btree_map.h>
#include <atomic>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/mutex.hpp>
#include <memory>
#include "server/conn_context.h"
@ -110,7 +108,7 @@ class DflyCmd {
uint32_t listening_port;
std::vector<FlowInfo> flows;
::boost::fibers::mutex mu; // See top of header for locking levels.
util::fibers_ext::Mutex mu; // See top of header for locking levels.
};
struct ReplicaRoleInfo {
@ -206,7 +204,7 @@ class DflyCmd {
using ReplicaInfoMap = absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>>;
ReplicaInfoMap replica_infos_;
::boost::fibers::mutex mu_; // Guard global operations. See header top for locking levels.
util::fibers_ext::Mutex mu_; // Guard global operations. See header top for locking levels.
};
} // namespace dfly

View file

@ -44,7 +44,6 @@ ABSL_FLAG(float, mem_defrag_page_utilization_threshold, 0.8,
namespace dfly {
using namespace util;
namespace fibers = ::boost::fibers;
using absl::GetFlag;
namespace {
@ -198,8 +197,8 @@ uint32_t EngineShard::DefragTask() {
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
FiberProps::SetName(absl::StrCat("shard_queue", index));
fiber_q_ = MakeFiber([this, index = pb->GetIndex()] {
ThisFiber::SetName(absl::StrCat("shard_queue", index));
queue_.Run();
});
@ -208,8 +207,8 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
if (clock_cycle_ms == 0)
clock_cycle_ms = 1;
fiber_periodic_ = fibers::fiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] {
FiberProps::SetName(absl::StrCat("shard_periodic", index));
fiber_periodic_ = MakeFiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] {
ThisFiber::SetName(absl::StrCat("shard_periodic", index));
RunPeriodic(std::chrono::milliseconds(period_ms));
});
}
@ -227,15 +226,15 @@ EngineShard::~EngineShard() {
void EngineShard::Shutdown() {
queue_.Shutdown();
fiber_q_.join();
fiber_q_.Join();
if (tiered_storage_) {
tiered_storage_->Shutdown();
}
fiber_periodic_done_.Notify();
if (fiber_periodic_.joinable()) {
fiber_periodic_.join();
if (fiber_periodic_.IsJoinable()) {
fiber_periodic_.Join();
}
ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
@ -532,7 +531,7 @@ BlockingController* EngineShard::EnsureBlockingController() {
}
void EngineShard::TEST_EnableHeartbeat() {
fiber_periodic_ = fibers::fiber([this, period_ms = 1] {
fiber_periodic_ = MakeFiber([this, period_ms = 1] {
FiberProps::SetName("shard_periodic_TEST");
RunPeriodic(std::chrono::milliseconds(period_ms));
});

View file

@ -189,7 +189,7 @@ class EngineShard {
bool DoDefrag();
::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;
util::fibers_ext::Fiber fiber_q_;
TxQueue txq_;
MiMemoryResource mi_resource_;
@ -207,7 +207,7 @@ class EngineShard {
IntentLock shard_lock_;
uint32_t defrag_task_ = 0;
::boost::fibers::fiber fiber_periodic_;
::util::fibers_ext::Fiber fiber_periodic_;
::util::fibers_ext::Done fiber_periodic_done_;
DefragTaskState defrag_state_;

View file

@ -276,7 +276,7 @@ TEST_F(GenericFamilyTest, Move) {
ASSERT_THAT(Run({"get", "a"}), "test");
// Check MOVE awakes blocking operations
auto fb_blpop = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto fb_blpop = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
Run({"select", "1"});
auto resp = Run({"blpop", "l", "0"});
ASSERT_THAT(resp, ArgType(RespExpr::ARRAY));

View file

@ -55,7 +55,7 @@ error_code Journal::Close() {
VLOG(1) << "Journal::Close";
fibers::mutex ec_mu;
fibers_ext::Mutex ec_mu;
error_code res;
lock_guard lk(state_mu_);

View file

@ -53,7 +53,7 @@ class Journal {
bool await);
private:
mutable boost::fibers::mutex state_mu_;
mutable util::fibers_ext::Mutex state_mu_;
std::atomic_bool lameduck_{false};
};

View file

@ -4,8 +4,6 @@
#pragma once
#include <boost/fiber/condition_variable.hpp>
#include <boost/fiber/fiber.hpp>
#include <optional>
#include <string_view>

View file

@ -20,7 +20,6 @@
using namespace testing;
using namespace std;
using namespace util;
namespace fibers = ::boost::fibers;
using absl::StrCat;
namespace dfly {
@ -112,7 +111,7 @@ TEST_F(ListFamilyTest, BLPopBlocking) {
RespExpr resp0, resp1;
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
resp0 = Run({"blpop", "x", "0"});
LOG(INFO) << "pop0";
});
@ -151,7 +150,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) {
ASSERT_FALSE(IsLocked(0, kKey1));
ASSERT_FALSE(IsLocked(0, kKey2));
auto fb1 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto fb1 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
resp0 = Run({"blpop", kKey1, kKey2, "0"});
});
@ -204,7 +203,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
Run({"exists", kKey1, kKey2, kKey3});
ASSERT_EQ(3, GetDebugInfo().shards_count);
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});
@ -243,7 +242,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
TEST_F(ListFamilyTest, BLPopSerialize) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});
@ -313,7 +312,7 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, "0"});
});
@ -340,7 +339,7 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
TEST_F(ListFamilyTest, BPopSameKeyTwice) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
EXPECT_EQ(0, NumWatched());
});
@ -353,7 +352,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
});
@ -371,7 +370,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) {
ASSERT_EQ(1, GetDebugInfo().shards_count);
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
blpop_resp = Run({"blpop", "x", "y", "0"});
EXPECT_FALSE(IsLocked(0, "y"));
ASSERT_EQ(0, NumWatched());
@ -392,7 +391,7 @@ TEST_F(ListFamilyTest, BPopRename) {
Run({"exists", kKey1, kKey2});
ASSERT_EQ(2, GetDebugInfo().shards_count);
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, "0"});
});
@ -410,7 +409,7 @@ TEST_F(ListFamilyTest, BPopRename) {
TEST_F(ListFamilyTest, BPopFlush) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, "0"});
});
@ -716,7 +715,7 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) {
RespExpr resp;
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
resp = Run({"brpoplpush", "x", "y", "0"});
});
fibers_ext::SleepFor(30us);
@ -738,7 +737,7 @@ TEST_F(ListFamilyTest, BRPopContended) {
// Run the fiber at creation.
fibers_ext::Fiber fb[kNumFibers];
for (int i = 0; i < kNumFibers; i++) {
fb[i] = pp_->at(1)->LaunchFiber(fibers::launch::dispatch, [&] {
fb[i] = pp_->at(1)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
string id = StrCat("id", i);
while (!done) {
Run(id, {"brpop", "k0", "k1", "k2", "k3", "k4", "0.1"});
@ -773,7 +772,7 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) {
ASSERT_EQ(0, NumWatched());
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
resp = Run({"brpoplpush", "x", "z", "0"});
});

View file

@ -13,7 +13,6 @@ extern "C" {
#include <absl/strings/str_format.h>
#include <xxhash.h>
#include <boost/fiber/operations.hpp>
#include <filesystem>
#include "base/flags.h"

View file

@ -128,7 +128,7 @@ class Service : public facade::ServiceInterface {
CommandRegistry registry_;
absl::flat_hash_map<std::string, unsigned> unknown_cmds_;
mutable ::boost::fibers::mutex mu_;
mutable util::fibers_ext::Mutex mu_;
GlobalState global_state_ = GlobalState::ACTIVE; // protected by mu_;
};

View file

@ -3,7 +3,6 @@
//
#pragma once
#include <boost/fiber/mutex.hpp>
#include <jsoncons/json.hpp>
#include <system_error>

View file

@ -115,11 +115,11 @@ Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* s
}
Replica::~Replica() {
if (sync_fb_.joinable()) {
sync_fb_.join();
if (sync_fb_.IsJoinable()) {
sync_fb_.Join();
}
if (execution_fb_.joinable()) {
execution_fb_.join();
if (execution_fb_.IsJoinable()) {
execution_fb_.Join();
}
if (sock_) {
@ -156,7 +156,7 @@ bool Replica::Start(ConnectionContext* cntx) {
cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this));
// 4. Spawn main coordination fiber.
sync_fb_ = ::boost::fibers::fiber(&Replica::MainReplicationFb, this);
sync_fb_ = fibers_ext::Fiber(&Replica::MainReplicationFb, this);
(*cntx)->SendOk();
return true;
@ -173,8 +173,8 @@ void Replica::Stop() {
// Make sure the replica fully stopped and did all cleanup,
// so we can freely release resources (connections).
if (sync_fb_.joinable())
sync_fb_.join();
if (sync_fb_.IsJoinable())
sync_fb_.Join();
}
void Replica::Pause(bool pause) {
@ -666,11 +666,11 @@ void Replica::CloseSocket() {
void Replica::JoinAllFlows() {
for (auto& flow : shard_flows_) {
if (flow->sync_fb_.joinable()) {
flow->sync_fb_.join();
if (flow->sync_fb_.IsJoinable()) {
flow->sync_fb_.Join();
}
if (flow->execution_fb_.joinable()) {
flow->execution_fb_.join();
if (flow->execution_fb_.IsJoinable()) {
flow->execution_fb_.Join();
}
}
}
@ -746,7 +746,7 @@ error_code Replica::StartFullSyncFlow(fibers_ext::BlockingCounter sb, Context* c
// We can not discard io_buf because it may contain data
// besides the response we parsed. Therefore we pass it further to ReplicateDFFb.
sync_fb_ = ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx);
sync_fb_ = fibers_ext::Fiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx);
return error_code{};
}
@ -759,9 +759,9 @@ error_code Replica::StartStableSyncFlow(Context* cntx) {
CHECK(sock_->IsOpen());
// sock_.reset(mythread->CreateSocket());
// RETURN_ON_ERR(sock_->Connect(master_context_.master_ep));
sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyReadFb, this, cntx);
sync_fb_ = fibers_ext::Fiber(&Replica::StableSyncDflyReadFb, this, cntx);
if (use_multi_shard_exe_sync_) {
execution_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyExecFb, this, cntx);
execution_fb_ = fibers_ext::Fiber(&Replica::StableSyncDflyExecFb, this, cntx);
}
return std::error_code{};

View file

@ -4,8 +4,6 @@
#pragma once
#include <boost/fiber/barrier.hpp>
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/mutex.hpp>
#include <queue>
#include <variant>
@ -83,7 +81,7 @@ class Replica {
// Coorindator for multi shard execution.
struct MultiShardExecution {
boost::fibers::mutex map_mu;
util::fibers_ext::Mutex map_mu;
struct TxExecutionSync {
util::fibers_ext::Barrier barrier;
@ -234,13 +232,13 @@ class Replica {
std::atomic_uint64_t journal_rec_executed_ = 0;
// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
::boost::fibers::fiber sync_fb_;
::boost::fibers::fiber execution_fb_;
::util::fibers_ext::Fiber sync_fb_;
::util::fibers_ext::Fiber execution_fb_;
std::vector<std::unique_ptr<Replica>> shard_flows_;
// Guard operations where flows might be in a mixed state (transition/setup)
::boost::fibers::mutex flows_op_mu_;
util::fibers_ext::Mutex flows_op_mu_;
std::optional<base::IoBuf> leftover_buf_;
std::unique_ptr<facade::RedisParser> parser_;

View file

@ -32,6 +32,7 @@ namespace dfly {
using namespace std;
using namespace facade;
using namespace util;
ScriptMgr::ScriptMgr() {
// Build default script config
@ -155,7 +156,7 @@ void ScriptMgr::ListCmd(ConnectionContext* cntx) const {
void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
absl::flat_hash_map<std::string, base::Histogram> result;
boost::fibers::mutex mu;
fibers_ext::Mutex mu;
shard_set->pool()->AwaitFiberOnAll([&](auto* pb) {
auto* ss = ServerState::tlocal();

View file

@ -7,7 +7,6 @@
#include <absl/container/flat_hash_map.h>
#include <array>
#include <boost/fiber/mutex.hpp>
#include <optional>
#include "server/conn_context.h"
@ -71,7 +70,7 @@ class ScriptMgr {
ScriptParams default_params_;
absl::flat_hash_map<ScriptKey, InternalScriptData> db_;
mutable ::boost::fibers::mutex mu_;
mutable util::fibers_ext::Mutex mu_;
};
} // namespace dfly

View file

@ -69,17 +69,16 @@ ABSL_DECLARE_FLAG(uint32_t, hz);
namespace dfly {
namespace fibers = ::boost::fibers;
namespace fs = std::filesystem;
namespace uring = util::uring;
using absl::GetFlag;
using absl::StrCat;
using namespace facade;
using namespace util;
using fibers_ext::FiberQueueThreadPool;
using http::StringResponse;
using strings::HumanReadableNumBytes;
using util::ProactorBase;
using util::fibers_ext::FiberQueueThreadPool;
using util::http::StringResponse;
namespace {
@ -473,7 +472,7 @@ void ServerFamily::Shutdown() {
// Load starts as many fibers as there are files to load each one separately.
// It starts one more fiber that waits for all load fibers to finish and returns the first
// error (if any occured) with a future.
fibers::future<std::error_code> ServerFamily::Load(const std::string& load_path) {
fibers_ext::Future<std::error_code> ServerFamily::Load(const std::string& load_path) {
CHECK(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"));
vector<std::string> paths{{load_path}};
@ -484,7 +483,7 @@ fibers::future<std::error_code> ServerFamily::Load(const std::string& load_path)
io::Result<io::StatShortVec> files = io::StatFiles(glob);
if (files && files->size() == 0) {
fibers::promise<std::error_code> ec_promise;
fibers_ext::Promise<std::error_code> ec_promise;
ec_promise.set_value(make_error_code(errc::no_such_file_or_directory));
return ec_promise.get_future();
}
@ -500,7 +499,7 @@ fibers::future<std::error_code> ServerFamily::Load(const std::string& load_path)
(void)fs::canonical(path, ec);
if (ec) {
LOG(ERROR) << "Error loading " << load_path << " " << ec.message();
fibers::promise<std::error_code> ec_promise;
fibers_ext::Promise<std::error_code> ec_promise;
ec_promise.set_value(ec);
return ec_promise.get_future();
}
@ -916,7 +915,7 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
vector<unique_ptr<RdbSnapshot>> snapshots;
absl::flat_hash_map<string_view, size_t> rdb_name_map;
fibers::mutex mu; // guards rdb_name_map
fibers_ext::Mutex mu; // guards rdb_name_map
auto save_cb = [&](unsigned index) {
auto& snapshot = snapshots[index];
@ -1153,7 +1152,7 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
if (sub_cmd == "LIST") {
vector<string> client_info;
fibers::mutex mu;
fibers_ext::Mutex mu;
auto cb = [&](util::Connection* conn) {
facade::Connection* dcon = static_cast<facade::Connection*>(conn);
string info = dcon->GetClientInfo();
@ -1390,13 +1389,13 @@ static void MergeInto(const DbSlice::Stats& src, Metrics* dest) {
Metrics ServerFamily::GetMetrics() const {
Metrics result;
fibers::mutex mu;
fibers_ext::Mutex mu;
auto cb = [&](ProactorBase* pb) {
EngineShard* shard = EngineShard::tlocal();
ServerState* ss = ServerState::tlocal();
lock_guard<fibers::mutex> lk(mu);
lock_guard lk(mu);
result.uptime = time(NULL) - this->start_time_;
result.conn_stats += ss->connection_stats;

View file

@ -4,7 +4,6 @@
#pragma once
#include <boost/fiber/future.hpp>
#include <string>
#include "facade/conn_context.h"
@ -102,7 +101,7 @@ class ServerFamily {
// Load snapshot from file (.rdb file or summary.dfs file) and return
// future with error_code.
boost::fibers::future<std::error_code> Load(const std::string& file_name);
util::fibers_ext::Future<std::error_code> Load(const std::string& file_name);
// used within tests.
bool IsSaving() const {
@ -168,7 +167,7 @@ class ServerFamily {
void SnapshotScheduling(const SnapshotSpec& time);
util::fibers_ext::Fiber snapshot_fiber_;
boost::fibers::future<std::error_code> load_result_;
util::fibers_ext::Future<std::error_code> load_result_;
uint32_t stats_caching_task_ = 0;
Service& service_;
@ -177,7 +176,7 @@ class ServerFamily {
util::ListenerInterface* main_listener_ = nullptr;
util::ProactorBase* pb_task_ = nullptr;
mutable ::boost::fibers::mutex replicaof_mu_, save_mu_;
mutable util::fibers_ext::Mutex replicaof_mu_, save_mu_;
std::shared_ptr<Replica> replica_; // protected by replica_of_mu_
std::unique_ptr<ScriptMgr> script_mgr_;

View file

@ -26,8 +26,6 @@ namespace dfly {
using namespace std;
using namespace util;
using namespace chrono_literals;
namespace this_fiber = ::boost::this_fiber;
using boost::fibers::fiber;
SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode)
@ -39,7 +37,7 @@ SliceSnapshot::~SliceSnapshot() {
}
void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
DCHECK(!snapshot_fb_.joinable());
DCHECK(!snapshot_fb_.IsJoinable());
auto db_cb = absl::bind_front(&SliceSnapshot::OnDbChange, this);
snapshot_version_ = db_slice_->RegisterOnChange(move(db_cb));
@ -55,7 +53,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
snapshot_fb_ = fiber([this, stream_journal, cll] {
snapshot_fb_ = MakeFiber([this, stream_journal, cll] {
IterateBucketsFb(cll);
if (cll->IsCancelled()) {
Cancel();
@ -90,8 +88,8 @@ void SliceSnapshot::Cancel() {
void SliceSnapshot::Join() {
// Fiber could have already been joined by Stop.
if (snapshot_fb_.joinable())
snapshot_fb_.join();
if (snapshot_fb_.IsJoinable())
snapshot_fb_.Join();
}
// The algorithm is to go over all the buckets and serialize those with
@ -138,7 +136,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
PushSerializedToChannel(false);
if (stats_.loop_serialized >= last_yield + 100) {
DVLOG(2) << "Before sleep " << this_fiber::properties<FiberProps>().name();
DVLOG(2) << "Before sleep " << ThisFiber::GetName();
fibers_ext::Yield();
DVLOG(2) << "After sleep";
@ -149,7 +147,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
}
} while (cursor);
DVLOG(2) << "after loop " << this_fiber::properties<FiberProps>().name();
DVLOG(2) << "after loop " << ThisFiber::GetName();
PushSerializedToChannel(true);
} // for (dbindex)

View file

@ -130,8 +130,8 @@ class SliceSnapshot {
std::unique_ptr<RdbSerializer> serializer_;
::boost::fibers::mutex mu_;
::boost::fibers::fiber snapshot_fb_; // IterateEntriesFb
util::fibers_ext::Mutex mu_;
::util::fibers_ext::Fiber snapshot_fb_; // IterateEntriesFb
CompressionMode compression_mode_;
RdbTypeFreqMap type_freq_map_;

View file

@ -95,7 +95,7 @@ class BaseFamilyTest : public ::testing::Test {
unsigned num_threads_ = 3;
absl::flat_hash_map<std::string, std::unique_ptr<TestConnWrapper>> connections_;
::boost::fibers::mutex mu_;
util::fibers_ext::Mutex mu_;
ConnectionContext::DebugInfo last_cmd_dbg_info_;
std::vector<RespVec*> resp_vec_;