1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: enable -Werror=thread-safety and add missing annotations (part 2/2) (#3595)

* add missing annotations
* small mutex fixes
* enable -Werror=thread-safety for clang builds

---------

Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
Kostas Kyrimis 2024-08-30 15:42:30 +03:00 committed by GitHub
parent 0705bbb536
commit 41f7b611d0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 127 additions and 82 deletions

View file

@ -20,6 +20,7 @@ endif()
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang") if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(CMAKE_CXX_FLAGS "-Wthread-safety ${CMAKE_CXX_FLAGS}") set(CMAKE_CXX_FLAGS "-Wthread-safety ${CMAKE_CXX_FLAGS}")
add_compile_options(-Werror=thread-safety)
endif() endif()
# We can not use here CHECK_CXX_COMPILER_FLAG because systems that do not support sanitizers # We can not use here CHECK_CXX_COMPILER_FLAG because systems that do not support sanitizers

View file

@ -473,7 +473,7 @@ class RedisReplyBuilder2 : public RedisReplyBuilder2Base {
void SendScoredArray(absl::Span<const std::pair<std::string, double>> arr, void SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
bool with_scores) override; bool with_scores) override;
void SendSimpleStrArr(RedisReplyBuilder::StrSpan arr) { void SendSimpleStrArr(RedisReplyBuilder::StrSpan arr) override {
SendSimpleStrArr2(arr); SendSimpleStrArr2(arr);
} }
void SendStringArr(RedisReplyBuilder::StrSpan arr, CollectionType type = ARRAY) override { void SendStringArr(RedisReplyBuilder::StrSpan arr, CollectionType type = ARRAY) override {

View file

@ -24,6 +24,7 @@
#include "server/namespaces.h" #include "server/namespaces.h"
#include "server/server_family.h" #include "server/server_family.h"
#include "server/server_state.h" #include "server/server_state.h"
#include "util/fibers/synchronization.h"
ABSL_FLAG(std::string, cluster_announce_ip, "", "DEPRECATED: use --announce_ip"); ABSL_FLAG(std::string, cluster_announce_ip, "", "DEPRECATED: use --announce_ip");
@ -93,8 +94,8 @@ ClusterConfig* ClusterFamily::cluster_config() {
} }
void ClusterFamily::Shutdown() { void ClusterFamily::Shutdown() {
shard_set->pool()->at(0)->Await([this] { shard_set->pool()->at(0)->Await([this]() ABSL_LOCKS_EXCLUDED(set_config_mu) {
lock_guard lk(set_config_mu); util::fb2::LockGuard lk(set_config_mu);
if (!tl_cluster_config) if (!tl_cluster_config)
return; return;
@ -102,6 +103,7 @@ void ClusterFamily::Shutdown() {
RemoveOutgoingMigrations(empty_config, tl_cluster_config); RemoveOutgoingMigrations(empty_config, tl_cluster_config);
RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config)); RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config));
util::fb2::LockGuard migration_lk(migration_mu_);
DCHECK(outgoing_migration_jobs_.empty()); DCHECK(outgoing_migration_jobs_.empty());
DCHECK(incoming_migrations_jobs_.empty()); DCHECK(incoming_migrations_jobs_.empty());
}); });
@ -540,7 +542,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
return cntx->SendError("Invalid cluster configuration."); return cntx->SendError("Invalid cluster configuration.");
} }
lock_guard gu(set_config_mu); util::fb2::LockGuard gu(set_config_mu);
VLOG(1) << "Setting new cluster config: " << json_str; VLOG(1) << "Setting new cluster config: " << json_str;
auto out_migrations_slots = RemoveOutgoingMigrations(new_config, tl_cluster_config); auto out_migrations_slots = RemoveOutgoingMigrations(new_config, tl_cluster_config);
@ -549,7 +551,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
SlotRanges enable_slots, disable_slots; SlotRanges enable_slots, disable_slots;
{ {
std::lock_guard lk(migration_mu_); util::fb2::LockGuard lk(migration_mu_);
// If migration state is changed simultaneously, the changes to config will be applied after // If migration state is changed simultaneously, the changes to config will be applied after
// set_config_mu is unlocked and even if we apply the same changes 2 times it's not a problem // set_config_mu is unlocked and even if we apply the same changes 2 times it's not a problem
for (const auto& m : incoming_migrations_jobs_) { for (const auto& m : incoming_migrations_jobs_) {
@ -623,12 +625,12 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c
fb2::Mutex mu; fb2::Mutex mu;
auto cb = [&](auto*) { auto cb = [&](auto*) ABSL_LOCKS_EXCLUDED(mu) {
EngineShard* shard = EngineShard::tlocal(); EngineShard* shard = EngineShard::tlocal();
if (shard == nullptr) if (shard == nullptr)
return; return;
lock_guard lk(mu); util::fb2::LockGuard lk(mu);
for (auto& [slot, data] : slots_stats) { for (auto& [slot, data] : slots_stats) {
data += namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).GetSlotStats(slot); data += namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).GetSlotStats(slot);
} }
@ -754,7 +756,7 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
std::shared_ptr<IncomingSlotMigration> ClusterFamily::GetIncomingMigration( std::shared_ptr<IncomingSlotMigration> ClusterFamily::GetIncomingMigration(
std::string_view source_id) { std::string_view source_id) {
lock_guard lk(migration_mu_); util::fb2::LockGuard lk(migration_mu_);
for (const auto& mj : incoming_migrations_jobs_) { for (const auto& mj : incoming_migrations_jobs_) {
if (mj->GetSourceID() == source_id) { if (mj->GetSourceID() == source_id) {
return mj; return mj;
@ -766,7 +768,7 @@ std::shared_ptr<IncomingSlotMigration> ClusterFamily::GetIncomingMigration(
SlotRanges ClusterFamily::RemoveOutgoingMigrations(shared_ptr<ClusterConfig> new_config, SlotRanges ClusterFamily::RemoveOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
shared_ptr<ClusterConfig> old_config) { shared_ptr<ClusterConfig> old_config) {
auto migrations = new_config->GetFinishedOutgoingMigrations(old_config); auto migrations = new_config->GetFinishedOutgoingMigrations(old_config);
lock_guard lk(migration_mu_); util::fb2::LockGuard lk(migration_mu_);
SlotRanges removed_slots; SlotRanges removed_slots;
for (const auto& m : migrations) { for (const auto& m : migrations) {
auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(), auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(),
@ -830,7 +832,7 @@ bool RemoveIncomingMigrationImpl(std::vector<std::shared_ptr<IncomingSlotMigrati
} // namespace } // namespace
void ClusterFamily::RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations) { void ClusterFamily::RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations) {
lock_guard lk(migration_mu_); util::fb2::LockGuard lk(migration_mu_);
for (const auto& m : migrations) { for (const auto& m : migrations) {
RemoveIncomingMigrationImpl(incoming_migrations_jobs_, m.node_info.id); RemoveIncomingMigrationImpl(incoming_migrations_jobs_, m.node_info.id);
VLOG(1) << "Migration was canceled from: " << m.node_info.id; VLOG(1) << "Migration was canceled from: " << m.node_info.id;
@ -865,7 +867,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Init migration " << source_id; VLOG(1) << "Init migration " << source_id;
lock_guard lk(migration_mu_); util::fb2::LockGuard lk(migration_mu_);
auto was_removed = RemoveIncomingMigrationImpl(incoming_migrations_jobs_, source_id); auto was_removed = RemoveIncomingMigrationImpl(incoming_migrations_jobs_, source_id);
LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id; LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id;
@ -876,7 +878,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
} }
std::shared_ptr<OutgoingMigration> ClusterFamily::CreateOutgoingMigration(MigrationInfo info) { std::shared_ptr<OutgoingMigration> ClusterFamily::CreateOutgoingMigration(MigrationInfo info) {
std::lock_guard lk(migration_mu_); util::fb2::LockGuard lk(migration_mu_);
auto migration = make_shared<OutgoingMigration>(std::move(info), this, server_family_); auto migration = make_shared<OutgoingMigration>(std::move(info), this, server_family_);
outgoing_migration_jobs_.emplace_back(migration); outgoing_migration_jobs_.emplace_back(migration);
return migration; return migration;
@ -915,8 +917,8 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
const SlotRanges& slots, bool is_incoming) { const SlotRanges& slots, bool is_incoming) {
VLOG(1) << "Update config for slots ranges: " << slots.ToString() << " for " << MyID() << " : " VLOG(1) << "Update config for slots ranges: " << slots.ToString() << " for " << MyID() << " : "
<< node_id; << node_id;
lock_guard gu(set_config_mu); util::fb2::LockGuard gu(set_config_mu);
lock_guard lk(migration_mu_); util::fb2::LockGuard lk(migration_mu_);
bool is_migration_valid = false; bool is_migration_valid = false;
if (is_incoming) { if (is_incoming) {

View file

@ -28,7 +28,7 @@ class ClusterFamily {
void Register(CommandRegistry* registry); void Register(CommandRegistry* registry);
void Shutdown(); void Shutdown() ABSL_LOCKS_EXCLUDED(set_config_mu);
// Returns a thread-local pointer. // Returns a thread-local pointer.
static ClusterConfig* cluster_config(); static ClusterConfig* cluster_config();
@ -57,8 +57,10 @@ class ClusterFamily {
// Custom Dragonfly commands for cluster management // Custom Dragonfly commands for cluster management
void DflyCluster(CmdArgList args, ConnectionContext* cntx); void DflyCluster(CmdArgList args, ConnectionContext* cntx);
void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx); void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx); ABSL_LOCKS_EXCLUDED(set_config_mu, migration_mu_);
void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx)
ABSL_LOCKS_EXCLUDED(migration_mu_);
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);
private: // Slots migration section private: // Slots migration section
@ -69,7 +71,7 @@ class ClusterFamily {
void DflyMigrate(CmdArgList args, ConnectionContext* cntx); void DflyMigrate(CmdArgList args, ConnectionContext* cntx);
// DFLYMIGRATE INIT is internal command to create incoming migration object // DFLYMIGRATE INIT is internal command to create incoming migration object
void InitMigration(CmdArgList args, ConnectionContext* cntx); void InitMigration(CmdArgList args, ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(migration_mu_);
// DFLYMIGRATE FLOW initiate second step in slots migration procedure // DFLYMIGRATE FLOW initiate second step in slots migration procedure
// this request should be done for every shard on the target node // this request should be done for every shard on the target node
@ -79,15 +81,19 @@ class ClusterFamily {
void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx); void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx);
std::shared_ptr<IncomingSlotMigration> GetIncomingMigration(std::string_view source_id); std::shared_ptr<IncomingSlotMigration> GetIncomingMigration(std::string_view source_id)
ABSL_LOCKS_EXCLUDED(migration_mu_);
void StartSlotMigrations(std::vector<MigrationInfo> migrations); void StartSlotMigrations(std::vector<MigrationInfo> migrations);
SlotRanges RemoveOutgoingMigrations(std::shared_ptr<ClusterConfig> new_config, SlotRanges RemoveOutgoingMigrations(std::shared_ptr<ClusterConfig> new_config,
std::shared_ptr<ClusterConfig> old_config); std::shared_ptr<ClusterConfig> old_config)
void RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations); ABSL_LOCKS_EXCLUDED(migration_mu_);
void RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations)
ABSL_LOCKS_EXCLUDED(migration_mu_);
// store info about migration and create unique session id // store info about migration and create unique session id
std::shared_ptr<OutgoingMigration> CreateOutgoingMigration(MigrationInfo info); std::shared_ptr<OutgoingMigration> CreateOutgoingMigration(MigrationInfo info)
ABSL_LOCKS_EXCLUDED(migration_mu_);
mutable util::fb2::Mutex migration_mu_; // guard migrations operations mutable util::fb2::Mutex migration_mu_; // guard migrations operations
// holds all incoming slots migrations that are currently in progress. // holds all incoming slots migrations that are currently in progress.

View file

@ -14,6 +14,7 @@
#include "server/journal/executor.h" #include "server/journal/executor.h"
#include "server/journal/tx_executor.h" #include "server/journal/tx_executor.h"
#include "server/main_service.h" #include "server/main_service.h"
#include "util/fibers/synchronization.h"
ABSL_DECLARE_FLAG(int, slot_migration_connection_timeout_ms); ABSL_DECLARE_FLAG(int, slot_migration_connection_timeout_ms);
@ -34,14 +35,15 @@ class ClusterShardMigration {
in_migration_(in_migration) { in_migration_(in_migration) {
} }
void Start(Context* cntx, util::FiberSocketBase* source, util::fb2::BlockingCounter bc) { void Start(Context* cntx, util::FiberSocketBase* source, util::fb2::BlockingCounter bc)
ABSL_LOCKS_EXCLUDED(mu_) {
{ {
std::lock_guard lk(mu_); util::fb2::LockGuard lk(mu_);
socket_ = source; socket_ = source;
} }
absl::Cleanup cleanup([this]() { absl::Cleanup cleanup([this]() ABSL_LOCKS_EXCLUDED(mu_) {
std::lock_guard lk(mu_); util::fb2::LockGuard lk(mu_);
socket_ = nullptr; socket_ = nullptr;
}); });
JournalReader reader{source, 0}; JournalReader reader{source, 0};
@ -79,7 +81,7 @@ class ClusterShardMigration {
} }
std::error_code Cancel() { std::error_code Cancel() {
std::lock_guard lk(mu_); util::fb2::LockGuard lk(mu_);
if (socket_ != nullptr) { if (socket_ != nullptr) {
return socket_->proactor()->Await([s = socket_]() { return socket_->proactor()->Await([s = socket_]() {
if (s->IsOpen()) { if (s->IsOpen()) {
@ -114,7 +116,6 @@ class ClusterShardMigration {
} }
} }
private:
uint32_t source_shard_id_; uint32_t source_shard_id_;
util::fb2::Mutex mu_; util::fb2::Mutex mu_;
util::FiberSocketBase* socket_ ABSL_GUARDED_BY(mu_); util::FiberSocketBase* socket_ ABSL_GUARDED_BY(mu_);

View file

@ -364,6 +364,7 @@ bool OutgoingMigration::CheckFlowsForErrors() {
} }
size_t OutgoingMigration::GetKeyCount() const { size_t OutgoingMigration::GetKeyCount() const {
util::fb2::LockGuard lk(state_mu_);
if (state_ == MigrationState::C_FINISHED) { if (state_ == MigrationState::C_FINISHED) {
return keys_number_; return keys_number_;
} }

View file

@ -56,7 +56,7 @@ class OutgoingMigration : private ProtocolClient {
return last_error_.Format(); return last_error_.Format();
} }
size_t GetKeyCount() const ABSL_EXCLUSIVE_LOCKS_REQUIRED(state_mu_); size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_);
static constexpr long kInvalidAttempt = -1; static constexpr long kInvalidAttempt = -1;
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION"; static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";

View file

@ -377,6 +377,30 @@ class ABSL_LOCKABLE ThreadLocalMutex {
util::fb2::detail::FiberInterface* locked_fiber_{nullptr}; util::fb2::detail::FiberInterface* locked_fiber_{nullptr};
}; };
// Replacement of std::SharedLock that allows -Wthread-safety
template <typename Mutex> class ABSL_SCOPED_LOCKABLE SharedLock {
public:
explicit SharedLock(Mutex& m) ABSL_EXCLUSIVE_LOCK_FUNCTION(m) : m_(m) {
m_.lock_shared();
is_locked_ = true;
}
~SharedLock() ABSL_UNLOCK_FUNCTION() {
if (is_locked_) {
m_.unlock_shared();
}
}
void unlock() ABSL_UNLOCK_FUNCTION() {
m_.unlock_shared();
is_locked_ = false;
}
private:
Mutex& m_;
bool is_locked_;
};
extern size_t serialization_max_chunk_size; extern size_t serialization_max_chunk_size;
} // namespace dfly } // namespace dfly

View file

@ -565,8 +565,7 @@ class DbSlice {
return version_++; return version_++;
} }
void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const;
ABSL_EXCLUSIVE_LOCKS_REQUIRED(local_mu_);
// Used to provide exclusive access while Traversing segments // Used to provide exclusive access while Traversing segments
mutable ThreadLocalMutex local_mu_; mutable ThreadLocalMutex local_mu_;

View file

@ -102,7 +102,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
} // namespace } // namespace
void DflyCmd::ReplicaInfo::Cancel() { void DflyCmd::ReplicaInfo::Cancel() {
lock_guard lk = GetExclusiveLock(); auto lk = GetExclusiveLock();
if (replica_state == SyncState::CANCELLED) { if (replica_state == SyncState::CANCELLED) {
return; return;
} }
@ -261,7 +261,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
string eof_token; string eof_token;
{ {
lock_guard lk = replica_ptr->GetExclusiveLock(); auto lk = replica_ptr->GetExclusiveLock();
if (replica_ptr->replica_state != SyncState::PREPARATION) if (replica_ptr->replica_state != SyncState::PREPARATION)
return cntx->SendError(kInvalidState); return cntx->SendError(kInvalidState);
@ -325,7 +325,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
if (!sync_id) if (!sync_id)
return; return;
lock_guard lk = replica_ptr->GetExclusiveLock(); auto lk = replica_ptr->GetExclusiveLock();
if (!CheckReplicaStateOrReply(*replica_ptr, SyncState::PREPARATION, rb)) if (!CheckReplicaStateOrReply(*replica_ptr, SyncState::PREPARATION, rb))
return; return;
@ -364,7 +364,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
if (!sync_id) if (!sync_id)
return; return;
lock_guard lk = replica_ptr->GetExclusiveLock(); auto lk = replica_ptr->GetExclusiveLock();
if (!CheckReplicaStateOrReply(*replica_ptr, SyncState::FULL_SYNC, rb)) if (!CheckReplicaStateOrReply(*replica_ptr, SyncState::FULL_SYNC, rb))
return; return;
@ -418,7 +418,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
return; return;
{ {
shared_lock lk = replica_ptr->GetSharedLock(); auto lk = replica_ptr->GetSharedLock();
if (!CheckReplicaStateOrReply(*replica_ptr, SyncState::STABLE_SYNC, rb)) if (!CheckReplicaStateOrReply(*replica_ptr, SyncState::STABLE_SYNC, rb))
return; return;
@ -467,7 +467,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
atomic_bool catchup_success = true; atomic_bool catchup_success = true;
if (*status == OpStatus::OK) { if (*status == OpStatus::OK) {
shared_lock lk = replica_ptr->GetSharedLock(); auto lk = replica_ptr->GetSharedLock();
auto cb = [replica_ptr = std::move(replica_ptr), end_time, auto cb = [replica_ptr = std::move(replica_ptr), end_time,
&catchup_success](EngineShard* shard) { &catchup_success](EngineShard* shard) {
if (!WaitReplicaFlowToCatchup(end_time, replica_ptr.get(), shard)) { if (!WaitReplicaFlowToCatchup(end_time, replica_ptr.get(), shard)) {
@ -710,6 +710,7 @@ void DflyCmd::StopReplication(uint32_t sync_id) {
replica_infos_.erase(sync_id); replica_infos_.erase(sync_id);
} }
// Because we need to annotate unique_lock
void DflyCmd::BreakStalledFlowsInShard() { void DflyCmd::BreakStalledFlowsInShard() {
std::unique_lock global_lock(mu_, try_to_lock); std::unique_lock global_lock(mu_, try_to_lock);
@ -722,7 +723,7 @@ void DflyCmd::BreakStalledFlowsInShard() {
vector<uint32_t> deleted; vector<uint32_t> deleted;
for (auto [sync_id, replica_ptr] : replica_infos_) { for (auto [sync_id, replica_ptr] : replica_infos_) {
shared_lock replica_lock = replica_ptr->GetSharedLock(); auto replica_lock = replica_ptr->GetSharedLock();
if (!replica_ptr->flows[sid].saver) if (!replica_ptr->flows[sid].saver)
continue; continue;
@ -787,9 +788,9 @@ void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {
{ {
util::fb2::LockGuard lk{mu_}; // prevent state changes util::fb2::LockGuard lk{mu_}; // prevent state changes
auto cb = [&](EngineShard* shard) { auto cb = [&](EngineShard* shard) ABSL_NO_THREAD_SAFETY_ANALYSIS {
for (const auto& [_, info] : replica_infos_) { for (const auto& [_, info] : replica_infos_) {
shared_lock repl_lk = info->GetSharedLock(); auto repl_lk = info->GetSharedLock();
// flows should not be empty. // flows should not be empty.
DCHECK(!info->flows.empty()); DCHECK(!info->flows.empty());
@ -836,7 +837,7 @@ std::map<uint32_t, LSN> DflyCmd::ReplicationLagsLocked() const {
std::vector<std::map<uint32_t, LSN>> shard_lags(shard_set->size()); std::vector<std::map<uint32_t, LSN>> shard_lags(shard_set->size());
shard_set->RunBriefInParallel([&shard_lags, this](EngineShard* shard) { shard_set->RunBriefInParallel([&shard_lags, this](EngineShard* shard) {
auto& lags = shard_lags[shard->shard_id()]; auto& lags = shard_lags[shard->shard_id()];
for (const auto& info : replica_infos_) { for (const auto& info : ABSL_TS_UNCHECKED_READ(replica_infos_)) {
const ReplicaInfo* replica = info.second.get(); const ReplicaInfo* replica = info.second.get();
if (shard->journal()) { if (shard->journal()) {
int64_t lag = shard->journal()->GetLsn() - replica->flows[shard->shard_id()].last_acked_lsn; int64_t lag = shard->journal()->GetLsn() - replica->flows[shard->shard_id()].last_acked_lsn;

View file

@ -10,6 +10,7 @@
#include <memory> #include <memory>
#include "server/conn_context.h" #include "server/conn_context.h"
#include "util/fibers/synchronization.h"
namespace facade { namespace facade {
class RedisReplyBuilder; class RedisReplyBuilder;
@ -101,7 +102,7 @@ class DflyCmd {
enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED }; enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED };
// Stores information related to a single replica. // Stores information related to a single replica.
struct ReplicaInfo { struct ABSL_LOCKABLE ReplicaInfo {
ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port, ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port,
Context::ErrHandler err_handler) Context::ErrHandler err_handler)
: replica_state{SyncState::PREPARATION}, : replica_state{SyncState::PREPARATION},
@ -111,12 +112,12 @@ class DflyCmd {
flows{flow_count} { flows{flow_count} {
} }
[[nodiscard]] auto GetExclusiveLock() { [[nodiscard]] auto GetExclusiveLock() ABSL_EXCLUSIVE_LOCK_FUNCTION() {
return std::lock_guard{shared_mu}; return util::fb2::LockGuard{shared_mu};
} }
[[nodiscard]] auto GetSharedLock() { [[nodiscard]] auto GetSharedLock() ABSL_EXCLUSIVE_LOCK_FUNCTION() {
return std::shared_lock{shared_mu}; return dfly::SharedLock{shared_mu};
} }
// Transition into cancelled state, run cleanup. // Transition into cancelled state, run cleanup.
@ -156,13 +157,13 @@ class DflyCmd {
// Master-side command. Provides Replica info. // Master-side command. Provides Replica info.
std::vector<ReplicaRoleInfo> GetReplicasRoleInfo() const ABSL_LOCKS_EXCLUDED(mu_); std::vector<ReplicaRoleInfo> GetReplicasRoleInfo() const ABSL_LOCKS_EXCLUDED(mu_);
void GetReplicationMemoryStats(ReplicationMemoryStats* out) const ABSL_LOCKS_EXCLUDED(mu_); void GetReplicationMemoryStats(ReplicationMemoryStats* out) const ABSL_NO_THREAD_SAFETY_ANALYSIS;
// Sets metadata. // Sets metadata.
void SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version); void SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version);
// Tries to break those flows that stuck on socket write for too long time. // Tries to break those flows that stuck on socket write for too long time.
void BreakStalledFlowsInShard(); void BreakStalledFlowsInShard() ABSL_NO_THREAD_SAFETY_ANALYSIS;
private: private:
// JOURNAL [START/STOP] // JOURNAL [START/STOP]

View file

@ -109,17 +109,17 @@ class RoundRobinSharder {
std::fill(round_robin_shards_tl_cache_.begin(), round_robin_shards_tl_cache_.end(), std::fill(round_robin_shards_tl_cache_.begin(), round_robin_shards_tl_cache_.end(),
kInvalidSid); kInvalidSid);
std::lock_guard guard(mutex_); util::fb2::LockGuard guard(mutex_);
if (round_robin_shards_.empty()) { if (round_robin_shards_.empty()) {
round_robin_shards_ = round_robin_shards_tl_cache_; round_robin_shards_ = round_robin_shards_tl_cache_;
} }
} }
} }
static void Destroy() { static void Destroy() ABSL_LOCKS_EXCLUDED(mutex_) {
round_robin_shards_tl_cache_.clear(); round_robin_shards_tl_cache_.clear();
std::lock_guard guard(mutex_); util::fb2::LockGuard guard(mutex_);
round_robin_shards_.clear(); round_robin_shards_.clear();
} }
@ -138,7 +138,7 @@ class RoundRobinSharder {
ShardId sid = round_robin_shards_tl_cache_[index]; ShardId sid = round_robin_shards_tl_cache_[index];
if (sid == kInvalidSid) { if (sid == kInvalidSid) {
std::lock_guard guard(mutex_); util::fb2::LockGuard guard(mutex_);
sid = round_robin_shards_[index]; sid = round_robin_shards_[index];
if (sid == kInvalidSid) { if (sid == kInvalidSid) {
sid = next_shard_; sid = next_shard_;

View file

@ -2496,22 +2496,30 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
} }
void Service::RequestLoadingState() { void Service::RequestLoadingState() {
std::unique_lock lk(mu_); bool switch_state = false;
++loading_state_counter_; {
if (global_state_ != GlobalState::LOADING) { util::fb2::LockGuard lk(mu_);
DCHECK_EQ(global_state_, GlobalState::ACTIVE); ++loading_state_counter_;
lk.unlock(); if (global_state_ != GlobalState::LOADING) {
DCHECK_EQ(global_state_, GlobalState::ACTIVE);
switch_state = true;
}
}
if (switch_state) {
SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
} }
} }
void Service::RemoveLoadingState() { void Service::RemoveLoadingState() {
unique_lock lk(mu_); bool switch_state = false;
DCHECK_EQ(global_state_, GlobalState::LOADING); {
DCHECK_GT(loading_state_counter_, 0u); util::fb2::LockGuard lk(mu_);
--loading_state_counter_; DCHECK_EQ(global_state_, GlobalState::LOADING);
if (loading_state_counter_ == 0) { DCHECK_GT(loading_state_counter_, 0u);
lk.unlock(); --loading_state_counter_;
switch_state = loading_state_counter_ == 0;
}
if (switch_state) {
SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
} }
} }

View file

@ -79,8 +79,8 @@ class Service : public facade::ServiceInterface {
// Upon switch, updates cached global state in threadlocal ServerState struct. // Upon switch, updates cached global state in threadlocal ServerState struct.
GlobalState SwitchState(GlobalState from, GlobalState to) ABSL_LOCKS_EXCLUDED(mu_); GlobalState SwitchState(GlobalState from, GlobalState to) ABSL_LOCKS_EXCLUDED(mu_);
void RequestLoadingState(); void RequestLoadingState() ABSL_LOCKS_EXCLUDED(mu_);
void RemoveLoadingState(); void RemoveLoadingState() ABSL_LOCKS_EXCLUDED(mu_);
GlobalState GetGlobalState() const ABSL_LOCKS_EXCLUDED(mu_); GlobalState GetGlobalState() const ABSL_LOCKS_EXCLUDED(mu_);

View file

@ -2,6 +2,7 @@
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "server/common.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
ABSL_DECLARE_FLAG(bool, cache_mode); ABSL_DECLARE_FLAG(bool, cache_mode);
@ -62,7 +63,7 @@ bool Namespaces::IsInitialized() const {
void Namespaces::Clear() { void Namespaces::Clear() {
util::fb2::LockGuard guard(mu_); util::fb2::LockGuard guard(mu_);
namespaces.default_namespace_ = nullptr; default_namespace_ = nullptr;
if (namespaces_.empty()) { if (namespaces_.empty()) {
return; return;
@ -70,12 +71,12 @@ void Namespaces::Clear() {
shard_set->RunBriefInParallel([&](EngineShard* es) { shard_set->RunBriefInParallel([&](EngineShard* es) {
CHECK(es != nullptr); CHECK(es != nullptr);
for (auto& ns : namespaces_) { for (auto& ns : ABSL_TS_UNCHECKED_READ(namespaces_)) {
ns.second.shard_db_slices_[es->shard_id()].reset(); ns.second.shard_db_slices_[es->shard_id()].reset();
} }
}); });
namespaces.namespaces_.clear(); namespaces_.clear();
} }
Namespace& Namespaces::GetDefaultNamespace() const { Namespace& Namespaces::GetDefaultNamespace() const {
@ -86,7 +87,7 @@ Namespace& Namespaces::GetDefaultNamespace() const {
Namespace& Namespaces::GetOrInsert(std::string_view ns) { Namespace& Namespaces::GetOrInsert(std::string_view ns) {
{ {
// Try to look up under a shared lock // Try to look up under a shared lock
std::shared_lock guard(mu_); dfly::SharedLock guard(mu_);
auto it = namespaces_.find(ns); auto it = namespaces_.find(ns);
if (it != namespaces_.end()) { if (it != namespaces_.end()) {
return it->second; return it->second;

View file

@ -871,7 +871,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
void ServerFamily::LoadFromSnapshot() { void ServerFamily::LoadFromSnapshot() {
{ {
std::lock_guard lk{loading_stats_mu_}; util::fb2::LockGuard lk{loading_stats_mu_};
loading_stats_.restore_count++; loading_stats_.restore_count++;
} }
@ -887,7 +887,7 @@ void ServerFamily::LoadFromSnapshot() {
if (std::error_code(load_path_result.error()) == std::errc::no_such_file_or_directory) { if (std::error_code(load_path_result.error()) == std::errc::no_such_file_or_directory) {
LOG(WARNING) << "Load snapshot: No snapshot found"; LOG(WARNING) << "Load snapshot: No snapshot found";
} else { } else {
std::lock_guard lk{loading_stats_mu_}; util::fb2::LockGuard lk{loading_stats_mu_};
loading_stats_.failed_restore_count++; loading_stats_.failed_restore_count++;
LOG(ERROR) << "Failed to load snapshot: " << load_path_result.error().Format(); LOG(ERROR) << "Failed to load snapshot: " << load_path_result.error().Format();
} }
@ -900,7 +900,7 @@ void ServerFamily::JoinSnapshotSchedule() {
schedule_done_.Reset(); schedule_done_.Reset();
} }
void ServerFamily::Shutdown() ABSL_LOCKS_EXCLUDED(replicaof_mu_) { void ServerFamily::Shutdown() {
VLOG(1) << "ServerFamily::Shutdown"; VLOG(1) << "ServerFamily::Shutdown";
if (load_result_) { if (load_result_) {
@ -912,10 +912,10 @@ void ServerFamily::Shutdown() ABSL_LOCKS_EXCLUDED(replicaof_mu_) {
bg_save_fb_.JoinIfNeeded(); bg_save_fb_.JoinIfNeeded();
if (save_on_shutdown_ && !absl::GetFlag(FLAGS_dbfilename).empty()) { if (save_on_shutdown_ && !absl::GetFlag(FLAGS_dbfilename).empty()) {
shard_set->pool()->GetNextProactor()->Await([this] { shard_set->pool()->GetNextProactor()->Await([this]() ABSL_LOCKS_EXCLUDED(loading_stats_mu_) {
GenericError ec = DoSave(); GenericError ec = DoSave();
std::lock_guard lk{loading_stats_mu_}; util::fb2::LockGuard lk{loading_stats_mu_};
loading_stats_.backup_count++; loading_stats_.backup_count++;
if (ec) { if (ec) {
@ -1075,7 +1075,7 @@ void ServerFamily::SnapshotScheduling() {
GenericError ec = DoSave(); GenericError ec = DoSave();
std::lock_guard lk{loading_stats_mu_}; util::fb2::LockGuard lk{loading_stats_mu_};
loading_stats_.backup_count++; loading_stats_.backup_count++;
if (ec) { if (ec) {
@ -1559,7 +1559,7 @@ GenericError ServerFamily::WaitUntilSaveFinished(Transaction* trans, bool ignore
detail::SaveInfo save_info; detail::SaveInfo save_info;
{ {
std::lock_guard lk(save_mu_); util::fb2::LockGuard lk(save_mu_);
save_info = save_controller_->Finalize(); save_info = save_controller_->Finalize();
if (save_info.error) { if (save_info.error) {
@ -2062,13 +2062,13 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
} }
{ {
std::lock_guard lk{loading_stats_mu_}; util::fb2::LockGuard lk{loading_stats_mu_};
result.loading_stats = loading_stats_; result.loading_stats = loading_stats_;
} }
// Update peak stats. We rely on the fact that GetMetrics is called frequently enough to // Update peak stats. We rely on the fact that GetMetrics is called frequently enough to
// update peak_stats_ from it. // update peak_stats_ from it.
lock_guard lk{peak_stats_mu_}; util::fb2::LockGuard lk{peak_stats_mu_};
UpdateMax(&peak_stats_.conn_dispatch_queue_bytes, UpdateMax(&peak_stats_.conn_dispatch_queue_bytes,
result.facade_stats.conn_stats.dispatch_queue_bytes); result.facade_stats.conn_stats.dispatch_queue_bytes);
UpdateMax(&peak_stats_.conn_read_buf_capacity, result.facade_stats.conn_stats.read_buf_capacity); UpdateMax(&peak_stats_.conn_read_buf_capacity, result.facade_stats.conn_stats.read_buf_capacity);

View file

@ -260,7 +260,7 @@ class ServerFamily {
private: private:
void JoinSnapshotSchedule(); void JoinSnapshotSchedule();
void LoadFromSnapshot(); void LoadFromSnapshot() ABSL_LOCKS_EXCLUDED(loading_stats_mu_);
uint32_t shard_count() const { uint32_t shard_count() const {
return shard_set->size(); return shard_set->size();
@ -305,7 +305,7 @@ class ServerFamily {
// Returns the number of loaded keys if successful. // Returns the number of loaded keys if successful.
io::Result<size_t> LoadRdb(const std::string& rdb_file, LoadExistingKeys existing_keys); io::Result<size_t> LoadRdb(const std::string& rdb_file, LoadExistingKeys existing_keys);
void SnapshotScheduling(); void SnapshotScheduling() ABSL_LOCKS_EXCLUDED(loading_stats_mu_);
void SendInvalidationMessages() const; void SendInvalidationMessages() const;
@ -319,8 +319,8 @@ class ServerFamily {
GenericError DoSaveCheckAndStart(bool new_version, string_view basename, Transaction* trans, GenericError DoSaveCheckAndStart(bool new_version, string_view basename, Transaction* trans,
bool ignore_state = false) ABSL_LOCKS_EXCLUDED(save_mu_); bool ignore_state = false) ABSL_LOCKS_EXCLUDED(save_mu_);
GenericError WaitUntilSaveFinished(Transaction* trans, bool ignore_state = false) GenericError WaitUntilSaveFinished(Transaction* trans,
ABSL_LOCKS_EXCLUDED(save_mu_); bool ignore_state = false) ABSL_NO_THREAD_SAFETY_ANALYSIS;
void StopAllClusterReplicas() ABSL_EXCLUSIVE_LOCKS_REQUIRED(replicaof_mu_); void StopAllClusterReplicas() ABSL_EXCLUSIVE_LOCKS_REQUIRED(replicaof_mu_);
bool DoAuth(ConnectionContext* cntx, std::string_view username, std::string_view password) const; bool DoAuth(ConnectionContext* cntx, std::string_view username, std::string_view password) const;