diff --git a/CMakeLists.txt b/CMakeLists.txt index 24a889133..bfb45acce 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,6 +20,7 @@ endif() if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") set(CMAKE_CXX_FLAGS "-Wthread-safety ${CMAKE_CXX_FLAGS}") + add_compile_options(-Werror=thread-safety) endif() # We can not use here CHECK_CXX_COMPILER_FLAG because systems that do not support sanitizers diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index f65e72405..df2c7efa9 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -473,7 +473,7 @@ class RedisReplyBuilder2 : public RedisReplyBuilder2Base { void SendScoredArray(absl::Span> arr, bool with_scores) override; - void SendSimpleStrArr(RedisReplyBuilder::StrSpan arr) { + void SendSimpleStrArr(RedisReplyBuilder::StrSpan arr) override { SendSimpleStrArr2(arr); } void SendStringArr(RedisReplyBuilder::StrSpan arr, CollectionType type = ARRAY) override { diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 77a01e01c..abeb46042 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -24,6 +24,7 @@ #include "server/namespaces.h" #include "server/server_family.h" #include "server/server_state.h" +#include "util/fibers/synchronization.h" ABSL_FLAG(std::string, cluster_announce_ip, "", "DEPRECATED: use --announce_ip"); @@ -93,8 +94,8 @@ ClusterConfig* ClusterFamily::cluster_config() { } void ClusterFamily::Shutdown() { - shard_set->pool()->at(0)->Await([this] { - lock_guard lk(set_config_mu); + shard_set->pool()->at(0)->Await([this]() ABSL_LOCKS_EXCLUDED(set_config_mu) { + util::fb2::LockGuard lk(set_config_mu); if (!tl_cluster_config) return; @@ -102,6 +103,7 @@ void ClusterFamily::Shutdown() { RemoveOutgoingMigrations(empty_config, tl_cluster_config); RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config)); + util::fb2::LockGuard migration_lk(migration_mu_); DCHECK(outgoing_migration_jobs_.empty()); DCHECK(incoming_migrations_jobs_.empty()); }); @@ -540,7 +542,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) return cntx->SendError("Invalid cluster configuration."); } - lock_guard gu(set_config_mu); + util::fb2::LockGuard gu(set_config_mu); VLOG(1) << "Setting new cluster config: " << json_str; auto out_migrations_slots = RemoveOutgoingMigrations(new_config, tl_cluster_config); @@ -549,7 +551,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) SlotRanges enable_slots, disable_slots; { - std::lock_guard lk(migration_mu_); + util::fb2::LockGuard lk(migration_mu_); // If migration state is changed simultaneously, the changes to config will be applied after // set_config_mu is unlocked and even if we apply the same changes 2 times it's not a problem for (const auto& m : incoming_migrations_jobs_) { @@ -623,12 +625,12 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c fb2::Mutex mu; - auto cb = [&](auto*) { + auto cb = [&](auto*) ABSL_LOCKS_EXCLUDED(mu) { EngineShard* shard = EngineShard::tlocal(); if (shard == nullptr) return; - lock_guard lk(mu); + util::fb2::LockGuard lk(mu); for (auto& [slot, data] : slots_stats) { data += namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).GetSlotStats(slot); } @@ -754,7 +756,7 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { std::shared_ptr ClusterFamily::GetIncomingMigration( std::string_view source_id) { - lock_guard lk(migration_mu_); + util::fb2::LockGuard lk(migration_mu_); for (const auto& mj : incoming_migrations_jobs_) { if (mj->GetSourceID() == source_id) { return mj; @@ -766,7 +768,7 @@ std::shared_ptr ClusterFamily::GetIncomingMigration( SlotRanges ClusterFamily::RemoveOutgoingMigrations(shared_ptr new_config, shared_ptr old_config) { auto migrations = new_config->GetFinishedOutgoingMigrations(old_config); - lock_guard lk(migration_mu_); + util::fb2::LockGuard lk(migration_mu_); SlotRanges removed_slots; for (const auto& m : migrations) { auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(), @@ -830,7 +832,7 @@ bool RemoveIncomingMigrationImpl(std::vector& migrations) { - lock_guard lk(migration_mu_); + util::fb2::LockGuard lk(migration_mu_); for (const auto& m : migrations) { RemoveIncomingMigrationImpl(incoming_migrations_jobs_, m.node_info.id); VLOG(1) << "Migration was canceled from: " << m.node_info.id; @@ -865,7 +867,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "Init migration " << source_id; - lock_guard lk(migration_mu_); + util::fb2::LockGuard lk(migration_mu_); auto was_removed = RemoveIncomingMigrationImpl(incoming_migrations_jobs_, source_id); LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id; @@ -876,7 +878,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) { } std::shared_ptr ClusterFamily::CreateOutgoingMigration(MigrationInfo info) { - std::lock_guard lk(migration_mu_); + util::fb2::LockGuard lk(migration_mu_); auto migration = make_shared(std::move(info), this, server_family_); outgoing_migration_jobs_.emplace_back(migration); return migration; @@ -915,8 +917,8 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, const SlotRanges& slots, bool is_incoming) { VLOG(1) << "Update config for slots ranges: " << slots.ToString() << " for " << MyID() << " : " << node_id; - lock_guard gu(set_config_mu); - lock_guard lk(migration_mu_); + util::fb2::LockGuard gu(set_config_mu); + util::fb2::LockGuard lk(migration_mu_); bool is_migration_valid = false; if (is_incoming) { diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 5fea5d192..a69990594 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -28,7 +28,7 @@ class ClusterFamily { void Register(CommandRegistry* registry); - void Shutdown(); + void Shutdown() ABSL_LOCKS_EXCLUDED(set_config_mu); // Returns a thread-local pointer. static ClusterConfig* cluster_config(); @@ -57,8 +57,10 @@ class ClusterFamily { // Custom Dragonfly commands for cluster management void DflyCluster(CmdArgList args, ConnectionContext* cntx); - void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx); - void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx); + void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) + ABSL_LOCKS_EXCLUDED(set_config_mu, migration_mu_); + void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx) + ABSL_LOCKS_EXCLUDED(migration_mu_); void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); private: // Slots migration section @@ -69,7 +71,7 @@ class ClusterFamily { void DflyMigrate(CmdArgList args, ConnectionContext* cntx); // DFLYMIGRATE INIT is internal command to create incoming migration object - void InitMigration(CmdArgList args, ConnectionContext* cntx); + void InitMigration(CmdArgList args, ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(migration_mu_); // DFLYMIGRATE FLOW initiate second step in slots migration procedure // this request should be done for every shard on the target node @@ -79,15 +81,19 @@ class ClusterFamily { void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx); - std::shared_ptr GetIncomingMigration(std::string_view source_id); + std::shared_ptr GetIncomingMigration(std::string_view source_id) + ABSL_LOCKS_EXCLUDED(migration_mu_); void StartSlotMigrations(std::vector migrations); SlotRanges RemoveOutgoingMigrations(std::shared_ptr new_config, - std::shared_ptr old_config); - void RemoveIncomingMigrations(const std::vector& migrations); + std::shared_ptr old_config) + ABSL_LOCKS_EXCLUDED(migration_mu_); + void RemoveIncomingMigrations(const std::vector& migrations) + ABSL_LOCKS_EXCLUDED(migration_mu_); // store info about migration and create unique session id - std::shared_ptr CreateOutgoingMigration(MigrationInfo info); + std::shared_ptr CreateOutgoingMigration(MigrationInfo info) + ABSL_LOCKS_EXCLUDED(migration_mu_); mutable util::fb2::Mutex migration_mu_; // guard migrations operations // holds all incoming slots migrations that are currently in progress. diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index ebaf5686f..83c51226b 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -14,6 +14,7 @@ #include "server/journal/executor.h" #include "server/journal/tx_executor.h" #include "server/main_service.h" +#include "util/fibers/synchronization.h" ABSL_DECLARE_FLAG(int, slot_migration_connection_timeout_ms); @@ -34,14 +35,15 @@ class ClusterShardMigration { in_migration_(in_migration) { } - void Start(Context* cntx, util::FiberSocketBase* source, util::fb2::BlockingCounter bc) { + void Start(Context* cntx, util::FiberSocketBase* source, util::fb2::BlockingCounter bc) + ABSL_LOCKS_EXCLUDED(mu_) { { - std::lock_guard lk(mu_); + util::fb2::LockGuard lk(mu_); socket_ = source; } - absl::Cleanup cleanup([this]() { - std::lock_guard lk(mu_); + absl::Cleanup cleanup([this]() ABSL_LOCKS_EXCLUDED(mu_) { + util::fb2::LockGuard lk(mu_); socket_ = nullptr; }); JournalReader reader{source, 0}; @@ -79,7 +81,7 @@ class ClusterShardMigration { } std::error_code Cancel() { - std::lock_guard lk(mu_); + util::fb2::LockGuard lk(mu_); if (socket_ != nullptr) { return socket_->proactor()->Await([s = socket_]() { if (s->IsOpen()) { @@ -114,7 +116,6 @@ class ClusterShardMigration { } } - private: uint32_t source_shard_id_; util::fb2::Mutex mu_; util::FiberSocketBase* socket_ ABSL_GUARDED_BY(mu_); diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 70b2e6553..8bdaa970c 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -364,6 +364,7 @@ bool OutgoingMigration::CheckFlowsForErrors() { } size_t OutgoingMigration::GetKeyCount() const { + util::fb2::LockGuard lk(state_mu_); if (state_ == MigrationState::C_FINISHED) { return keys_number_; } diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 9e60a536c..bd1487d76 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -56,7 +56,7 @@ class OutgoingMigration : private ProtocolClient { return last_error_.Format(); } - size_t GetKeyCount() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(state_mu_); + size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_); static constexpr long kInvalidAttempt = -1; static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION"; diff --git a/src/server/common.h b/src/server/common.h index d680cd202..a509b57b9 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -377,6 +377,30 @@ class ABSL_LOCKABLE ThreadLocalMutex { util::fb2::detail::FiberInterface* locked_fiber_{nullptr}; }; +// Replacement of std::SharedLock that allows -Wthread-safety +template class ABSL_SCOPED_LOCKABLE SharedLock { + public: + explicit SharedLock(Mutex& m) ABSL_EXCLUSIVE_LOCK_FUNCTION(m) : m_(m) { + m_.lock_shared(); + is_locked_ = true; + } + + ~SharedLock() ABSL_UNLOCK_FUNCTION() { + if (is_locked_) { + m_.unlock_shared(); + } + } + + void unlock() ABSL_UNLOCK_FUNCTION() { + m_.unlock_shared(); + is_locked_ = false; + } + + private: + Mutex& m_; + bool is_locked_; +}; + extern size_t serialization_max_chunk_size; } // namespace dfly diff --git a/src/server/db_slice.h b/src/server/db_slice.h index c11259820..546faac2b 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -565,8 +565,7 @@ class DbSlice { return version_++; } - void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const - ABSL_EXCLUSIVE_LOCKS_REQUIRED(local_mu_); + void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const; // Used to provide exclusive access while Traversing segments mutable ThreadLocalMutex local_mu_; diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 1e6ab3c9a..0e65814e5 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -102,7 +102,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r } // namespace void DflyCmd::ReplicaInfo::Cancel() { - lock_guard lk = GetExclusiveLock(); + auto lk = GetExclusiveLock(); if (replica_state == SyncState::CANCELLED) { return; } @@ -261,7 +261,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) { string eof_token; { - lock_guard lk = replica_ptr->GetExclusiveLock(); + auto lk = replica_ptr->GetExclusiveLock(); if (replica_ptr->replica_state != SyncState::PREPARATION) return cntx->SendError(kInvalidState); @@ -325,7 +325,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) { if (!sync_id) return; - lock_guard lk = replica_ptr->GetExclusiveLock(); + auto lk = replica_ptr->GetExclusiveLock(); if (!CheckReplicaStateOrReply(*replica_ptr, SyncState::PREPARATION, rb)) return; @@ -364,7 +364,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) { if (!sync_id) return; - lock_guard lk = replica_ptr->GetExclusiveLock(); + auto lk = replica_ptr->GetExclusiveLock(); if (!CheckReplicaStateOrReply(*replica_ptr, SyncState::FULL_SYNC, rb)) return; @@ -418,7 +418,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) { return; { - shared_lock lk = replica_ptr->GetSharedLock(); + auto lk = replica_ptr->GetSharedLock(); if (!CheckReplicaStateOrReply(*replica_ptr, SyncState::STABLE_SYNC, rb)) return; @@ -467,7 +467,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) { atomic_bool catchup_success = true; if (*status == OpStatus::OK) { - shared_lock lk = replica_ptr->GetSharedLock(); + auto lk = replica_ptr->GetSharedLock(); auto cb = [replica_ptr = std::move(replica_ptr), end_time, &catchup_success](EngineShard* shard) { if (!WaitReplicaFlowToCatchup(end_time, replica_ptr.get(), shard)) { @@ -710,6 +710,7 @@ void DflyCmd::StopReplication(uint32_t sync_id) { replica_infos_.erase(sync_id); } +// Because we need to annotate unique_lock void DflyCmd::BreakStalledFlowsInShard() { std::unique_lock global_lock(mu_, try_to_lock); @@ -722,7 +723,7 @@ void DflyCmd::BreakStalledFlowsInShard() { vector deleted; for (auto [sync_id, replica_ptr] : replica_infos_) { - shared_lock replica_lock = replica_ptr->GetSharedLock(); + auto replica_lock = replica_ptr->GetSharedLock(); if (!replica_ptr->flows[sid].saver) continue; @@ -787,9 +788,9 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const { { util::fb2::LockGuard lk{mu_}; // prevent state changes - auto cb = [&](EngineShard* shard) { + auto cb = [&](EngineShard* shard) ABSL_NO_THREAD_SAFETY_ANALYSIS { for (const auto& [_, info] : replica_infos_) { - shared_lock repl_lk = info->GetSharedLock(); + auto repl_lk = info->GetSharedLock(); // flows should not be empty. DCHECK(!info->flows.empty()); @@ -836,7 +837,7 @@ std::map DflyCmd::ReplicationLagsLocked() const { std::vector> shard_lags(shard_set->size()); shard_set->RunBriefInParallel([&shard_lags, this](EngineShard* shard) { auto& lags = shard_lags[shard->shard_id()]; - for (const auto& info : replica_infos_) { + for (const auto& info : ABSL_TS_UNCHECKED_READ(replica_infos_)) { const ReplicaInfo* replica = info.second.get(); if (shard->journal()) { int64_t lag = shard->journal()->GetLsn() - replica->flows[shard->shard_id()].last_acked_lsn; diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 14b6561bc..e9ad3b0e3 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -10,6 +10,7 @@ #include #include "server/conn_context.h" +#include "util/fibers/synchronization.h" namespace facade { class RedisReplyBuilder; @@ -101,7 +102,7 @@ class DflyCmd { enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED }; // Stores information related to a single replica. - struct ReplicaInfo { + struct ABSL_LOCKABLE ReplicaInfo { ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port, Context::ErrHandler err_handler) : replica_state{SyncState::PREPARATION}, @@ -111,12 +112,12 @@ class DflyCmd { flows{flow_count} { } - [[nodiscard]] auto GetExclusiveLock() { - return std::lock_guard{shared_mu}; + [[nodiscard]] auto GetExclusiveLock() ABSL_EXCLUSIVE_LOCK_FUNCTION() { + return util::fb2::LockGuard{shared_mu}; } - [[nodiscard]] auto GetSharedLock() { - return std::shared_lock{shared_mu}; + [[nodiscard]] auto GetSharedLock() ABSL_EXCLUSIVE_LOCK_FUNCTION() { + return dfly::SharedLock{shared_mu}; } // Transition into cancelled state, run cleanup. @@ -156,13 +157,13 @@ class DflyCmd { // Master-side command. Provides Replica info. std::vector GetReplicasRoleInfo() const ABSL_LOCKS_EXCLUDED(mu_); - void GetReplicationMemoryStats(ReplicationMemoryStats* out) const ABSL_LOCKS_EXCLUDED(mu_); + void GetReplicationMemoryStats(ReplicationMemoryStats* out) const ABSL_NO_THREAD_SAFETY_ANALYSIS; // Sets metadata. void SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version); // Tries to break those flows that stuck on socket write for too long time. - void BreakStalledFlowsInShard(); + void BreakStalledFlowsInShard() ABSL_NO_THREAD_SAFETY_ANALYSIS; private: // JOURNAL [START/STOP] diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 0e7927369..3fbd9969f 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -109,17 +109,17 @@ class RoundRobinSharder { std::fill(round_robin_shards_tl_cache_.begin(), round_robin_shards_tl_cache_.end(), kInvalidSid); - std::lock_guard guard(mutex_); + util::fb2::LockGuard guard(mutex_); if (round_robin_shards_.empty()) { round_robin_shards_ = round_robin_shards_tl_cache_; } } } - static void Destroy() { + static void Destroy() ABSL_LOCKS_EXCLUDED(mutex_) { round_robin_shards_tl_cache_.clear(); - std::lock_guard guard(mutex_); + util::fb2::LockGuard guard(mutex_); round_robin_shards_.clear(); } @@ -138,7 +138,7 @@ class RoundRobinSharder { ShardId sid = round_robin_shards_tl_cache_[index]; if (sid == kInvalidSid) { - std::lock_guard guard(mutex_); + util::fb2::LockGuard guard(mutex_); sid = round_robin_shards_[index]; if (sid == kInvalidSid) { sid = next_shard_; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 108ceeac7..ef60679e0 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -2496,22 +2496,30 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) { } void Service::RequestLoadingState() { - std::unique_lock lk(mu_); - ++loading_state_counter_; - if (global_state_ != GlobalState::LOADING) { - DCHECK_EQ(global_state_, GlobalState::ACTIVE); - lk.unlock(); + bool switch_state = false; + { + util::fb2::LockGuard lk(mu_); + ++loading_state_counter_; + if (global_state_ != GlobalState::LOADING) { + DCHECK_EQ(global_state_, GlobalState::ACTIVE); + switch_state = true; + } + } + if (switch_state) { SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); } } void Service::RemoveLoadingState() { - unique_lock lk(mu_); - DCHECK_EQ(global_state_, GlobalState::LOADING); - DCHECK_GT(loading_state_counter_, 0u); - --loading_state_counter_; - if (loading_state_counter_ == 0) { - lk.unlock(); + bool switch_state = false; + { + util::fb2::LockGuard lk(mu_); + DCHECK_EQ(global_state_, GlobalState::LOADING); + DCHECK_GT(loading_state_counter_, 0u); + --loading_state_counter_; + switch_state = loading_state_counter_ == 0; + } + if (switch_state) { SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); } } diff --git a/src/server/main_service.h b/src/server/main_service.h index 3552828f9..9c829199b 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -79,8 +79,8 @@ class Service : public facade::ServiceInterface { // Upon switch, updates cached global state in threadlocal ServerState struct. GlobalState SwitchState(GlobalState from, GlobalState to) ABSL_LOCKS_EXCLUDED(mu_); - void RequestLoadingState(); - void RemoveLoadingState(); + void RequestLoadingState() ABSL_LOCKS_EXCLUDED(mu_); + void RemoveLoadingState() ABSL_LOCKS_EXCLUDED(mu_); GlobalState GetGlobalState() const ABSL_LOCKS_EXCLUDED(mu_); diff --git a/src/server/namespaces.cc b/src/server/namespaces.cc index 2c05772cf..a6fd2de54 100644 --- a/src/server/namespaces.cc +++ b/src/server/namespaces.cc @@ -2,6 +2,7 @@ #include "base/flags.h" #include "base/logging.h" +#include "server/common.h" #include "server/engine_shard_set.h" ABSL_DECLARE_FLAG(bool, cache_mode); @@ -62,7 +63,7 @@ bool Namespaces::IsInitialized() const { void Namespaces::Clear() { util::fb2::LockGuard guard(mu_); - namespaces.default_namespace_ = nullptr; + default_namespace_ = nullptr; if (namespaces_.empty()) { return; @@ -70,12 +71,12 @@ void Namespaces::Clear() { shard_set->RunBriefInParallel([&](EngineShard* es) { CHECK(es != nullptr); - for (auto& ns : namespaces_) { + for (auto& ns : ABSL_TS_UNCHECKED_READ(namespaces_)) { ns.second.shard_db_slices_[es->shard_id()].reset(); } }); - namespaces.namespaces_.clear(); + namespaces_.clear(); } Namespace& Namespaces::GetDefaultNamespace() const { @@ -86,7 +87,7 @@ Namespace& Namespaces::GetDefaultNamespace() const { Namespace& Namespaces::GetOrInsert(std::string_view ns) { { // Try to look up under a shared lock - std::shared_lock guard(mu_); + dfly::SharedLock guard(mu_); auto it = namespaces_.find(ns); if (it != namespaces_.end()) { return it->second; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 59e7f4c66..1dc5d2550 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -871,7 +871,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vectorpool()->GetNextProactor()->Await([this] { + shard_set->pool()->GetNextProactor()->Await([this]() ABSL_LOCKS_EXCLUDED(loading_stats_mu_) { GenericError ec = DoSave(); - std::lock_guard lk{loading_stats_mu_}; + util::fb2::LockGuard lk{loading_stats_mu_}; loading_stats_.backup_count++; if (ec) { @@ -1075,7 +1075,7 @@ void ServerFamily::SnapshotScheduling() { GenericError ec = DoSave(); - std::lock_guard lk{loading_stats_mu_}; + util::fb2::LockGuard lk{loading_stats_mu_}; loading_stats_.backup_count++; if (ec) { @@ -1559,7 +1559,7 @@ GenericError ServerFamily::WaitUntilSaveFinished(Transaction* trans, bool ignore detail::SaveInfo save_info; { - std::lock_guard lk(save_mu_); + util::fb2::LockGuard lk(save_mu_); save_info = save_controller_->Finalize(); if (save_info.error) { @@ -2062,13 +2062,13 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { } { - std::lock_guard lk{loading_stats_mu_}; + util::fb2::LockGuard lk{loading_stats_mu_}; result.loading_stats = loading_stats_; } // Update peak stats. We rely on the fact that GetMetrics is called frequently enough to // update peak_stats_ from it. - lock_guard lk{peak_stats_mu_}; + util::fb2::LockGuard lk{peak_stats_mu_}; UpdateMax(&peak_stats_.conn_dispatch_queue_bytes, result.facade_stats.conn_stats.dispatch_queue_bytes); UpdateMax(&peak_stats_.conn_read_buf_capacity, result.facade_stats.conn_stats.read_buf_capacity); diff --git a/src/server/server_family.h b/src/server/server_family.h index 8e78f76c1..f2c6d6a3b 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -260,7 +260,7 @@ class ServerFamily { private: void JoinSnapshotSchedule(); - void LoadFromSnapshot(); + void LoadFromSnapshot() ABSL_LOCKS_EXCLUDED(loading_stats_mu_); uint32_t shard_count() const { return shard_set->size(); @@ -305,7 +305,7 @@ class ServerFamily { // Returns the number of loaded keys if successful. io::Result LoadRdb(const std::string& rdb_file, LoadExistingKeys existing_keys); - void SnapshotScheduling(); + void SnapshotScheduling() ABSL_LOCKS_EXCLUDED(loading_stats_mu_); void SendInvalidationMessages() const; @@ -319,8 +319,8 @@ class ServerFamily { GenericError DoSaveCheckAndStart(bool new_version, string_view basename, Transaction* trans, bool ignore_state = false) ABSL_LOCKS_EXCLUDED(save_mu_); - GenericError WaitUntilSaveFinished(Transaction* trans, bool ignore_state = false) - ABSL_LOCKS_EXCLUDED(save_mu_); + GenericError WaitUntilSaveFinished(Transaction* trans, + bool ignore_state = false) ABSL_NO_THREAD_SAFETY_ANALYSIS; void StopAllClusterReplicas() ABSL_EXCLUSIVE_LOCKS_REQUIRED(replicaof_mu_); bool DoAuth(ConnectionContext* cntx, std::string_view username, std::string_view password) const;