1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: remove core/fibers (#2723)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-03-14 14:02:33 +02:00 committed by GitHub
parent 094df3ef3f
commit 7e0536fd4c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
41 changed files with 74 additions and 86 deletions

View file

@ -1,20 +0,0 @@
// Copyright 2023, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
// An import header that centralizes all the imports from helio project regarding fibers
#include "util/fibers/fiber2.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/future.h"
#include "util/fibers/simple_channel.h"
namespace dfly {
using util::fb2::Fiber;
using util::fb2::Launch;
using util::fb2::Mutex;
using util::fb2::SimpleChannel;
} // namespace dfly

View file

@ -6,7 +6,7 @@
#include <atomic>
#include "core/fibers.h"
#include "util/fibers/simple_channel.h"
namespace dfly {
@ -68,7 +68,7 @@ template <typename T, typename Queue = folly::ProducerConsumerQueue<T>> class Si
}
private:
SimpleChannel<T, Queue> queue_;
util::fb2::SimpleChannel<T, Queue> queue_;
std::atomic<size_t> size_ = 0;
};

View file

@ -1468,7 +1468,7 @@ void Connection::SendInvalidationMessageAsync(InvalidationMessage msg) {
void Connection::LaunchDispatchFiberIfNeeded() {
if (!dispatch_fb_.IsJoinable()) {
dispatch_fb_ = fb2::Fiber(dfly::Launch::post, "connection_dispatch",
dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch",
[&, peer = socket_.get()]() { DispatchFiber(peer); });
}
}

View file

@ -15,12 +15,12 @@
#include <variant>
#include "base/io_buf.h"
#include "core/fibers.h"
#include "facade/acl_commands_def.h"
#include "facade/facade_types.h"
#include "facade/memcache_parser.h"
#include "facade/resp_expr.h"
#include "util/connection.h"
#include "util/fibers/fibers.h"
#include "util/http/http_handler.h"
typedef struct ssl_ctx_st SSL_CTX;

View file

@ -12,6 +12,7 @@
#include <memory>
#include <numeric>
#include <optional>
#include <random>
#include <string>
#include <utility>
#include <variant>

View file

@ -8,7 +8,6 @@
#include <mutex>
#include "base/flags.h"
#include "core/fibers.h"
#include "facade/facade_types.h"
#include "server/acl/acl_commands_def.h"

View file

@ -100,7 +100,7 @@ class ChannelStore {
// Centralized controller to prevent overlaping updates.
struct ControlBlock {
std::atomic<ChannelStore*> most_recent;
Mutex update_mu; // locked during updates.
util::fb2::Mutex update_mu; // locked during updates.
};
private:

View file

@ -36,6 +36,8 @@ namespace {
using namespace std;
using namespace facade;
using namespace util;
using CI = CommandId;
using ClusterShard = ClusterConfig::ClusterShard;
using ClusterShards = ClusterConfig::ClusterShards;
@ -423,7 +425,7 @@ void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) {
namespace {
// Guards set configuration, so that we won't handle 2 in parallel.
Mutex set_config_mu;
util::fb2::Mutex set_config_mu;
void DeleteSlots(const SlotSet& slots) {
if (slots.Empty()) {
@ -557,7 +559,7 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c
if (auto err = parser.Error(); err)
return rb->SendError(err->MakeReply());
Mutex mu;
fb2::Mutex mu;
auto cb = [&](auto*) {
EngineShard* shard = EngineShard::tlocal();

View file

@ -84,7 +84,7 @@ class ClusterFamily {
std::shared_ptr<OutgoingMigration> GetOutgoingMigration(uint32_t sync_id);
mutable Mutex migration_mu_; // guard migrations operations
mutable util::fb2::Mutex migration_mu_; // guard migrations operations
// holds all incoming slots migrations that are currently in progress.
std::vector<std::unique_ptr<ClusterSlotMigration>> incoming_migrations_jobs_
ABSL_GUARDED_BY(migration_mu_);

View file

@ -48,7 +48,7 @@ class ClusterShardMigration : public ProtocolClient {
uint32_t sync_id_;
std::optional<base::IoBuf> leftover_buf_;
std::unique_ptr<JournalExecutor> executor_;
Fiber sync_fb_;
util::fb2::Fiber sync_fb_;
std::atomic_bool is_stable_sync_ = false;
bool is_finalized_ = false;
};

View file

@ -61,7 +61,7 @@ class ClusterSlotMigration : private ProtocolClient {
private:
ClusterFamily* cluster_family_;
Service& service_;
Mutex flows_op_mu_;
util::fb2::Mutex flows_op_mu_;
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
SlotRanges slots_;
uint32_t source_shards_num_ = 0;
@ -69,7 +69,7 @@ class ClusterSlotMigration : private ProtocolClient {
uint32_t local_sync_id_ = 0;
MigrationState state_ = MigrationState::C_NO_STATE;
Fiber sync_fb_;
util::fb2::Fiber sync_fb_;
};
} // namespace dfly

View file

@ -14,6 +14,8 @@
#include "server/server_family.h"
using namespace std;
using namespace util;
namespace dfly {
class OutgoingMigration::SliceSlotMigration {
@ -22,7 +24,7 @@ class OutgoingMigration::SliceSlotMigration {
Context* cntx, io::Sink* dest)
: streamer_(slice, std::move(slots), sync_id, journal, cntx) {
state_.store(MigrationState::C_SYNC, memory_order_relaxed);
sync_fb_ = Fiber("slot-snapshot", [this, dest] { streamer_.Start(dest); });
sync_fb_ = fb2::Fiber("slot-snapshot", [this, dest] { streamer_.Start(dest); });
}
void Cancel() {
@ -46,7 +48,7 @@ class OutgoingMigration::SliceSlotMigration {
RestoreStreamer streamer_;
// Atomic only for simple read operation, writes - from the same thread, reads - from any thread
atomic<MigrationState> state_ = MigrationState::C_CONNECTING;
Fiber sync_fb_;
fb2::Fiber sync_fb_;
};
OutgoingMigration::OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots,
@ -78,7 +80,7 @@ void OutgoingMigration::StartFlow(uint32_t sync_id, journal::Journal* journal, i
}
if (state == MigrationState::C_SYNC) {
main_sync_fb_ = Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
main_sync_fb_ = fb2::Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
}
}

View file

@ -56,11 +56,11 @@ class OutgoingMigration {
uint16_t port_;
SlotRanges slots_;
Context cntx_;
mutable Mutex flows_mu_;
mutable util::fb2::Mutex flows_mu_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_ ABSL_GUARDED_BY(flows_mu_);
ServerFamily* server_family_;
Fiber main_sync_fb_;
util::fb2::Fiber main_sync_fb_;
};
} // namespace dfly

View file

@ -14,9 +14,10 @@
#include <string_view>
#include <vector>
#include "core/fibers.h"
#include "facade/facade_types.h"
#include "facade/op_status.h"
#include "util/fibers/fibers.h"
#include "util/fibers/synchronization.h"
namespace dfly {
@ -239,7 +240,7 @@ template <typename T> struct AggregateValue {
}
private:
Mutex mu_{};
util::fb2::Mutex mu_{};
T current_{};
};
@ -343,10 +344,10 @@ class Context : protected Cancellation {
private:
GenericError err_;
Mutex mu_;
util::fb2::Mutex mu_;
ErrHandler err_handler_;
Fiber err_handler_fb_;
util::fb2::Fiber err_handler_fb_;
};
struct ScanOpts {

View file

@ -8,7 +8,6 @@
#include <absl/container/flat_hash_set.h>
#include "acl/acl_commands_def.h"
#include "core/fibers.h"
#include "facade/conn_context.h"
#include "facade/reply_capture.h"
#include "server/common.h"

View file

@ -675,7 +675,7 @@ void DebugCmd::Populate(CmdArgList args) {
}
ranges.emplace_back(from, options->total_count - from);
vector<Fiber> fb_arr(ranges.size());
vector<fb2::Fiber> fb_arr(ranges.size());
for (size_t i = 0; i < ranges.size(); ++i) {
auto range = ranges[i];
@ -844,7 +844,7 @@ void DebugCmd::Inspect(string_view key, CmdArgList args) {
}
void DebugCmd::Watched() {
Mutex mu;
fb2::Mutex mu;
vector<string> watched_keys;
vector<string> awaked_trans;

View file

@ -133,7 +133,7 @@ struct SaveStagesController : public SaveStagesInputs {
std::vector<std::pair<std::unique_ptr<RdbSnapshot>, std::filesystem::path>> snapshots_;
absl::flat_hash_map<string_view, size_t> rdb_name_map_;
Mutex rdb_name_map_mu_;
util::fb2::Mutex rdb_name_map_mu_;
};
GenericError ValidateFilename(const std::filesystem::path& filename, bool new_version);

View file

@ -15,7 +15,6 @@
#include "base/init.h"
#include "base/io_buf.h"
#include "base/zipf_gen.h"
#include "core/fibers.h"
#include "facade/redis_parser.h"
#include "util/fibers/dns_resolve.h"
#include "util/fibers/pool.h"
@ -367,7 +366,7 @@ int main(int argc, char* argv[]) {
absl::Duration duration = absl::Now() - start_time;
LOG(INFO) << "Finished. Total time: " << duration;
dfly::Mutex mutex;
fb2::Mutex mutex;
base::Histogram hist;
LOG(INFO) << "Resetting all threads";
pp->AwaitFiberOnAll([&](auto* p) {

View file

@ -665,7 +665,7 @@ std::vector<ReplicaRoleInfo> DflyCmd::GetReplicasRoleInfo() const {
}
void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {
Mutex stats_mu;
util::fb2::Mutex stats_mu;
lock_guard lk_main{mu_}; // prevent state changes
auto cb = [this, &stats, &stats_mu](EngineShard* shard) {

View file

@ -38,7 +38,7 @@ struct FlowInfo {
facade::Connection* conn = nullptr;
Fiber full_sync_fb; // Full sync fiber.
util::fb2::Fiber full_sync_fb; // Full sync fiber.
std::unique_ptr<RdbSaver> saver; // Saver for full sync phase.
std::unique_ptr<JournalStreamer> streamer; // Streamer for stable sync phase
std::string eof_token;
@ -121,7 +121,7 @@ class DflyCmd {
// Flows describe the state of shard-local flow.
// They are always indexed by the shard index on the master.
std::vector<FlowInfo> flows;
Mutex mu; // See top of header for locking levels.
util::fb2::Mutex mu; // See top of header for locking levels.
};
public:
@ -223,7 +223,7 @@ class DflyCmd {
using ReplicaInfoMap = absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>>;
ReplicaInfoMap replica_infos_;
mutable Mutex mu_; // Guard global operations. See header top for locking levels.
mutable util::fb2::Mutex mu_; // Guard global operations. See header top for locking levels.
};
} // namespace dfly

View file

@ -33,6 +33,7 @@ using namespace std;
using namespace util;
using absl::SetFlag;
using absl::StrCat;
using fb2::Fiber;
using ::io::Result;
using testing::ElementsAre;
using testing::HasSubstr;

View file

@ -164,7 +164,7 @@ class RoundRobinSharder {
static thread_local vector<ShardId> round_robin_shards_tl_cache_;
static vector<ShardId> round_robin_shards_ ABSL_GUARDED_BY(mutex_);
static ShardId next_shard_ ABSL_GUARDED_BY(mutex_);
static Mutex mutex_;
static fb2::Mutex mutex_;
};
bool HasContendedLocks(unsigned shard_id, Transaction* trx, const DbTable* table) {
@ -193,7 +193,7 @@ thread_local string RoundRobinSharder::round_robin_prefix_;
thread_local vector<ShardId> RoundRobinSharder::round_robin_shards_tl_cache_;
vector<ShardId> RoundRobinSharder::round_robin_shards_;
ShardId RoundRobinSharder::next_shard_;
Mutex RoundRobinSharder::mutex_;
fb2::Mutex RoundRobinSharder::mutex_;
} // namespace

View file

@ -243,7 +243,7 @@ class EngineShard {
IntentLock shard_lock_;
uint32_t defrag_task_ = 0;
Fiber fiber_periodic_;
util::fb2::Fiber fiber_periodic_;
util::fb2::Done fiber_periodic_done_;
DefragTaskState defrag_state_;

View file

@ -3,7 +3,6 @@
//
#include "base/io_buf.h"
#include "core/fibers.h"
#include "io/io.h"
#include "server/common.h"

View file

@ -60,7 +60,7 @@ error_code Journal::Close() {
VLOG(1) << "Journal::Close";
Mutex ec_mu;
fb2::Mutex ec_mu;
error_code res;
lock_guard lk(state_mu_);

View file

@ -61,7 +61,7 @@ class Journal {
std::optional<SlotId> slot, Entry::Payload payload, bool await);
private:
mutable Mutex state_mu_;
mutable util::fb2::Mutex state_mu_;
std::atomic_bool lameduck_{false};
};

View file

@ -46,7 +46,7 @@ class JournalStreamer : protected BufferedStreamerBase {
uint32_t journal_cb_id_{0};
journal::Journal* journal_;
Fiber write_fb_{};
util::fb2::Fiber write_fb_{};
};
// Serializes existing DB as RESTORE commands, and sends updates as regular commands.

View file

@ -31,7 +31,7 @@ class MultiShardExecution {
void CancelAllBlockingEntities();
private:
Mutex map_mu;
util::fb2::Mutex map_mu;
std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
};

View file

@ -185,7 +185,7 @@ class Service : public facade::ServiceInterface {
const CommandId* exec_cid_; // command id of EXEC command for pipeline squashing
mutable Mutex mu_;
mutable util::fb2::Mutex mu_;
GlobalState global_state_ = GlobalState::ACTIVE; // protected by mu_;
};

View file

@ -1,7 +1,12 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/multi_command_squasher.h"
#include <absl/container/inlined_vector.h>
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "server/cluster/unique_slot_checker.h"
#include "server/command_registry.h"

View file

@ -4,8 +4,6 @@
#pragma once
#include "base/logging.h"
#include "core/fibers.h"
#include "facade/reply_capture.h"
#include "server/conn_context.h"
#include "server/main_service.h"

View file

@ -124,7 +124,7 @@ class ProtocolClient {
base::IoBuf resp_buf_;
std::unique_ptr<util::FiberSocketBase> sock_;
Mutex sock_mu_;
util::fb2::Mutex sock_mu_;
protected:
Context cntx_; // context for tasks in replica.

View file

@ -146,8 +146,8 @@ class Replica : ProtocolClient {
MasterContext master_context_;
// In redis replication mode.
Fiber sync_fb_;
Fiber acks_fb_;
util::fb2::Fiber sync_fb_;
util::fb2::Fiber acks_fb_;
util::fb2::EventCount waker_;
std::vector<std::unique_ptr<DflyShardReplica>> shard_flows_;
@ -157,7 +157,7 @@ class Replica : ProtocolClient {
std::shared_ptr<MultiShardExecution> multi_shard_exe_;
// Guard operations where flows might be in a mixed state (transition/setup)
Mutex flows_op_mu_;
util::fb2::Mutex flows_op_mu_;
// repl_offs - till what offset we've already read from the master.
// ack_offs_ last acknowledged offset.
@ -228,13 +228,13 @@ class DflyShardReplica : public ProtocolClient {
// run out-of-order on the master instance.
std::atomic_uint64_t journal_rec_executed_ = 0;
Fiber sync_fb_;
util::fb2::Fiber sync_fb_;
Fiber acks_fb_;
util::fb2::Fiber acks_fb_;
size_t ack_offs_ = 0;
bool force_ping_ = false;
Fiber execution_fb_;
util::fb2::Fiber execution_fb_;
std::shared_ptr<MultiShardExecution> multi_shard_exe_;
uint32_t flow_id_ = UINT32_MAX; // Flow id if replica acts as a dfly flow.

View file

@ -183,7 +183,7 @@ void ScriptMgr::ListCmd(ConnectionContext* cntx) const {
void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
absl::flat_hash_map<std::string, base::Histogram> result;
Mutex mu;
fb2::Mutex mu;
shard_set->pool()->AwaitFiberOnAll([&](auto* pb) {
auto* ss = ServerState::tlocal();

View file

@ -78,7 +78,7 @@ class ScriptMgr {
ScriptParams default_params_;
absl::flat_hash_map<ScriptKey, InternalScriptData> db_;
mutable Mutex mu_;
mutable util::fb2::Mutex mu_;
};
} // namespace dfly

View file

@ -839,7 +839,7 @@ fb2::Future<GenericError> ServerFamily::Load(const std::string& load_path) {
auto& pool = service_.proactor_pool();
vector<Fiber> load_fibers;
vector<fb2::Fiber> load_fibers;
load_fibers.reserve(paths.size());
auto aggregated_result = std::make_shared<AggregateLoadResult>();
@ -1755,7 +1755,7 @@ void ServerFamily::ResetStat() {
Metrics ServerFamily::GetMetrics() const {
Metrics result;
Mutex mu;
util::fb2::Mutex mu;
auto cmd_stat_cb = [&dest = result.cmd_stats_map](string_view name, const CmdCallStats& stat) {
auto& [calls, sum] = dest[absl::AsciiStrToLower(name)];

View file

@ -17,6 +17,7 @@
#include "server/replica.h"
#include "server/server_state.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/future.h"
void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service,
std::string_view sub_cmd);
@ -278,7 +279,7 @@ class ServerFamily {
GenericError WaitUntilSaveFinished(Transaction* trans, bool ignore_state = false);
Fiber snapshot_schedule_fb_;
util::fb2::Fiber snapshot_schedule_fb_;
util::fb2::Future<GenericError> load_result_;
uint32_t stats_caching_task_ = 0;
@ -288,7 +289,7 @@ class ServerFamily {
std::vector<facade::Listener*> listeners_;
util::ProactorBase* pb_task_ = nullptr;
mutable Mutex replicaof_mu_, save_mu_;
mutable util::fb2::Mutex replicaof_mu_, save_mu_;
std::shared_ptr<Replica> replica_ ABSL_GUARDED_BY(replicaof_mu_);
std::unique_ptr<ScriptMgr> script_mgr_;

View file

@ -144,7 +144,7 @@ class SliceSnapshot {
// Used for sanity checks.
bool serialize_bucket_running_ = false;
Fiber snapshot_fb_; // IterateEntriesFb
util::fb2::Fiber snapshot_fb_; // IterateEntriesFb
CompressionMode compression_mode_;
RdbTypeFreqMap type_freq_map_;

View file

@ -637,7 +637,7 @@ vector<string> BaseFamilyTest::StrArray(const RespExpr& expr) {
}
absl::flat_hash_set<string> BaseFamilyTest::GetLastUsedKeys() {
Mutex mu;
fb2::Mutex mu;
absl::flat_hash_set<string> result;
auto add_keys = [&](ProactorBase* proactor) {
@ -671,14 +671,15 @@ void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function<bool()>& c
<< "Timeout of " << timeout << " reached when expecting condition";
}
Fiber BaseFamilyTest::ExpectConditionWithSuspension(const std::function<bool()>& condition) {
fb2::Fiber BaseFamilyTest::ExpectConditionWithSuspension(const std::function<bool()>& condition) {
TransactionSuspension tx;
pp_->at(0)->Await([&] { tx.Start(); });
auto fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [condition, tx = std::move(tx)]() mutable {
ExpectConditionWithinTimeout(condition);
tx.Terminate();
});
auto fb =
pp_->at(0)->LaunchFiber(fb2::Launch::dispatch, [condition, tx = std::move(tx)]() mutable {
ExpectConditionWithinTimeout(condition);
tx.Terminate();
});
return fb;
}

View file

@ -17,7 +17,8 @@
namespace dfly {
using namespace facade;
using namespace std;
using util::fb2::Fiber;
using util::fb2::Launch;
class TestConnection : public facade::Connection {
public:
@ -143,7 +144,7 @@ class BaseFamilyTest : public ::testing::Test {
static absl::flat_hash_set<std::string> GetLastUsedKeys();
static void ExpectConditionWithinTimeout(const std::function<bool()>& condition,
absl::Duration timeout = absl::Seconds(10));
Fiber ExpectConditionWithSuspension(const std::function<bool()>& condition);
util::fb2::Fiber ExpectConditionWithSuspension(const std::function<bool()>& condition);
static unsigned NumLocked();
@ -156,7 +157,7 @@ class BaseFamilyTest : public ::testing::Test {
unsigned num_threads_ = 3;
absl::flat_hash_map<std::string, std::unique_ptr<TestConnWrapper>> connections_;
Mutex mu_;
util::fb2::Mutex mu_;
ConnectionContext::DebugInfo last_cmd_dbg_info_;
std::vector<RespVec*> resp_vec_;

View file

@ -8,7 +8,6 @@
#include <absl/container/flat_hash_map.h>
#include "core/external_alloc.h"
#include "core/fibers.h"
#include "server/common.h"
#include "server/io_mgr.h"
#include "server/table.h"