
Refactor server commands into server_family.

Introduce FlushDb. Minor cleanups and renamings.
Roman Gershman 2022-01-08 00:01:35 +02:00
parent 76286e33bf
commit e7071b73b1
11 changed files with 344 additions and 85 deletions

View file

@@ -9,7 +9,6 @@ namespace dfly {
// SHARED - can be acquired multiple times as long as other intents are absent.
// EXCLUSIVE - is acquired only if it's the only lock recorded.
// BLOCKED_READY - can not be acquired - it's recorded for intent purposes.
// Transactions at the head of tx-queue are considered to be the ones that acquired the lock
class IntentLock {
public:
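
To make the acquisition rules above concrete, here is a minimal, self-contained sketch of an intent lock with these semantics. It is illustrative only: the class name, fields, and method signatures are assumptions for exposition, not Dragonfly's actual IntentLock implementation.

#include <cstdint>

// Toy intent lock: SHARED can be held by many owners at once; EXCLUSIVE is
// granted only when no other intent of any kind is recorded.
class ToyIntentLock {
 public:
  enum Mode { SHARED = 0, EXCLUSIVE = 1 };

  // True if `mode` could be granted given the currently recorded intents.
  bool Check(Mode mode) const {
    if (mode == SHARED)
      return cnt_[EXCLUSIVE] == 0;
    return cnt_[SHARED] == 0 && cnt_[EXCLUSIVE] == 0;
  }

  // Records the intent unconditionally and reports whether it was granted.
  // An ungranted intent stays recorded (the BLOCKED_READY case above) so the
  // owning transaction can wait in the tx-queue until conflicts clear.
  bool Acquire(Mode mode) {
    bool granted = Check(mode);
    ++cnt_[mode];
    return granted;
  }

  void Release(Mode mode) { --cnt_[mode]; }

 private:
  uint32_t cnt_[2] = {0, 0};
};

Recording the intent even when it cannot be granted is what lets transactions at the head of the tx-queue be treated as the ones holding the lock.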

View file

@@ -5,7 +5,7 @@ add_library(dragonfly_lib command_registry.cc common.cc config_flags.cc
conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc
dragonfly_connection.cc engine_shard_set.cc generic_family.cc
list_family.cc main_service.cc memcache_parser.cc
- redis_parser.cc reply_builder.cc string_family.cc transaction.cc)
+ redis_parser.cc reply_builder.cc server_family.cc string_family.cc transaction.cc)
cxx_link(dragonfly_lib dfly_core redis_lib uring_fiber_lib
fibers_ext strings_lib http_server_lib tls_lib)
@@ -21,5 +21,5 @@ cxx_test(generic_family_test dfly_test_lib LABELS DFLY)
cxx_test(memcache_parser_test dfly_test_lib LABELS DFLY)
add_custom_target(check_dfly DEPENDS COMMAND ctest -L DFLY)
- add_dependencies(check_dfly redis_parser_test list_family_test
- generic_family_test memcache_parser_test)
+ add_dependencies(check_dfly dragonfly_test list_family_test
+ generic_family_test memcache_parser_test redis_parser_test string_family_test)

View file

@@ -122,6 +122,37 @@ bool DbSlice::Del(DbIndex db_ind, const MainIterator& it) {
return true;
}
+ size_t DbSlice::FlushDb(DbIndex db_ind) {
+ auto flush_single = [this](DbIndex id) {
+ auto& db = db_arr_[id];
+ CHECK(db);
+ size_t removed = db->main_table.size();
+ db->main_table.clear();
+ db->expire_table.clear();
+ db->stats.obj_memory_usage = 0;
+ return removed;
+ };
+ if (db_ind != kDbAll) {
+ CHECK_LT(db_ind, db_arr_.size());
+ return flush_single(db_ind);
+ }
+ size_t removed = 0;
+ for (size_t i = 0; i < db_arr_.size(); ++i) {
+ if (db_arr_[i]) {
+ removed += flush_single(i);
+ }
+ }
+ return removed;
+ }
// Returns true if a state has changed, false otherwise.
bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
auto& db = db_arr_[db_ind];
@@ -142,11 +173,11 @@ bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
return false;
}
- void DbSlice::AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms) {
+ void DbSlice::AddNew(DbIndex db_ind, string_view key, MainValue obj, uint64_t expire_at_ms) {
CHECK(AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms));
}
- bool DbSlice::AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj,
+ bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, MainValue obj,
uint64_t expire_at_ms) {
auto& db = db_arr_[db_ind];
@@ -220,7 +251,7 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
}
}
- void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, std::string_view key,
+ void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, string_view key,
unsigned count) {
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key;

View file

@@ -42,7 +42,7 @@ class DbSlice {
return now_ms_;
}
- OpResult<MainIterator> Find(DbIndex db_index, std::string_view key, unsigned obj_type) const;
+ OpResult<MainIterator> Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const;
// Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
std::pair<MainIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const;
@@ -66,6 +66,15 @@ class DbSlice {
bool Del(DbIndex db_ind, const MainIterator& it);
+ constexpr static DbIndex kDbAll = 0xFFFF;
+ /**
+  * @brief Flushes the database with index db_ind. If kDbAll is passed, flushes all
+  * the databases. Returns the number of removed entries.
+  */
+ size_t FlushDb(DbIndex db_ind);
ShardId shard_id() const {
return shard_id_;
}
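
A short usage sketch for the new method, assuming a DbSlice instance named slice (a hypothetical variable for illustration; locking and error handling elided):

// Flush logical database 0; the return value is the number of removed entries.
size_t removed = slice.FlushDb(0);

// Passing DbSlice::kDbAll flushes every allocated database on this shard.
size_t total_removed = slice.FlushDb(DbSlice::kDbAll);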

View file

@@ -20,6 +20,11 @@ namespace dfly {
class EngineShard {
public:
+ struct Stats {
+ uint64_t ooo_runs = 0;
+ uint64_t quick_runs = 0;
+ };
// The EngineShard() constructor is private; see below.
~EngineShard();
@@ -62,9 +67,24 @@
return committed_txid_;
}
+ // Signals whether the shard-wide lock is active.
+ // Transactions that conflict with shard locks must subscribe to the pending queue.
+ IntentLock* shard_lock() {
+ return &shard_lock_;
+ }
+ // TODO: Awkward interface. I should solve it somehow.
+ void ShutdownMulti(Transaction* multi);
+ void IncQuickRun() {
+ stats_.quick_runs++;
+ }
+ const Stats& stats() const {
+ return stats_;
+ }
// Shared scratch buffer for string transformations during atomic cpu sequences.
sds tmp_str;
private:
@@ -74,12 +94,14 @@
::boost::fibers::fiber fiber_q_;
TxQueue txq_;
+ DbSlice db_slice_;
+ Stats stats_;
// Logical ts used to order distributed transactions.
TxId committed_txid_ = 0;
Transaction* continuation_trans_ = nullptr;
+ IntentLock shard_lock_;
- DbSlice db_slice_;
uint32_t periodic_task_ = 0;
static thread_local EngineShard* shard_;
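
The relationship between the new shard_lock() and the per-key intent locks is visible in the transaction.cc changes further down. In sketch form (ess and mode are hypothetical local names standing for the shard set and intent-lock mode in scope):

// A global transaction acquires the shard-wide lock on every shard
// (see Transaction::ScheduleInternal below).
auto acquire = [mode](EngineShard* shard) { shard->shard_lock()->Acquire(mode); };
ess->RunBriefInParallel(std::move(acquire));

// A key-level transaction first checks the shard-wide lock; its fast path is
// taken only when that lock is free and its own key locks are uncontended
// (see Transaction::ScheduleInShard below).
bool shard_unlocked = shard->shard_lock()->Check(mode);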

View file

@@ -17,7 +17,6 @@ extern "C" {
#include "base/logging.h"
#include "server/conn_context.h"
#include "server/debugcmd.h"
#include "server/error.h"
#include "server/generic_family.h"
#include "server/list_family.h"
@@ -51,7 +50,7 @@ constexpr size_t kMaxThreadSize = 1024;
} // namespace
- Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp) {
+ Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) {
CHECK(pp);
// We support less than 1024 threads.
@@ -89,8 +88,12 @@ void Service::Shutdown() {
engine_varz.reset();
request_latency_usec.Shutdown();
ping_qps.Shutdown();
+ // Shut down all the runtime components that depend on EngineShard.
+ server_family_.Shutdown();
StringFamily::Shutdown();
GenericFamily::Shutdown();
cmd_req.Shutdown();
shard_set_.RunBlockingInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); });
}
@@ -232,27 +235,6 @@ void Service::RegisterHttp(HttpListenerBase* listener) {
CHECK_NOTNULL(listener);
}
- void Service::Debug(CmdArgList args, ConnectionContext* cntx) {
- ToUpper(&args[1]);
- DebugCmd dbg_cmd{&shard_set_, cntx};
- return dbg_cmd.Run(args);
- }
- void Service::DbSize(CmdArgList args, ConnectionContext* cntx) {
- atomic_ulong num_keys{0};
- shard_set_.RunBriefInParallel(
- [&](EngineShard* shard) {
- auto db_size = shard->db_slice().DbSize(cntx->conn_state.db_index);
- num_keys.fetch_add(db_size, memory_order_relaxed);
- },
- [](ShardId) { return true; });
- return cntx->SendLong(num_keys.load(memory_order_relaxed));
- }
void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
cntx->SendOk();
cntx->CloseConnection();
@@ -317,12 +299,9 @@ VarzValue::Map Service::GetVarzStats() {
return res;
}
- using ServiceFunc = void (Service::*)(CmdArgList args, ConnectionContext* cntx);
- inline CommandId::Handler HandlerFunc(Service* se, ServiceFunc f) {
- return [=](CmdArgList args, ConnectionContext* cntx) { return (se->*f)(args, cntx); };
- }
+ using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
- #define HFUNC(x) SetHandler(HandlerFunc(this, &Service::x))
+ #define HFUNC(x) SetHandler(&Service::x)
void Service::RegisterCommands() {
using CI = CommandId;
@@ -330,16 +309,32 @@ void Service::RegisterCommands() {
constexpr auto kExecMask =
CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS;
registry_ << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug)
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
<< CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit)
auto cb_exec = [this](CmdArgList sp, ConnectionContext* cntx) {
this->Exec(std::move(sp), cntx);
};
registry_ << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit)
<< CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC(
Multi)
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.HFUNC(Exec);
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.SetHandler(cb_exec);
StringFamily::Register(&registry_);
GenericFamily::Register(&registry_);
ListFamily::Register(&registry_);
+ server_family_.Register(&registry_);
+ LOG(INFO) << "Multi-key commands are: ";
+ registry_.Traverse([](std::string_view key, const CI& cid) {
+ if (cid.is_multi_key()) {
+ string key_len;
+ if (cid.last_key_pos() < 0)
+ key_len = "unlimited";
+ else
+ key_len = absl::StrCat(cid.last_key_pos() - cid.first_key_pos() + 1);
+ LOG(INFO) << " " << key << ": with " << key_len << " keys";
+ }
+ });
}
} // namespace dfly
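
The HFUNC simplification above works because Quit and Multi become static (see the main_service.h diff below): a static member function converts directly into a std::function-style handler, while a non-static handler such as Exec still needs a capturing lambda, hence cb_exec. A self-contained toy illustrating the distinction (types are simplified stand-ins, not the real CommandId::Handler):

#include <functional>
#include <iostream>

using Handler = std::function<void(int)>;

struct Service {
  // Static member: converts straight to Handler, no adaptor required.
  static void Quit(int arg) { std::cout << "quit " << arg << "\n"; }
  // Non-static member: needs a lambda that captures the object.
  void Exec(int arg) { std::cout << "exec " << arg << "\n"; }
};

int main() {
  Service svc;
  Handler quit = &Service::Quit;                  // like HFUNC(Quit) above
  Handler exec = [&svc](int a) { svc.Exec(a); };  // like cb_exec above
  quit(1);
  exec(2);
  return 0;
}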

View file

@@ -9,6 +9,7 @@
#include "server/engine_shard_set.h"
#include "util/http/http_handler.h"
#include "server/memcache_parser.h"
#include "server/server_family.h"
namespace util {
class AcceptServer;
@@ -56,12 +57,10 @@ class Service {
}
private:
- void Debug(CmdArgList args, ConnectionContext* cntx);
- void DbSize(CmdArgList args, ConnectionContext* cntx);
- void Quit(CmdArgList args, ConnectionContext* cntx);
+ static void Quit(CmdArgList args, ConnectionContext* cntx);
void Exec(CmdArgList args, ConnectionContext* cntx);
- void Multi(CmdArgList args, ConnectionContext* cntx);
+ static void Multi(CmdArgList args, ConnectionContext* cntx);
void RegisterCommands();
@@ -70,6 +69,7 @@
CommandRegistry registry_;
EngineShardSet shard_set_;
util::ProactorPool& pp_;
+ ServerFamily server_family_;
};
} // namespace dfly

server/server_family.cc (new file, 153 lines)
View file

@@ -0,0 +1,153 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/server_family.h"
#include <absl/cleanup/cleanup.h>
#include <absl/random/random.h> // for master_id_ generation.
#include <absl/strings/match.h>
#include <filesystem>
extern "C" {
#include "redis/redis_aux.h"
}
#include "base/logging.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/debugcmd.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/main_service.h"
#include "server/transaction.h"
#include "util/accept_server.h"
DECLARE_uint32(port);
namespace dfly {
using namespace std;
using namespace util;
namespace fibers = ::boost::fibers;
namespace fs = std::filesystem;
namespace {
using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx);
inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) {
return [=](CmdArgList args, ConnectionContext* cntx) { return (se->*f)(args, cntx); };
}
using CI = CommandId;
} // namespace
ServerFamily::ServerFamily(Service* engine)
: engine_(*engine), pp_(engine->proactor_pool()), ess_(engine->shard_set()) {
}
ServerFamily::~ServerFamily() {
}
void ServerFamily::Init(util::AcceptServer* acceptor) {
CHECK(acceptor_ == nullptr);
acceptor_ = acceptor;
}
void ServerFamily::Shutdown() {
VLOG(1) << "ServerFamily::Shutdown";
}
void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
atomic_ulong num_keys{0};
ess_.RunBriefInParallel(
[&](EngineShard* shard) {
auto db_size = shard->db_slice().DbSize(cntx->conn_state.db_index);
num_keys.fetch_add(db_size, memory_order_relaxed);
},
[](ShardId) { return true; });
return cntx->SendLong(num_keys.load(memory_order_relaxed));
}
void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) {
DCHECK(cntx->transaction);
Transaction* transaction = cntx->transaction;
transaction->Schedule(); // TODO: to convert to ScheduleSingleHop ?
transaction->Execute(
[](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(t->db_index());
return OpStatus::OK;
},
true);
cntx->SendOk();
}
void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) {
if (args.size() > 1) {
cntx->SendError(kSyntaxErr);
return;
}
DCHECK(cntx->transaction);
Transaction* transaction = cntx->transaction;
transaction->Schedule();
transaction->Execute(
[](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
},
true);
cntx->SendOk();
}
void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]);
DebugCmd dbg_cmd{&ess_, cntx};
return dbg_cmd.Run(args);
}
void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
const char kInfo1[] =
R"(# Server
redis_version:6.2.0
redis_mode:standalone
arch_bits:64
multiplexing_api:iouring
atomicvar_api:atomic-builtin
tcp_port:)";
string info = absl::StrCat(kInfo1, FLAGS_port, "\n");
cntx->SendBulkString(info);
}
void ServerFamily::_Shutdown(CmdArgList args, ConnectionContext* cntx) {
CHECK_NOTNULL(acceptor_)->Stop();
cntx->SendOk();
}
#define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x))
void ServerFamily::Register(CommandRegistry* registry) {
*registry << CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
<< CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug)
<< CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb)
<< CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll)
<< CI{"INFO", CO::LOADING | CO::STALE, -1, 0, 0, 0}.HFUNC(Info)
<< CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING | CO::STALE, 1, 0, 0, 0}.HFUNC(
_Shutdown);
}
} // namespace dfly
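
FlushDb and FlushAll above follow the same two-phase shape: Schedule() reserves the transaction across shards, then a single Execute() hop runs the flush callback on every shard and, with the final argument true, concludes the transaction. A toy model of that control flow (simplified stand-in types, not the real Transaction API):

#include <functional>
#include <iostream>
#include <vector>

struct Shard { int id; };

struct ToyTransaction {
  std::vector<Shard>* shards;
  void Schedule() { std::cout << "scheduled on " << shards->size() << " shards\n"; }
  // Runs `cb` on every shard; `conclude` ends the transaction after the hop.
  void Execute(const std::function<void(Shard&)>& cb, bool conclude) {
    for (Shard& s : *shards) cb(s);
    if (conclude) std::cout << "transaction concluded\n";
  }
};

int main() {
  std::vector<Shard> shards{{0}, {1}, {2}};
  ToyTransaction t{&shards};
  t.Schedule();
  t.Execute([](Shard& s) { std::cout << "flush on shard " << s.id << "\n"; },
            true /* conclude, as FlushDb does */);
  return 0;
}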

server/server_family.h (new file, 48 lines)
View file

@@ -0,0 +1,48 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "server/engine_shard_set.h"
#include "util/proactor_pool.h"
namespace util {
class AcceptServer;
} // namespace util
namespace dfly {
class ConnectionContext;
class CommandRegistry;
class Service;
class ServerFamily {
public:
ServerFamily(Service* engine);
~ServerFamily();
void Init(util::AcceptServer* acceptor);
void Register(CommandRegistry* registry);
void Shutdown();
private:
uint32_t shard_count() const {
return ess_.size();
}
void Debug(CmdArgList args, ConnectionContext* cntx);
void DbSize(CmdArgList args, ConnectionContext* cntx);
void FlushDb(CmdArgList args, ConnectionContext* cntx);
void FlushAll(CmdArgList args, ConnectionContext* cntx);
void Info(CmdArgList args, ConnectionContext* cntx);
void _Shutdown(CmdArgList args, ConnectionContext* cntx);
Service& engine_;
util::ProactorPool& pp_;
EngineShardSet& ess_;
util::AcceptServer* acceptor_ = nullptr;
};
} // namespace dfly
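
The header shows the pattern this refactor establishes: each command family keeps its handlers private and exposes a single Register() that binds them into the shared registry. A toy version of that shape (a simplified map-based registry, not the real CommandRegistry):

#include <functional>
#include <iostream>
#include <map>
#include <string>

using Handler = std::function<void()>;
using Registry = std::map<std::string, Handler>;

class ToyServerFamily {
 public:
  // Binds the family's private handlers into the shared registry,
  // mirroring ServerFamily::Register(CommandRegistry*).
  void Register(Registry* registry) {
    (*registry)["DBSIZE"] = [this] { DbSize(); };
    (*registry)["FLUSHDB"] = [this] { FlushDb(); };
  }

 private:
  void DbSize() { std::cout << "dbsize\n"; }
  void FlushDb() { std::cout << "flushdb\n"; }
};

int main() {
  Registry registry;
  ToyServerFamily family;
  family.Register(&registry);  // as Service::RegisterCommands() does
  registry["FLUSHDB"]();       // dispatch, as the command loop would
  return 0;
}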

View file

@@ -233,12 +233,12 @@ bool Transaction::RunInShard(EngineShard* shard) {
// We make sure that we lock exactly once for each (multi-hop) transaction inside
// multi-transactions.
- if (multi_ && ((sd.local_mask & KEYS_ACQUIRED) == 0)) {
- sd.local_mask |= KEYS_ACQUIRED;
+ if (multi_ && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) {
+ sd.local_mask |= KEYLOCK_ACQUIRED;
shard->db_slice().Acquire(Mode(), GetLockArgs(idx));
}
- DCHECK(IsGlobal() || (sd.local_mask & KEYS_ACQUIRED));
+ DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED));
/*************************************************************************/
// Actually running the callback.
@@ -270,7 +270,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
// touching those keys will be ordered via TxQueue. It's necessary because we preserve
// the atomicity of awaked transactions by halting the TxQueue.
shard->db_slice().Release(Mode(), largs);
- sd.local_mask &= ~KEYS_ACQUIRED;
+ sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
CHECK_GE(DecreaseRunCnt(), 1u);
@@ -280,7 +280,6 @@ bool Transaction::RunInShard(EngineShard* shard) {
}
void Transaction::ScheduleInternal(bool single_hop) {
- DCHECK_EQ(0, state_mask_.load(memory_order_acquire) & SCHEDULED);
DCHECK_EQ(0u, txid_);
bool span_all = IsGlobal();
@@ -288,10 +287,14 @@ void Transaction::ScheduleInternal(bool single_hop) {
uint32_t num_shards;
std::function<bool(uint32_t)> is_active;
+ IntentLock::Mode mode = Mode();
if (span_all) {
is_active = [](uint32_t) { return true; };
num_shards = ess_->size();
+ // Lock shards
+ auto cb = [mode](EngineShard* shard) { shard->shard_lock()->Acquire(mode); };
+ ess_->RunBriefInParallel(std::move(cb));
} else {
num_shards = unique_shard_cnt_;
DCHECK_GT(num_shards, 0u);
@@ -327,7 +330,6 @@ void Transaction::ScheduleInternal(bool single_hop) {
}
DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << out_of_order;
- state_mask_.fetch_or(SCHEDULED, memory_order_release);
break;
}
@@ -397,14 +399,13 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId();
cb_ = nullptr;
- state_mask_.fetch_or(AFTERRUN, memory_order_release);
return local_result_;
}
// Runs in the coordinator fiber.
void Transaction::UnlockMulti() {
VLOG(1) << "Transaction::UnlockMulti";
VLOG(1) << "UnlockMulti";
DCHECK(multi_);
using KeyList = vector<pair<std::string_view, LockCnt>>;
@@ -418,7 +419,10 @@ void Transaction::UnlockMulti() {
sharded_keys[sid].push_back(k_v);
}
- auto cb = [&](EngineShard* shard) {
+ auto cb = [&] {
+ EngineShard* shard = EngineShard::tlocal();
+ shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
ShardId sid = shard->shard_id();
for (const auto& k_v : sharded_keys[sid]) {
auto release = [&](IntentLock::Mode mode) {
@@ -433,8 +437,7 @@ void Transaction::UnlockMulti() {
auto& sd = shard_data_[SidToId(shard->shard_id())];
// It does not have to be that all shards in multi transaction execute this tx.
- // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
- // there.
+ // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from there.
if (sd.pq_pos != TxQueue::kEnd) {
TxQueue* txq = shard->txq();
DCHECK(!txq->Empty());
@@ -445,11 +448,19 @@
}
shard->ShutdownMulti(this);
+ shard->PollExecution(nullptr);
+ this->DecreaseRunCnt();
};
- ess_->RunBriefInParallel(std::move(cb));
+ uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
+ DCHECK_EQ(prev, 0u);
DCHECK_EQ(1u, use_count());
+ for (ShardId i = 0; i < shard_data_.size(); ++i) {
+ ess_->Add(i, cb);
+ }
+ WaitForShardCallbacks();
DCHECK_GE(use_count(), 1u);
}
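
The rewritten UnlockMulti above replaces RunBriefInParallel with a manual fan-out: run_count_ is pre-charged with the shard count, the callback is posted to each shard thread, each callback calls DecreaseRunCnt(), and WaitForShardCallbacks() blocks until the counter drains. A self-contained sketch of that idiom using std::thread in place of the proactor pool (all names are placeholders):

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  constexpr unsigned kShards = 4;
  std::atomic<unsigned> run_count{kShards};  // pre-charged, one per shard
  std::mutex mu;
  std::condition_variable cv;

  std::vector<std::thread> pool;
  for (unsigned sid = 0; sid < kShards; ++sid) {
    pool.emplace_back([&] {
      // Per-shard cleanup (releasing locks, polling the queue) would go here.
      if (run_count.fetch_sub(1, std::memory_order_acq_rel) == 1) {
        std::lock_guard<std::mutex> lk(mu);
        cv.notify_one();  // the last callback wakes the coordinator
      }
    });
  }

  // The coordinator's WaitForShardCallbacks() equivalent.
  std::unique_lock<std::mutex> lk(mu);
  cv.wait(lk, [&] { return run_count.load(std::memory_order_acquire) == 0; });
  lk.unlock();

  for (std::thread& t : pool) t.join();
  return 0;
}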
// Runs in coordinator thread.
@@ -463,9 +474,6 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
DVLOG(1) << "Wait on Exec " << DebugId() << " completed";
cb_ = nullptr;
- uint32_t mask = conclude ? AFTERRUN : RUNNING;
- state_mask_.fetch_or(mask, memory_order_relaxed);
}
// Runs in coordinator thread.
@@ -547,14 +555,14 @@ void Transaction::ExecuteAsync(bool concluding_cb) {
}
}
- void Transaction::RunQuickie() {
+ void Transaction::RunQuickie(EngineShard* shard) {
DCHECK(!multi_);
DCHECK_EQ(1u, shard_data_.size());
DCHECK_EQ(0u, txid_);
- EngineShard* shard = EngineShard::tlocal();
+ shard->IncQuickRun();
auto& sd = shard_data_[0];
- DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED);
+ DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER));
DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0];
CHECK(cb_) << DebugId() << " " << shard->shard_id() << " " << args_[0];
@@ -595,8 +603,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
// Fast path - for uncontended keys, just run the callback.
// That applies for single key operations like set, get, lpush etc.
if (shard->db_slice().CheckLock(mode, lock_args)) {
- RunQuickie(); // TODO: for journal - this can become multi-shard
- // transaction on replica.
+ RunQuickie(shard);
return true;
}
@@ -604,12 +611,11 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
sd.pq_pos = shard->txq()->Insert(this);
- DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED);
+ DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED);
bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
- sd.local_mask |= KEYS_ACQUIRED;
+ sd.local_mask |= KEYLOCK_ACQUIRED;
DCHECK(!lock_acquired); // Because CheckLock above failed.
- state_mask_.fetch_or(SCHEDULED, memory_order_release);
DVLOG(1) << "Rescheduling into TxQueue " << DebugId();
shard->PollExecution(nullptr);
@@ -637,11 +643,13 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
auto& sd = shard_data_[sid];
if (!spans_all) {
+ bool shard_unlocked = shard->shard_lock()->Check(mode);
lock_args = GetLockArgs(shard->shard_id());
// We must acquire the intent lock regardless of shard_unlocked, since we register into the
// Tx queue: all transactions in the queue must acquire the intent lock.
- lock_granted = shard->db_slice().Acquire(mode, lock_args);
- sd.local_mask |= KEYS_ACQUIRED;
+ lock_granted = shard->db_slice().Acquire(mode, lock_args) && shard_unlocked;
+ sd.local_mask |= KEYLOCK_ACQUIRED;
DVLOG(1) << "Lock granted " << lock_granted << " for trans " << DebugId();
}
@@ -657,9 +665,9 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
// reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadScore().
bool to_proceed = lock_granted || txq->TailScore() < txid_;
if (!to_proceed) {
- if (sd.local_mask & KEYS_ACQUIRED) { // rollback the lock.
+ if (sd.local_mask & KEYLOCK_ACQUIRED) { // rollback the lock.
shard->db_slice().Release(mode, lock_args);
- sd.local_mask &= ~KEYS_ACQUIRED;
+ sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
return result; // false, false
@@ -694,11 +702,11 @@ bool Transaction::CancelInShard(EngineShard* shard) {
DCHECK(trans == this) << "Pos " << pos << ", pq size " << pq->size() << ", trans " << trans;
pq->Remove(pos);
- if (sd.local_mask & KEYS_ACQUIRED) {
+ if (sd.local_mask & KEYLOCK_ACQUIRED) {
auto mode = Mode();
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(mode, lock_args);
- sd.local_mask &= ~KEYS_ACQUIRED;
+ sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
return true;
}

View file

@@ -47,16 +47,10 @@ class Transaction {
using RunnableType = std::function<OpStatus(Transaction* t, EngineShard*)>;
using time_point = ::std::chrono::steady_clock::time_point;
- enum LocalState : uint8_t {
- ARMED = 1, // Transaction was armed with the callback
+ enum LocalMask : uint16_t {
+ ARMED = 1, // Transaction was armed with the callback
OUT_OF_ORDER = 2,
- KEYS_ACQUIRED = 4,
- };
- enum State : uint8_t {
- SCHEDULED = 1,
- RUNNING = 2, // For running multi-hop execution callbacks.
- AFTERRUN = 4, // Once transaction finished running.
+ KEYLOCK_ACQUIRED = 4,
};
Transaction(const CommandId* cid, EngineShardSet* ess);
@@ -172,7 +166,7 @@
void ExecuteAsync(bool concluding_cb);
// Optimized version of RunInShard for single shard uncontended cases.
- void RunQuickie();
+ void RunQuickie(EngineShard* shard);
//! Returns true if the transaction ran out-of-order during the scheduling phase.
bool ScheduleUniqueShard(EngineShard* shard);