diff --git a/src/core/fibers.h b/src/core/fibers.h deleted file mode 100644 index 71adc2b36..000000000 --- a/src/core/fibers.h +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright 2023, Roman Gershman. All rights reserved. -// See LICENSE for licensing terms. -// -#pragma once - -// An import header that centralizes all the imports from helio project regarding fibers - -#include "util/fibers/fiber2.h" -#include "util/fibers/fiberqueue_threadpool.h" -#include "util/fibers/future.h" -#include "util/fibers/simple_channel.h" - -namespace dfly { - -using util::fb2::Fiber; -using util::fb2::Launch; -using util::fb2::Mutex; -using util::fb2::SimpleChannel; - -} // namespace dfly diff --git a/src/core/size_tracking_channel.h b/src/core/size_tracking_channel.h index 6e7267413..df7f442fd 100644 --- a/src/core/size_tracking_channel.h +++ b/src/core/size_tracking_channel.h @@ -6,7 +6,7 @@ #include -#include "core/fibers.h" +#include "util/fibers/simple_channel.h" namespace dfly { @@ -68,7 +68,7 @@ template > class Si } private: - SimpleChannel queue_; + util::fb2::SimpleChannel queue_; std::atomic size_ = 0; }; diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index fc1e5f9ae..95aed5ad6 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1468,7 +1468,7 @@ void Connection::SendInvalidationMessageAsync(InvalidationMessage msg) { void Connection::LaunchDispatchFiberIfNeeded() { if (!dispatch_fb_.IsJoinable()) { - dispatch_fb_ = fb2::Fiber(dfly::Launch::post, "connection_dispatch", + dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch", [&, peer = socket_.get()]() { DispatchFiber(peer); }); } } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 8c10ab95a..a8f2e3d03 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -15,12 +15,12 @@ #include #include "base/io_buf.h" -#include "core/fibers.h" #include "facade/acl_commands_def.h" #include "facade/facade_types.h" #include "facade/memcache_parser.h" #include "facade/resp_expr.h" #include "util/connection.h" +#include "util/fibers/fibers.h" #include "util/http/http_handler.h" typedef struct ssl_ctx_st SSL_CTX; diff --git a/src/server/acl/acl_family.cc b/src/server/acl/acl_family.cc index 6461b120d..00605d3d3 100644 --- a/src/server/acl/acl_family.cc +++ b/src/server/acl/acl_family.cc @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include diff --git a/src/server/acl/user_registry.cc b/src/server/acl/user_registry.cc index 653c379e5..4a7e38e18 100644 --- a/src/server/acl/user_registry.cc +++ b/src/server/acl/user_registry.cc @@ -8,7 +8,6 @@ #include #include "base/flags.h" -#include "core/fibers.h" #include "facade/facade_types.h" #include "server/acl/acl_commands_def.h" diff --git a/src/server/channel_store.h b/src/server/channel_store.h index f9bc29d58..7f09d4b50 100644 --- a/src/server/channel_store.h +++ b/src/server/channel_store.h @@ -100,7 +100,7 @@ class ChannelStore { // Centralized controller to prevent overlaping updates. struct ControlBlock { std::atomic most_recent; - Mutex update_mu; // locked during updates. + util::fb2::Mutex update_mu; // locked during updates. }; private: diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index b3149f423..584dda9ec 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -36,6 +36,8 @@ namespace { using namespace std; using namespace facade; +using namespace util; + using CI = CommandId; using ClusterShard = ClusterConfig::ClusterShard; using ClusterShards = ClusterConfig::ClusterShards; @@ -423,7 +425,7 @@ void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) { namespace { // Guards set configuration, so that we won't handle 2 in parallel. -Mutex set_config_mu; +util::fb2::Mutex set_config_mu; void DeleteSlots(const SlotSet& slots) { if (slots.Empty()) { @@ -557,7 +559,7 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c if (auto err = parser.Error(); err) return rb->SendError(err->MakeReply()); - Mutex mu; + fb2::Mutex mu; auto cb = [&](auto*) { EngineShard* shard = EngineShard::tlocal(); diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 8a4391361..ca4c0dc0e 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -84,7 +84,7 @@ class ClusterFamily { std::shared_ptr GetOutgoingMigration(uint32_t sync_id); - mutable Mutex migration_mu_; // guard migrations operations + mutable util::fb2::Mutex migration_mu_; // guard migrations operations // holds all incoming slots migrations that are currently in progress. std::vector> incoming_migrations_jobs_ ABSL_GUARDED_BY(migration_mu_); diff --git a/src/server/cluster/cluster_shard_migration.h b/src/server/cluster/cluster_shard_migration.h index 746b34fdf..ee74fedd2 100644 --- a/src/server/cluster/cluster_shard_migration.h +++ b/src/server/cluster/cluster_shard_migration.h @@ -48,7 +48,7 @@ class ClusterShardMigration : public ProtocolClient { uint32_t sync_id_; std::optional leftover_buf_; std::unique_ptr executor_; - Fiber sync_fb_; + util::fb2::Fiber sync_fb_; std::atomic_bool is_stable_sync_ = false; bool is_finalized_ = false; }; diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h index d56f2d329..3cf27697b 100644 --- a/src/server/cluster/cluster_slot_migration.h +++ b/src/server/cluster/cluster_slot_migration.h @@ -61,7 +61,7 @@ class ClusterSlotMigration : private ProtocolClient { private: ClusterFamily* cluster_family_; Service& service_; - Mutex flows_op_mu_; + util::fb2::Mutex flows_op_mu_; std::vector> shard_flows_; SlotRanges slots_; uint32_t source_shards_num_ = 0; @@ -69,7 +69,7 @@ class ClusterSlotMigration : private ProtocolClient { uint32_t local_sync_id_ = 0; MigrationState state_ = MigrationState::C_NO_STATE; - Fiber sync_fb_; + util::fb2::Fiber sync_fb_; }; } // namespace dfly diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 04d14dfcf..4922cf957 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -14,6 +14,8 @@ #include "server/server_family.h" using namespace std; +using namespace util; + namespace dfly { class OutgoingMigration::SliceSlotMigration { @@ -22,7 +24,7 @@ class OutgoingMigration::SliceSlotMigration { Context* cntx, io::Sink* dest) : streamer_(slice, std::move(slots), sync_id, journal, cntx) { state_.store(MigrationState::C_SYNC, memory_order_relaxed); - sync_fb_ = Fiber("slot-snapshot", [this, dest] { streamer_.Start(dest); }); + sync_fb_ = fb2::Fiber("slot-snapshot", [this, dest] { streamer_.Start(dest); }); } void Cancel() { @@ -46,7 +48,7 @@ class OutgoingMigration::SliceSlotMigration { RestoreStreamer streamer_; // Atomic only for simple read operation, writes - from the same thread, reads - from any thread atomic state_ = MigrationState::C_CONNECTING; - Fiber sync_fb_; + fb2::Fiber sync_fb_; }; OutgoingMigration::OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots, @@ -78,7 +80,7 @@ void OutgoingMigration::StartFlow(uint32_t sync_id, journal::Journal* journal, i } if (state == MigrationState::C_SYNC) { - main_sync_fb_ = Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this); + main_sync_fb_ = fb2::Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this); } } diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index d9ae026b2..031c3db60 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -56,11 +56,11 @@ class OutgoingMigration { uint16_t port_; SlotRanges slots_; Context cntx_; - mutable Mutex flows_mu_; + mutable util::fb2::Mutex flows_mu_; std::vector> slot_migrations_ ABSL_GUARDED_BY(flows_mu_); ServerFamily* server_family_; - Fiber main_sync_fb_; + util::fb2::Fiber main_sync_fb_; }; } // namespace dfly diff --git a/src/server/common.h b/src/server/common.h index 15b7fc609..1aa3f0a2c 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -14,9 +14,10 @@ #include #include -#include "core/fibers.h" #include "facade/facade_types.h" #include "facade/op_status.h" +#include "util/fibers/fibers.h" +#include "util/fibers/synchronization.h" namespace dfly { @@ -239,7 +240,7 @@ template struct AggregateValue { } private: - Mutex mu_{}; + util::fb2::Mutex mu_{}; T current_{}; }; @@ -343,10 +344,10 @@ class Context : protected Cancellation { private: GenericError err_; - Mutex mu_; + util::fb2::Mutex mu_; ErrHandler err_handler_; - Fiber err_handler_fb_; + util::fb2::Fiber err_handler_fb_; }; struct ScanOpts { diff --git a/src/server/conn_context.h b/src/server/conn_context.h index c535dac1e..d3a86cf9a 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -8,7 +8,6 @@ #include #include "acl/acl_commands_def.h" -#include "core/fibers.h" #include "facade/conn_context.h" #include "facade/reply_capture.h" #include "server/common.h" diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 4d51316bf..943450b1e 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -675,7 +675,7 @@ void DebugCmd::Populate(CmdArgList args) { } ranges.emplace_back(from, options->total_count - from); - vector fb_arr(ranges.size()); + vector fb_arr(ranges.size()); for (size_t i = 0; i < ranges.size(); ++i) { auto range = ranges[i]; @@ -844,7 +844,7 @@ void DebugCmd::Inspect(string_view key, CmdArgList args) { } void DebugCmd::Watched() { - Mutex mu; + fb2::Mutex mu; vector watched_keys; vector awaked_trans; diff --git a/src/server/detail/save_stages_controller.h b/src/server/detail/save_stages_controller.h index 977c6c6fb..8c0ce995f 100644 --- a/src/server/detail/save_stages_controller.h +++ b/src/server/detail/save_stages_controller.h @@ -133,7 +133,7 @@ struct SaveStagesController : public SaveStagesInputs { std::vector, std::filesystem::path>> snapshots_; absl::flat_hash_map rdb_name_map_; - Mutex rdb_name_map_mu_; + util::fb2::Mutex rdb_name_map_mu_; }; GenericError ValidateFilename(const std::filesystem::path& filename, bool new_version); diff --git a/src/server/dfly_bench.cc b/src/server/dfly_bench.cc index 448edd1cc..e8070181a 100644 --- a/src/server/dfly_bench.cc +++ b/src/server/dfly_bench.cc @@ -15,7 +15,6 @@ #include "base/init.h" #include "base/io_buf.h" #include "base/zipf_gen.h" -#include "core/fibers.h" #include "facade/redis_parser.h" #include "util/fibers/dns_resolve.h" #include "util/fibers/pool.h" @@ -367,7 +366,7 @@ int main(int argc, char* argv[]) { absl::Duration duration = absl::Now() - start_time; LOG(INFO) << "Finished. Total time: " << duration; - dfly::Mutex mutex; + fb2::Mutex mutex; base::Histogram hist; LOG(INFO) << "Resetting all threads"; pp->AwaitFiberOnAll([&](auto* p) { diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 2a7af7c8c..31ffa5cc2 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -665,7 +665,7 @@ std::vector DflyCmd::GetReplicasRoleInfo() const { } void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const { - Mutex stats_mu; + util::fb2::Mutex stats_mu; lock_guard lk_main{mu_}; // prevent state changes auto cb = [this, &stats, &stats_mu](EngineShard* shard) { diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 1e190a9d7..1927df6ca 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -38,7 +38,7 @@ struct FlowInfo { facade::Connection* conn = nullptr; - Fiber full_sync_fb; // Full sync fiber. + util::fb2::Fiber full_sync_fb; // Full sync fiber. std::unique_ptr saver; // Saver for full sync phase. std::unique_ptr streamer; // Streamer for stable sync phase std::string eof_token; @@ -121,7 +121,7 @@ class DflyCmd { // Flows describe the state of shard-local flow. // They are always indexed by the shard index on the master. std::vector flows; - Mutex mu; // See top of header for locking levels. + util::fb2::Mutex mu; // See top of header for locking levels. }; public: @@ -223,7 +223,7 @@ class DflyCmd { using ReplicaInfoMap = absl::btree_map>; ReplicaInfoMap replica_infos_; - mutable Mutex mu_; // Guard global operations. See header top for locking levels. + mutable util::fb2::Mutex mu_; // Guard global operations. See header top for locking levels. }; } // namespace dfly diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index cbba7780f..ed8875d72 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -33,6 +33,7 @@ using namespace std; using namespace util; using absl::SetFlag; using absl::StrCat; +using fb2::Fiber; using ::io::Result; using testing::ElementsAre; using testing::HasSubstr; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index fc4f43abb..6aaa5ca7c 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -164,7 +164,7 @@ class RoundRobinSharder { static thread_local vector round_robin_shards_tl_cache_; static vector round_robin_shards_ ABSL_GUARDED_BY(mutex_); static ShardId next_shard_ ABSL_GUARDED_BY(mutex_); - static Mutex mutex_; + static fb2::Mutex mutex_; }; bool HasContendedLocks(unsigned shard_id, Transaction* trx, const DbTable* table) { @@ -193,7 +193,7 @@ thread_local string RoundRobinSharder::round_robin_prefix_; thread_local vector RoundRobinSharder::round_robin_shards_tl_cache_; vector RoundRobinSharder::round_robin_shards_; ShardId RoundRobinSharder::next_shard_; -Mutex RoundRobinSharder::mutex_; +fb2::Mutex RoundRobinSharder::mutex_; } // namespace diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 5c1afbef4..e2a9e058d 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -243,7 +243,7 @@ class EngineShard { IntentLock shard_lock_; uint32_t defrag_task_ = 0; - Fiber fiber_periodic_; + util::fb2::Fiber fiber_periodic_; util::fb2::Done fiber_periodic_done_; DefragTaskState defrag_state_; diff --git a/src/server/io_utils.h b/src/server/io_utils.h index d40b3766b..137ac1427 100644 --- a/src/server/io_utils.h +++ b/src/server/io_utils.h @@ -3,7 +3,6 @@ // #include "base/io_buf.h" -#include "core/fibers.h" #include "io/io.h" #include "server/common.h" diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 3a441381f..9aea3c493 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -60,7 +60,7 @@ error_code Journal::Close() { VLOG(1) << "Journal::Close"; - Mutex ec_mu; + fb2::Mutex ec_mu; error_code res; lock_guard lk(state_mu_); diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 623ce49c2..2f41bea08 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -61,7 +61,7 @@ class Journal { std::optional slot, Entry::Payload payload, bool await); private: - mutable Mutex state_mu_; + mutable util::fb2::Mutex state_mu_; std::atomic_bool lameduck_{false}; }; diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index c9f90c4d0..f0016ea20 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -46,7 +46,7 @@ class JournalStreamer : protected BufferedStreamerBase { uint32_t journal_cb_id_{0}; journal::Journal* journal_; - Fiber write_fb_{}; + util::fb2::Fiber write_fb_{}; }; // Serializes existing DB as RESTORE commands, and sends updates as regular commands. diff --git a/src/server/journal/tx_executor.h b/src/server/journal/tx_executor.h index ffcbf18a1..8599f552a 100644 --- a/src/server/journal/tx_executor.h +++ b/src/server/journal/tx_executor.h @@ -31,7 +31,7 @@ class MultiShardExecution { void CancelAllBlockingEntities(); private: - Mutex map_mu; + util::fb2::Mutex map_mu; std::unordered_map tx_sync_execution; }; diff --git a/src/server/main_service.h b/src/server/main_service.h index 2daa8f827..554f461e8 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -185,7 +185,7 @@ class Service : public facade::ServiceInterface { const CommandId* exec_cid_; // command id of EXEC command for pipeline squashing - mutable Mutex mu_; + mutable util::fb2::Mutex mu_; GlobalState global_state_ = GlobalState::ACTIVE; // protected by mu_; }; diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index c52ed1e16..63c41063b 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -1,7 +1,12 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + #include "server/multi_command_squasher.h" #include +#include "base/logging.h" #include "facade/dragonfly_connection.h" #include "server/cluster/unique_slot_checker.h" #include "server/command_registry.h" diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index bcd64f678..be15699ef 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -4,8 +4,6 @@ #pragma once -#include "base/logging.h" -#include "core/fibers.h" #include "facade/reply_capture.h" #include "server/conn_context.h" #include "server/main_service.h" diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index f57e3482b..0b49c526a 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -124,7 +124,7 @@ class ProtocolClient { base::IoBuf resp_buf_; std::unique_ptr sock_; - Mutex sock_mu_; + util::fb2::Mutex sock_mu_; protected: Context cntx_; // context for tasks in replica. diff --git a/src/server/replica.h b/src/server/replica.h index 8349dc856..e19d8070c 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -146,8 +146,8 @@ class Replica : ProtocolClient { MasterContext master_context_; // In redis replication mode. - Fiber sync_fb_; - Fiber acks_fb_; + util::fb2::Fiber sync_fb_; + util::fb2::Fiber acks_fb_; util::fb2::EventCount waker_; std::vector> shard_flows_; @@ -157,7 +157,7 @@ class Replica : ProtocolClient { std::shared_ptr multi_shard_exe_; // Guard operations where flows might be in a mixed state (transition/setup) - Mutex flows_op_mu_; + util::fb2::Mutex flows_op_mu_; // repl_offs - till what offset we've already read from the master. // ack_offs_ last acknowledged offset. @@ -228,13 +228,13 @@ class DflyShardReplica : public ProtocolClient { // run out-of-order on the master instance. std::atomic_uint64_t journal_rec_executed_ = 0; - Fiber sync_fb_; + util::fb2::Fiber sync_fb_; - Fiber acks_fb_; + util::fb2::Fiber acks_fb_; size_t ack_offs_ = 0; bool force_ping_ = false; - Fiber execution_fb_; + util::fb2::Fiber execution_fb_; std::shared_ptr multi_shard_exe_; uint32_t flow_id_ = UINT32_MAX; // Flow id if replica acts as a dfly flow. diff --git a/src/server/script_mgr.cc b/src/server/script_mgr.cc index adfba166f..726112dd8 100644 --- a/src/server/script_mgr.cc +++ b/src/server/script_mgr.cc @@ -183,7 +183,7 @@ void ScriptMgr::ListCmd(ConnectionContext* cntx) const { void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const { absl::flat_hash_map result; - Mutex mu; + fb2::Mutex mu; shard_set->pool()->AwaitFiberOnAll([&](auto* pb) { auto* ss = ServerState::tlocal(); diff --git a/src/server/script_mgr.h b/src/server/script_mgr.h index a76901f3c..c80296725 100644 --- a/src/server/script_mgr.h +++ b/src/server/script_mgr.h @@ -78,7 +78,7 @@ class ScriptMgr { ScriptParams default_params_; absl::flat_hash_map db_; - mutable Mutex mu_; + mutable util::fb2::Mutex mu_; }; } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 544c9625f..b3e337f5e 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -839,7 +839,7 @@ fb2::Future ServerFamily::Load(const std::string& load_path) { auto& pool = service_.proactor_pool(); - vector load_fibers; + vector load_fibers; load_fibers.reserve(paths.size()); auto aggregated_result = std::make_shared(); @@ -1755,7 +1755,7 @@ void ServerFamily::ResetStat() { Metrics ServerFamily::GetMetrics() const { Metrics result; - Mutex mu; + util::fb2::Mutex mu; auto cmd_stat_cb = [&dest = result.cmd_stats_map](string_view name, const CmdCallStats& stat) { auto& [calls, sum] = dest[absl::AsciiStrToLower(name)]; diff --git a/src/server/server_family.h b/src/server/server_family.h index bc32fe6f0..f589924ad 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -17,6 +17,7 @@ #include "server/replica.h" #include "server/server_state.h" #include "util/fibers/fiberqueue_threadpool.h" +#include "util/fibers/future.h" void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service, std::string_view sub_cmd); @@ -278,7 +279,7 @@ class ServerFamily { GenericError WaitUntilSaveFinished(Transaction* trans, bool ignore_state = false); - Fiber snapshot_schedule_fb_; + util::fb2::Fiber snapshot_schedule_fb_; util::fb2::Future load_result_; uint32_t stats_caching_task_ = 0; @@ -288,7 +289,7 @@ class ServerFamily { std::vector listeners_; util::ProactorBase* pb_task_ = nullptr; - mutable Mutex replicaof_mu_, save_mu_; + mutable util::fb2::Mutex replicaof_mu_, save_mu_; std::shared_ptr replica_ ABSL_GUARDED_BY(replicaof_mu_); std::unique_ptr script_mgr_; diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 24d15f52d..3b72e4e8c 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -144,7 +144,7 @@ class SliceSnapshot { // Used for sanity checks. bool serialize_bucket_running_ = false; - Fiber snapshot_fb_; // IterateEntriesFb + util::fb2::Fiber snapshot_fb_; // IterateEntriesFb CompressionMode compression_mode_; RdbTypeFreqMap type_freq_map_; diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 4b21c47f6..2d553157a 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -637,7 +637,7 @@ vector BaseFamilyTest::StrArray(const RespExpr& expr) { } absl::flat_hash_set BaseFamilyTest::GetLastUsedKeys() { - Mutex mu; + fb2::Mutex mu; absl::flat_hash_set result; auto add_keys = [&](ProactorBase* proactor) { @@ -671,14 +671,15 @@ void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function& c << "Timeout of " << timeout << " reached when expecting condition"; } -Fiber BaseFamilyTest::ExpectConditionWithSuspension(const std::function& condition) { +fb2::Fiber BaseFamilyTest::ExpectConditionWithSuspension(const std::function& condition) { TransactionSuspension tx; pp_->at(0)->Await([&] { tx.Start(); }); - auto fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [condition, tx = std::move(tx)]() mutable { - ExpectConditionWithinTimeout(condition); - tx.Terminate(); - }); + auto fb = + pp_->at(0)->LaunchFiber(fb2::Launch::dispatch, [condition, tx = std::move(tx)]() mutable { + ExpectConditionWithinTimeout(condition); + tx.Terminate(); + }); return fb; } diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 702096d5f..2e7a08aa2 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -17,7 +17,8 @@ namespace dfly { using namespace facade; -using namespace std; +using util::fb2::Fiber; +using util::fb2::Launch; class TestConnection : public facade::Connection { public: @@ -143,7 +144,7 @@ class BaseFamilyTest : public ::testing::Test { static absl::flat_hash_set GetLastUsedKeys(); static void ExpectConditionWithinTimeout(const std::function& condition, absl::Duration timeout = absl::Seconds(10)); - Fiber ExpectConditionWithSuspension(const std::function& condition); + util::fb2::Fiber ExpectConditionWithSuspension(const std::function& condition); static unsigned NumLocked(); @@ -156,7 +157,7 @@ class BaseFamilyTest : public ::testing::Test { unsigned num_threads_ = 3; absl::flat_hash_map> connections_; - Mutex mu_; + util::fb2::Mutex mu_; ConnectionContext::DebugInfo last_cmd_dbg_info_; std::vector resp_vec_; diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index ba8f19be6..ea862ef3e 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -8,7 +8,6 @@ #include #include "core/external_alloc.h" -#include "core/fibers.h" #include "server/common.h" #include "server/io_mgr.h" #include "server/table.h"