diff --git a/helio b/helio index 124e97a30..245f9af5e 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 124e97a30e854e0305277e8d80b1b575612181d0 +Subproject commit 245f9af5e738f3c571414755921cd910222c0d67 diff --git a/src/core/interpreter.h b/src/core/interpreter.h index f259efbcc..08cb83761 100644 --- a/src/core/interpreter.h +++ b/src/core/interpreter.h @@ -4,7 +4,6 @@ #pragma once -#include #include #include diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 22d8179ac..954b8752f 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -8,8 +8,6 @@ #include #include -#include - #include "base/flags.h" #include "base/logging.h" #include "facade/conn_context.h" @@ -44,8 +42,6 @@ using namespace util; using namespace std; using nonstd::make_unexpected; -namespace fibers = boost::fibers; - namespace facade { namespace { @@ -552,7 +548,7 @@ io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { void Connection::ConnectionFlow(FiberSocketBase* peer) { stats_ = service_->GetThreadLocalConnectionStats(); - auto dispatch_fb = fibers::fiber(fibers::launch::dispatch, [&] { DispatchFiber(peer); }); + auto dispatch_fb = MakeFiber(fibers_ext::Launch::dispatch, [&] { DispatchFiber(peer); }); ++stats_->num_conns; ++stats_->conn_received_cnt; @@ -589,7 +585,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { cc_->conn_closing = true; // Signal dispatch to close. evc_.notify(); VLOG(1) << "Before dispatch_fb.join()"; - dispatch_fb.join(); + dispatch_fb.Join(); VLOG(1) << "After dispatch_fb.join()"; service_->OnClose(cc_.get()); diff --git a/src/server/common.h b/src/server/common.h index 58b02377f..5f80d5f5a 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -9,7 +9,6 @@ #include #include -#include #include #include @@ -216,7 +215,7 @@ template struct AggregateValue { } private: - ::boost::fibers::mutex mu_{}; + util::fibers_ext::Mutex mu_{}; T current_{}; }; @@ -320,7 +319,7 @@ class Context : protected Cancellation { private: GenericError err_; - ::boost::fibers::mutex mu_; + util::fibers_ext::Mutex mu_; ErrHandler err_handler_; ::util::fibers_ext::Fiber err_handler_fb_; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 570b982ca..1863d8151 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -8,15 +8,13 @@ extern "C" { #include "redis/object.h" } -#include -#include - #include "base/logging.h" #include "server/engine_shard_set.h" #include "server/journal/journal.h" #include "server/server_state.h" #include "server/tiered_storage.h" #include "util/fiber_sched_algo.h" +#include "util/fibers/fiber.h" #include "util/proactor_base.h" namespace dfly { @@ -506,7 +504,7 @@ void DbSlice::FlushDb(DbIndex db_ind) { mi_heap_collect(ServerState::tlocal()->data_heap(), true); }; - boost::fibers::fiber(std::move(cb)).detach(); + util::MakeFiber(std::move(cb)).Detach(); return; } @@ -526,11 +524,11 @@ void DbSlice::FlushDb(DbIndex db_ind) { } } - boost::fibers::fiber([all_dbs = std::move(all_dbs)]() mutable { + MakeFiber([all_dbs = std::move(all_dbs)]() mutable { for (auto& db : all_dbs) { db.reset(); } - }).detach(); + }).Detach(); } void DbSlice::AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at) { diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index acbbac250..0ced165f2 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -7,7 +7,6 @@ #include #include -#include #include #include "base/flags.h" @@ -33,7 +32,6 @@ namespace dfly { using namespace util; using boost::intrusive_ptr; -using boost::fibers::fiber; using namespace facade; namespace fs = std::filesystem; using absl::GetFlag; @@ -419,7 +417,7 @@ void DebugCmd::Inspect(string_view key) { } void DebugCmd::Watched() { - boost::fibers::mutex mu; + util::fibers_ext::Mutex mu; vector watched_keys; vector awaked_trans; diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index d08826605..ca0b64b51 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -19,7 +19,6 @@ #include "server/server_family.h" #include "server/server_state.h" #include "server/transaction.h" - using namespace std; ABSL_DECLARE_FLAG(string, dir); @@ -27,7 +26,8 @@ ABSL_DECLARE_FLAG(string, dir); namespace dfly { using namespace facade; -using namespace std; +using namespace util; + using util::ProactorBase; namespace { @@ -383,7 +383,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard); } - flow->full_sync_fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow, cntx); + flow->full_sync_fb = fibers_ext::Fiber(&DflyCmd::FullSyncFb, this, flow, cntx); return OpStatus::OK; } @@ -465,7 +465,7 @@ uint32_t DflyCmd::CreateSyncSession(ConnectionContext* cntx) { // Spawn external fiber to allow destructing the context from outside // and return from the handler immediately. - ::boost::fibers::fiber{&DflyCmd::StopReplication, this, sync_id}.detach(); + util::MakeFiber(&DflyCmd::StopReplication, this, sync_id).Detach(); }; string address = cntx->owner()->RemoteEndpointAddress(); diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index cd1e8eecc..deb225511 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -7,8 +7,6 @@ #include #include -#include -#include #include #include "server/conn_context.h" @@ -110,7 +108,7 @@ class DflyCmd { uint32_t listening_port; std::vector flows; - ::boost::fibers::mutex mu; // See top of header for locking levels. + util::fibers_ext::Mutex mu; // See top of header for locking levels. }; struct ReplicaRoleInfo { @@ -206,7 +204,7 @@ class DflyCmd { using ReplicaInfoMap = absl::btree_map>; ReplicaInfoMap replica_infos_; - ::boost::fibers::mutex mu_; // Guard global operations. See header top for locking levels. + util::fibers_ext::Mutex mu_; // Guard global operations. See header top for locking levels. }; } // namespace dfly diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index a2479420a..3412051d9 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -44,7 +44,6 @@ ABSL_FLAG(float, mem_defrag_page_utilization_threshold, 0.8, namespace dfly { using namespace util; -namespace fibers = ::boost::fibers; using absl::GetFlag; namespace { @@ -198,8 +197,8 @@ uint32_t EngineShard::DefragTask() { EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap) : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) { - fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] { - FiberProps::SetName(absl::StrCat("shard_queue", index)); + fiber_q_ = MakeFiber([this, index = pb->GetIndex()] { + ThisFiber::SetName(absl::StrCat("shard_queue", index)); queue_.Run(); }); @@ -208,8 +207,8 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* if (clock_cycle_ms == 0) clock_cycle_ms = 1; - fiber_periodic_ = fibers::fiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] { - FiberProps::SetName(absl::StrCat("shard_periodic", index)); + fiber_periodic_ = MakeFiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] { + ThisFiber::SetName(absl::StrCat("shard_periodic", index)); RunPeriodic(std::chrono::milliseconds(period_ms)); }); } @@ -227,15 +226,15 @@ EngineShard::~EngineShard() { void EngineShard::Shutdown() { queue_.Shutdown(); - fiber_q_.join(); + fiber_q_.Join(); if (tiered_storage_) { tiered_storage_->Shutdown(); } fiber_periodic_done_.Notify(); - if (fiber_periodic_.joinable()) { - fiber_periodic_.join(); + if (fiber_periodic_.IsJoinable()) { + fiber_periodic_.Join(); } ProactorBase::me()->RemoveOnIdleTask(defrag_task_); @@ -532,7 +531,7 @@ BlockingController* EngineShard::EnsureBlockingController() { } void EngineShard::TEST_EnableHeartbeat() { - fiber_periodic_ = fibers::fiber([this, period_ms = 1] { + fiber_periodic_ = MakeFiber([this, period_ms = 1] { FiberProps::SetName("shard_periodic_TEST"); RunPeriodic(std::chrono::milliseconds(period_ms)); }); diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 1015018c4..be54d1ec1 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -189,7 +189,7 @@ class EngineShard { bool DoDefrag(); ::util::fibers_ext::FiberQueue queue_; - ::boost::fibers::fiber fiber_q_; + util::fibers_ext::Fiber fiber_q_; TxQueue txq_; MiMemoryResource mi_resource_; @@ -207,7 +207,7 @@ class EngineShard { IntentLock shard_lock_; uint32_t defrag_task_ = 0; - ::boost::fibers::fiber fiber_periodic_; + ::util::fibers_ext::Fiber fiber_periodic_; ::util::fibers_ext::Done fiber_periodic_done_; DefragTaskState defrag_state_; diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 2f038cf55..d7cd24699 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -276,7 +276,7 @@ TEST_F(GenericFamilyTest, Move) { ASSERT_THAT(Run({"get", "a"}), "test"); // Check MOVE awakes blocking operations - auto fb_blpop = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto fb_blpop = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { Run({"select", "1"}); auto resp = Run({"blpop", "l", "0"}); ASSERT_THAT(resp, ArgType(RespExpr::ARRAY)); diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 37f12961e..77ae187a0 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -55,7 +55,7 @@ error_code Journal::Close() { VLOG(1) << "Journal::Close"; - fibers::mutex ec_mu; + fibers_ext::Mutex ec_mu; error_code res; lock_guard lk(state_mu_); diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index cdcfe795e..d7ef7634a 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -53,7 +53,7 @@ class Journal { bool await); private: - mutable boost::fibers::mutex state_mu_; + mutable util::fibers_ext::Mutex state_mu_; std::atomic_bool lameduck_{false}; }; diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index cff61c5de..8c655a8ef 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -4,8 +4,6 @@ #pragma once -#include -#include #include #include diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index b757ee50e..9038a0cdc 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -20,7 +20,6 @@ using namespace testing; using namespace std; using namespace util; -namespace fibers = ::boost::fibers; using absl::StrCat; namespace dfly { @@ -112,7 +111,7 @@ TEST_F(ListFamilyTest, BLPopBlocking) { RespExpr resp0, resp1; // Run the fiber at creation. - auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { resp0 = Run({"blpop", "x", "0"}); LOG(INFO) << "pop0"; }); @@ -151,7 +150,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) { ASSERT_FALSE(IsLocked(0, kKey1)); ASSERT_FALSE(IsLocked(0, kKey2)); - auto fb1 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto fb1 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { resp0 = Run({"blpop", kKey1, kKey2, "0"}); }); @@ -204,7 +203,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) { Run({"exists", kKey1, kKey2, kKey3}); ASSERT_EQ(3, GetDebugInfo().shards_count); RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); }); @@ -243,7 +242,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) { TEST_F(ListFamilyTest, BLPopSerialize) { RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); }); @@ -313,7 +312,7 @@ TEST_F(ListFamilyTest, BLPopSerialize) { TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, "0"}); }); @@ -340,7 +339,7 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { TEST_F(ListFamilyTest, BPopSameKeyTwice) { RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); EXPECT_EQ(0, NumWatched()); }); @@ -353,7 +352,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { ASSERT_THAT(blpop_resp, ArrLen(2)); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar")); - pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); }); @@ -371,7 +370,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) { ASSERT_EQ(1, GetDebugInfo().shards_count); RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { blpop_resp = Run({"blpop", "x", "y", "0"}); EXPECT_FALSE(IsLocked(0, "y")); ASSERT_EQ(0, NumWatched()); @@ -392,7 +391,7 @@ TEST_F(ListFamilyTest, BPopRename) { Run({"exists", kKey1, kKey2}); ASSERT_EQ(2, GetDebugInfo().shards_count); - auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, "0"}); }); @@ -410,7 +409,7 @@ TEST_F(ListFamilyTest, BPopRename) { TEST_F(ListFamilyTest, BPopFlush) { RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, "0"}); }); @@ -716,7 +715,7 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) { RespExpr resp; // Run the fiber at creation. - auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { resp = Run({"brpoplpush", "x", "y", "0"}); }); fibers_ext::SleepFor(30us); @@ -738,7 +737,7 @@ TEST_F(ListFamilyTest, BRPopContended) { // Run the fiber at creation. fibers_ext::Fiber fb[kNumFibers]; for (int i = 0; i < kNumFibers; i++) { - fb[i] = pp_->at(1)->LaunchFiber(fibers::launch::dispatch, [&] { + fb[i] = pp_->at(1)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { string id = StrCat("id", i); while (!done) { Run(id, {"brpop", "k0", "k1", "k2", "k3", "k4", "0.1"}); @@ -773,7 +772,7 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) { ASSERT_EQ(0, NumWatched()); // Run the fiber at creation. - auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { resp = Run({"brpoplpush", "x", "z", "0"}); }); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index a04d55962..579086e72 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -13,7 +13,6 @@ extern "C" { #include #include -#include #include #include "base/flags.h" diff --git a/src/server/main_service.h b/src/server/main_service.h index f90fdb04c..ec5389f6d 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -128,7 +128,7 @@ class Service : public facade::ServiceInterface { CommandRegistry registry_; absl::flat_hash_map unknown_cmds_; - mutable ::boost::fibers::mutex mu_; + mutable util::fibers_ext::Mutex mu_; GlobalState global_state_ = GlobalState::ACTIVE; // protected by mu_; }; diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 9f093d935..bf0b8f8f1 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -3,7 +3,6 @@ // #pragma once -#include #include #include diff --git a/src/server/replica.cc b/src/server/replica.cc index 1a303fafc..ca58b6aef 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -115,11 +115,11 @@ Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* s } Replica::~Replica() { - if (sync_fb_.joinable()) { - sync_fb_.join(); + if (sync_fb_.IsJoinable()) { + sync_fb_.Join(); } - if (execution_fb_.joinable()) { - execution_fb_.join(); + if (execution_fb_.IsJoinable()) { + execution_fb_.Join(); } if (sock_) { @@ -156,7 +156,7 @@ bool Replica::Start(ConnectionContext* cntx) { cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this)); // 4. Spawn main coordination fiber. - sync_fb_ = ::boost::fibers::fiber(&Replica::MainReplicationFb, this); + sync_fb_ = fibers_ext::Fiber(&Replica::MainReplicationFb, this); (*cntx)->SendOk(); return true; @@ -173,8 +173,8 @@ void Replica::Stop() { // Make sure the replica fully stopped and did all cleanup, // so we can freely release resources (connections). - if (sync_fb_.joinable()) - sync_fb_.join(); + if (sync_fb_.IsJoinable()) + sync_fb_.Join(); } void Replica::Pause(bool pause) { @@ -666,11 +666,11 @@ void Replica::CloseSocket() { void Replica::JoinAllFlows() { for (auto& flow : shard_flows_) { - if (flow->sync_fb_.joinable()) { - flow->sync_fb_.join(); + if (flow->sync_fb_.IsJoinable()) { + flow->sync_fb_.Join(); } - if (flow->execution_fb_.joinable()) { - flow->execution_fb_.join(); + if (flow->execution_fb_.IsJoinable()) { + flow->execution_fb_.Join(); } } } @@ -746,7 +746,7 @@ error_code Replica::StartFullSyncFlow(fibers_ext::BlockingCounter sb, Context* c // We can not discard io_buf because it may contain data // besides the response we parsed. Therefore we pass it further to ReplicateDFFb. - sync_fb_ = ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx); + sync_fb_ = fibers_ext::Fiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx); return error_code{}; } @@ -759,9 +759,9 @@ error_code Replica::StartStableSyncFlow(Context* cntx) { CHECK(sock_->IsOpen()); // sock_.reset(mythread->CreateSocket()); // RETURN_ON_ERR(sock_->Connect(master_context_.master_ep)); - sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyReadFb, this, cntx); + sync_fb_ = fibers_ext::Fiber(&Replica::StableSyncDflyReadFb, this, cntx); if (use_multi_shard_exe_sync_) { - execution_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyExecFb, this, cntx); + execution_fb_ = fibers_ext::Fiber(&Replica::StableSyncDflyExecFb, this, cntx); } return std::error_code{}; diff --git a/src/server/replica.h b/src/server/replica.h index 064bea86f..64096ed6b 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -4,8 +4,6 @@ #pragma once #include -#include -#include #include #include @@ -83,7 +81,7 @@ class Replica { // Coorindator for multi shard execution. struct MultiShardExecution { - boost::fibers::mutex map_mu; + util::fibers_ext::Mutex map_mu; struct TxExecutionSync { util::fibers_ext::Barrier barrier; @@ -234,13 +232,13 @@ class Replica { std::atomic_uint64_t journal_rec_executed_ = 0; // MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode. - ::boost::fibers::fiber sync_fb_; - ::boost::fibers::fiber execution_fb_; + ::util::fibers_ext::Fiber sync_fb_; + ::util::fibers_ext::Fiber execution_fb_; std::vector> shard_flows_; // Guard operations where flows might be in a mixed state (transition/setup) - ::boost::fibers::mutex flows_op_mu_; + util::fibers_ext::Mutex flows_op_mu_; std::optional leftover_buf_; std::unique_ptr parser_; diff --git a/src/server/script_mgr.cc b/src/server/script_mgr.cc index 33db5150b..216a5d9f7 100644 --- a/src/server/script_mgr.cc +++ b/src/server/script_mgr.cc @@ -32,6 +32,7 @@ namespace dfly { using namespace std; using namespace facade; +using namespace util; ScriptMgr::ScriptMgr() { // Build default script config @@ -155,7 +156,7 @@ void ScriptMgr::ListCmd(ConnectionContext* cntx) const { void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const { absl::flat_hash_map result; - boost::fibers::mutex mu; + fibers_ext::Mutex mu; shard_set->pool()->AwaitFiberOnAll([&](auto* pb) { auto* ss = ServerState::tlocal(); diff --git a/src/server/script_mgr.h b/src/server/script_mgr.h index 530b60e5a..091175ea1 100644 --- a/src/server/script_mgr.h +++ b/src/server/script_mgr.h @@ -7,7 +7,6 @@ #include #include -#include #include #include "server/conn_context.h" @@ -71,7 +70,7 @@ class ScriptMgr { ScriptParams default_params_; absl::flat_hash_map db_; - mutable ::boost::fibers::mutex mu_; + mutable util::fibers_ext::Mutex mu_; }; } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index ead35722d..e348c3528 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -69,17 +69,16 @@ ABSL_DECLARE_FLAG(uint32_t, hz); namespace dfly { -namespace fibers = ::boost::fibers; namespace fs = std::filesystem; namespace uring = util::uring; using absl::GetFlag; using absl::StrCat; using namespace facade; +using namespace util; +using fibers_ext::FiberQueueThreadPool; +using http::StringResponse; using strings::HumanReadableNumBytes; -using util::ProactorBase; -using util::fibers_ext::FiberQueueThreadPool; -using util::http::StringResponse; namespace { @@ -473,7 +472,7 @@ void ServerFamily::Shutdown() { // Load starts as many fibers as there are files to load each one separately. // It starts one more fiber that waits for all load fibers to finish and returns the first // error (if any occured) with a future. -fibers::future ServerFamily::Load(const std::string& load_path) { +fibers_ext::Future ServerFamily::Load(const std::string& load_path) { CHECK(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs")); vector paths{{load_path}}; @@ -484,7 +483,7 @@ fibers::future ServerFamily::Load(const std::string& load_path) io::Result files = io::StatFiles(glob); if (files && files->size() == 0) { - fibers::promise ec_promise; + fibers_ext::Promise ec_promise; ec_promise.set_value(make_error_code(errc::no_such_file_or_directory)); return ec_promise.get_future(); } @@ -500,7 +499,7 @@ fibers::future ServerFamily::Load(const std::string& load_path) (void)fs::canonical(path, ec); if (ec) { LOG(ERROR) << "Error loading " << load_path << " " << ec.message(); - fibers::promise ec_promise; + fibers_ext::Promise ec_promise; ec_promise.set_value(ec); return ec_promise.get_future(); } @@ -916,7 +915,7 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) { vector> snapshots; absl::flat_hash_map rdb_name_map; - fibers::mutex mu; // guards rdb_name_map + fibers_ext::Mutex mu; // guards rdb_name_map auto save_cb = [&](unsigned index) { auto& snapshot = snapshots[index]; @@ -1153,7 +1152,7 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) { if (sub_cmd == "LIST") { vector client_info; - fibers::mutex mu; + fibers_ext::Mutex mu; auto cb = [&](util::Connection* conn) { facade::Connection* dcon = static_cast(conn); string info = dcon->GetClientInfo(); @@ -1390,13 +1389,13 @@ static void MergeInto(const DbSlice::Stats& src, Metrics* dest) { Metrics ServerFamily::GetMetrics() const { Metrics result; - fibers::mutex mu; + fibers_ext::Mutex mu; auto cb = [&](ProactorBase* pb) { EngineShard* shard = EngineShard::tlocal(); ServerState* ss = ServerState::tlocal(); - lock_guard lk(mu); + lock_guard lk(mu); result.uptime = time(NULL) - this->start_time_; result.conn_stats += ss->connection_stats; diff --git a/src/server/server_family.h b/src/server/server_family.h index 72dddc32d..3163c8a11 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -4,7 +4,6 @@ #pragma once -#include #include #include "facade/conn_context.h" @@ -102,7 +101,7 @@ class ServerFamily { // Load snapshot from file (.rdb file or summary.dfs file) and return // future with error_code. - boost::fibers::future Load(const std::string& file_name); + util::fibers_ext::Future Load(const std::string& file_name); // used within tests. bool IsSaving() const { @@ -168,7 +167,7 @@ class ServerFamily { void SnapshotScheduling(const SnapshotSpec& time); util::fibers_ext::Fiber snapshot_fiber_; - boost::fibers::future load_result_; + util::fibers_ext::Future load_result_; uint32_t stats_caching_task_ = 0; Service& service_; @@ -177,7 +176,7 @@ class ServerFamily { util::ListenerInterface* main_listener_ = nullptr; util::ProactorBase* pb_task_ = nullptr; - mutable ::boost::fibers::mutex replicaof_mu_, save_mu_; + mutable util::fibers_ext::Mutex replicaof_mu_, save_mu_; std::shared_ptr replica_; // protected by replica_of_mu_ std::unique_ptr script_mgr_; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 2d80024ef..1ca4dc379 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -26,8 +26,6 @@ namespace dfly { using namespace std; using namespace util; using namespace chrono_literals; -namespace this_fiber = ::boost::this_fiber; -using boost::fibers::fiber; SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode) @@ -39,7 +37,7 @@ SliceSnapshot::~SliceSnapshot() { } void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) { - DCHECK(!snapshot_fb_.joinable()); + DCHECK(!snapshot_fb_.IsJoinable()); auto db_cb = absl::bind_front(&SliceSnapshot::OnDbChange, this); snapshot_version_ = db_slice_->RegisterOnChange(move(db_cb)); @@ -55,7 +53,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) { VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_; - snapshot_fb_ = fiber([this, stream_journal, cll] { + snapshot_fb_ = MakeFiber([this, stream_journal, cll] { IterateBucketsFb(cll); if (cll->IsCancelled()) { Cancel(); @@ -90,8 +88,8 @@ void SliceSnapshot::Cancel() { void SliceSnapshot::Join() { // Fiber could have already been joined by Stop. - if (snapshot_fb_.joinable()) - snapshot_fb_.join(); + if (snapshot_fb_.IsJoinable()) + snapshot_fb_.Join(); } // The algorithm is to go over all the buckets and serialize those with @@ -138,7 +136,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) { PushSerializedToChannel(false); if (stats_.loop_serialized >= last_yield + 100) { - DVLOG(2) << "Before sleep " << this_fiber::properties().name(); + DVLOG(2) << "Before sleep " << ThisFiber::GetName(); fibers_ext::Yield(); DVLOG(2) << "After sleep"; @@ -149,7 +147,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) { } } while (cursor); - DVLOG(2) << "after loop " << this_fiber::properties().name(); + DVLOG(2) << "after loop " << ThisFiber::GetName(); PushSerializedToChannel(true); } // for (dbindex) diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 2875464de..810e8261f 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -130,8 +130,8 @@ class SliceSnapshot { std::unique_ptr serializer_; - ::boost::fibers::mutex mu_; - ::boost::fibers::fiber snapshot_fb_; // IterateEntriesFb + util::fibers_ext::Mutex mu_; + ::util::fibers_ext::Fiber snapshot_fb_; // IterateEntriesFb CompressionMode compression_mode_; RdbTypeFreqMap type_freq_map_; diff --git a/src/server/test_utils.h b/src/server/test_utils.h index e2634db15..ae00ebeeb 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -95,7 +95,7 @@ class BaseFamilyTest : public ::testing::Test { unsigned num_threads_ = 3; absl::flat_hash_map> connections_; - ::boost::fibers::mutex mu_; + util::fibers_ext::Mutex mu_; ConnectionContext::DebugInfo last_cmd_dbg_info_; std::vector resp_vec_;