
Refactor initialization phase of transaction (#790)

* refactor(server): Split transaction init

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav 2023-02-14 09:32:02 +03:00 committed by GitHub
parent 6837c38488
commit 3e46fd1318
2 changed files with 220 additions and 165 deletions
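At a glance, the commit splits the monolithic InitByArgs into focused helpers. A condensed sketch of the new control flow, abbreviated from the diff below (all names appear in the patch):

OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
  InitBase(index, args);                      // reset basic re-usable fields
  if (cid_->opt_mask() & CO::GLOBAL_TRANS) {
    InitGlobal();                             // global transaction: occupy every shard
    return OpStatus::OK;
  }
  OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
  if (!key_index)
    return key_index.status();
  InitByKeys(*key_index);                     // single-key fast path, or
                                              // BuildShardIndex -> InitShardData -> InitMultiData
  return OpStatus::OK;
}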

@@ -41,7 +41,7 @@ IntentLock::Mode Transaction::Mode() const {
* @param ess
* @param cs
*/
Transaction::Transaction(const CommandId* cid) : cid_(cid) {
Transaction::Transaction(const CommandId* cid) : cid_{cid} {
string_view cmd_name(cid_->name());
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_.reset(new MultiData);
@@ -59,6 +59,142 @@ Transaction::~Transaction() {
<< " destroyed";
}
void Transaction::InitBase(DbIndex dbid, CmdArgList args) {
global_ = false;
db_index_ = dbid;
cmd_with_full_args_ = args;
local_result_ = OpStatus::OK;
}
void Transaction::InitGlobal() {
global_ = true;
unique_shard_cnt_ = shard_set->size();
shard_data_.resize(unique_shard_cnt_);
}
void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping,
std::vector<PerShardCache>* out) {
auto args = cmd_with_full_args_;
auto& shard_index = *out;
shard_index.resize(shard_data_.size());
for (auto& v : shard_index)
v.Clear();
auto add = [this, rev_mapping, &shard_index](uint32_t sid, uint32_t i) {
string_view val = ArgS(cmd_with_full_args_, i);
shard_index[sid].args.push_back(val);
if (rev_mapping)
shard_index[sid].original_index.push_back(i - 1);
};
if (key_index.bonus) {
DCHECK(key_index.step == 1);
uint32_t sid = Shard(ArgS(args, key_index.bonus), shard_data_.size());
add(sid, key_index.bonus);
}
for (unsigned i = key_index.start; i < key_index.end; ++i) {
uint32_t sid = Shard(ArgS(args, i), shard_data_.size());
add(sid, i);
DCHECK_LE(key_index.step, 2u);
if (key_index.step == 2) { // Handle value associated with preceding key.
add(sid, ++i);
}
}
}
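BuildShardIndex is the distribution step: every key (and, for step == 2, its trailing value) is appended to the cache of the shard that owns it. A minimal standalone sketch of the idea; the toy Shard() below (hash modulo shard count) is an assumption for illustration, not Dragonfly's actual implementation:

#include <cstdint>
#include <functional>
#include <iostream>
#include <string_view>
#include <vector>

// Toy stand-in for Shard(): hash the key, take it modulo the shard count.
uint32_t Shard(std::string_view key, size_t shard_cnt) {
  return std::hash<std::string_view>{}(key) % shard_cnt;
}

int main() {
  std::vector<std::vector<std::string_view>> shard_index(4);  // 4 shards
  for (std::string_view key : {"foo", "bar", "baz", "qux"})
    shard_index[Shard(key, shard_index.size())].push_back(key);
  for (size_t sid = 0; sid < shard_index.size(); ++sid) {
    std::cout << "shard " << sid << ":";
    for (std::string_view k : shard_index[sid])
      std::cout << ' ' << k;
    std::cout << '\n';
  }
}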
void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
bool rev_mapping) {
bool incremental_locking = multi_ && multi_->is_expanding;
args_.reserve(num_args);
if (rev_mapping)
reverse_index_.reserve(args_.size());
// Store the concatenated per-shard arguments from the shard index inside args_
// and make each shard data point to its own sub-span inside args_.
for (size_t i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
auto& si = shard_index[i];
CHECK_LT(si.args.size(), 1u << 15);
sd.arg_count = si.args.size();
sd.arg_start = args_.size();
// Reset mask to allow locking multiple times.
if (incremental_locking)
sd.local_mask = 0;
if (!sd.arg_count)
continue;
unique_shard_cnt_++;
unique_shard_id_ = i;
for (size_t j = 0; j < si.args.size(); ++j) {
args_.push_back(si.args[j]);
if (rev_mapping)
reverse_index_.push_back(si.original_index[j]);
}
}
CHECK(args_.size() == num_args);
}
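InitShardData then flattens those per-shard caches into the single args_ vector, with each shard pointing at its own (arg_start, arg_count) sub-span. A toy model of that layout, using hypothetical simplified types:

#include <cstddef>
#include <iostream>
#include <string_view>
#include <vector>

// Simplified stand-in for PerShardData: each shard owns a sub-span of the
// flat args vector, described by (arg_start, arg_count).
struct PerShardData {
  size_t arg_start = 0, arg_count = 0;
};

int main() {
  // Suppose the shard index produced: shard 0 -> {"a","b"}, shard 1 -> {}, shard 2 -> {"c"}.
  std::vector<std::vector<std::string_view>> shard_index{{"a", "b"}, {}, {"c"}};
  std::vector<std::string_view> args;
  std::vector<PerShardData> shard_data(shard_index.size());
  for (size_t i = 0; i < shard_index.size(); ++i) {
    shard_data[i].arg_start = args.size();
    shard_data[i].arg_count = shard_index[i].size();
    for (std::string_view a : shard_index[i])
      args.push_back(a);  // concatenate per-shard args into the flat vector
  }
  // args == {"a","b","c"}; shard 1 has arg_count == 0 and is skipped during iteration.
  for (size_t i = 0; i < shard_data.size(); ++i)
    std::cout << "shard " << i << ": start=" << shard_data[i].arg_start
              << " count=" << shard_data[i].arg_count << '\n';
}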
void Transaction::InitMultiData(KeyIndex key_index) {
DCHECK(multi_);
auto args = cmd_with_full_args_;
// TODO: determine correct locking mode for transactions, scripts and regular commands.
IntentLock::Mode mode = Mode();
multi_->keys.clear();
auto& tmp_uniques = tmp_space.uniq_keys;
tmp_uniques.clear();
auto lock_key = [this, mode, &tmp_uniques](auto key) {
if (auto [_, inserted] = tmp_uniques.insert(key); !inserted)
return;
if (multi_->is_expanding) {
multi_->keys.push_back(key);
} else {
multi_->lock_counts[key][mode]++;
}
};
// With EVAL, we call this function for EVAL itself as well as for each command
// it runs. Currently, we lock everything only during the EVAL call.
if (multi_->is_expanding || !multi_->locks_recorded) {
for (size_t i = key_index.start; i < key_index.end; i += key_index.step)
lock_key(ArgS(args, i));
if (key_index.bonus > 0)
lock_key(ArgS(args, key_index.bonus));
}
multi_->locks_recorded = true;
}
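The two recording modes are easy to demonstrate in isolation: an expanding multi-transaction only collects keys (they are locked incrementally later), while a non-expanding one bumps a per-key lock count for the chosen mode right away, with a temporary set deduplicating repeated keys. A standalone sketch with hypothetical stand-in types:

#include <array>
#include <iostream>
#include <map>
#include <set>
#include <string>

enum Mode { SHARED = 0, EXCLUSIVE = 1 };

int main() {
  std::map<std::string, std::array<int, 2>> lock_counts;  // models multi_->lock_counts
  std::set<std::string> uniq_keys;                        // models tmp_space.uniq_keys
  Mode mode = EXCLUSIVE;
  for (std::string key : {"k1", "k2", "k1"}) {            // duplicate "k1" recorded once
    if (uniq_keys.insert(key).second)
      lock_counts[key][mode]++;
  }
  for (const auto& [key, counts] : lock_counts)
    std::cout << key << ": exclusive=" << counts[EXCLUSIVE] << '\n';
  // Prints: k1: exclusive=1, k2: exclusive=1
}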
void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
DCHECK_EQ(key_index.bonus, 0);
auto args = cmd_with_full_args_;
// Even for a single key we may have multiple arguments per key (MSET).
for (unsigned j = key_index.start; j < key_index.start + key_index.step; ++j) {
args_.push_back(ArgS(args, j));
}
if (rev_mapping) {
reverse_index_.resize(args_.size());
for (unsigned j = 0; j < reverse_index_.size(); ++j) {
reverse_index_[j] = j + key_index.start - 1;
}
}
}
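The single-key fast path and its reverse mapping can be traced by hand. A minimal sketch for a hypothetical "MSET k1 v1" call, where the key index starts right after the command name (start = 1) and spans the key plus its value (step = 2):

#include <iostream>
#include <string_view>
#include <vector>

int main() {
  // Full command as received: cmd[0] is the command name.
  std::vector<std::string_view> cmd = {"MSET", "k1", "v1"};
  unsigned start = 1, step = 2;             // hypothetical KeyIndex for MSET with one key
  std::vector<std::string_view> args;
  std::vector<unsigned> reverse_index;
  for (unsigned j = start; j < start + step; ++j)
    args.push_back(cmd[j]);                 // copy the key and its value
  reverse_index.resize(args.size());
  for (unsigned j = 0; j < reverse_index.size(); ++j)
    reverse_index[j] = j + start - 1;       // original position, minus the command name
  for (size_t i = 0; i < args.size(); ++i)  // mirrors the validation DCHECK in the diff
    std::cout << args[i] << " == " << cmd[1 + reverse_index[i]] << '\n';
}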
/**
*
* There are 4 options that we consider here:
@@ -79,181 +215,51 @@ Transaction::~Transaction() {
*
**/
OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
db_index_ = index;
cmd_with_full_args_ = args;
local_result_ = OpStatus::OK;
if (IsGlobal()) {
unique_shard_cnt_ = shard_set->size();
shard_data_.resize(unique_shard_cnt_);
return OpStatus::OK;
}
CHECK_GT(args.size(), 1U); // first entry is the command name.
DCHECK_EQ(unique_shard_cnt_, 0u);
DCHECK(args_.empty());
OpResult<KeyIndex> key_index_res = DetermineKeys(cid_, args);
if (!key_index_res)
return key_index_res.status();
const auto& key_index = *key_index_res;
void Transaction::InitByKeys(KeyIndex key_index) {
auto args = cmd_with_full_args_;
if (key_index.start == args.size()) { // eval with 0 keys.
CHECK(absl::StartsWith(cid_->name(), "EVAL"));
return OpStatus::OK;
return;
}
DCHECK_LT(key_index.start, args.size());
DCHECK_GT(key_index.start, 0u);
bool incremental_locking = multi_ && multi_->is_expanding;
bool single_key = !multi_ && key_index.HasSingleKey();
bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING;
bool single_key = !multi_ && key_index.HasSingleKey();
if (single_key) {
DCHECK_GT(key_index.step, 0u);
shard_data_.resize(1); // Single key optimization
// even for a single key we may have multiple arguments per key (MSET).
for (unsigned j = key_index.start; j < key_index.start + key_index.step; ++j) {
args_.push_back(ArgS(args, j));
}
string_view key = args_.front();
// We don't have to split the arguments by shards, so we can copy them directly.
StoreKeysInArgs(key_index, needs_reverse_mapping);
shard_data_.resize(1);
unique_shard_cnt_ = 1;
unique_shard_id_ = Shard(key, shard_set->size());
unique_shard_id_ = Shard(args_.front(), shard_set->size());
if (needs_reverse_mapping) {
reverse_index_.resize(args_.size());
for (unsigned j = 0; j < reverse_index_.size(); ++j) {
reverse_index_[j] = j + key_index.start - 1;
}
}
return OpStatus::OK;
return;
}
// Our shard_data is not sparse, so we must allocate for all threads :(
shard_data_.resize(shard_set->size());
shard_data_.resize(shard_set->size()); // shard_data isn't sparse, so we must allocate for all :(
CHECK(key_index.step == 1 || key_index.step == 2);
DCHECK(key_index.step == 1 || (args.size() % 2) == 1);
// Reuse thread-local temporary storage. Since this code is atomic we can use it here.
auto& shard_index = tmp_space.shard_cache;
shard_index.resize(shard_data_.size());
for (auto& v : shard_index) {
v.Clear();
}
auto& shard_index = tmp_space.shard_cache; // Safe, because flow below is not preemptive.
// TODO: to determine correctly locking mode for transactions, scripts
// and regular commands.
IntentLock::Mode mode = IntentLock::EXCLUSIVE;
bool should_record_locks = false;
// Distribute all the arguments by shards.
BuildShardIndex(key_index, needs_reverse_mapping, &shard_index);
if (multi_) {
mode = Mode();
multi_->keys.clear();
tmp_space.uniq_keys.clear();
DCHECK_LT(int(mode), 2);
// Initialize shard data based on distributed arguments.
InitShardData(shard_index, key_index.num_args(), needs_reverse_mapping);
// With EVAL, we call this function for EVAL itself as well as for each command
// for eval. currently, we lock everything only during the eval call.
should_record_locks = incremental_locking || !multi_->locks_recorded;
}
if (multi_)
InitMultiData(key_index);
if (key_index.bonus) { // additional one-of key.
DCHECK(key_index.step == 1);
string_view key = ArgS(args, key_index.bonus);
uint32_t sid = Shard(key, shard_data_.size());
shard_index[sid].args.push_back(key);
if (needs_reverse_mapping)
shard_index[sid].original_index.push_back(key_index.bonus - 1);
}
for (unsigned i = key_index.start; i < key_index.end; ++i) {
string_view key = ArgS(args, i);
uint32_t sid = Shard(key, shard_data_.size());
shard_index[sid].args.push_back(key);
if (needs_reverse_mapping)
shard_index[sid].original_index.push_back(i - 1);
if (should_record_locks && tmp_space.uniq_keys.insert(key).second) {
if (multi_->is_expanding) {
multi_->keys.push_back(key);
} else {
multi_->lock_counts[key][mode]++;
}
};
if (key_index.step == 2) { // value
++i;
string_view val = ArgS(args, i);
shard_index[sid].args.push_back(val);
if (needs_reverse_mapping)
shard_index[sid].original_index.push_back(i - 1);
}
}
if (multi_) {
multi_->locks_recorded = true;
}
args_.resize(key_index.num_args());
// we need reverse index only for some commands (MSET etc).
if (needs_reverse_mapping)
reverse_index_.resize(args_.size());
auto next_arg = args_.begin();
auto rev_indx_it = reverse_index_.begin();
// slice.arg_start/arg_count point to args_ array which is sorted according to shard of each key.
// reverse_index_[i] says what's the original position of args_[i] in args.
for (size_t i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
auto& si = shard_index[i];
CHECK_LT(si.args.size(), 1u << 15);
sd.arg_count = si.args.size();
sd.arg_start = next_arg - args_.begin();
// We reset the local_mask for incremental locking to allow locking of arguments
// for each operation within the same transaction. For instant locking we lock at
// the beginning all the keys so we must preserve the mask to avoid double locking.
if (incremental_locking) {
sd.local_mask = 0;
}
if (!sd.arg_count)
continue;
++unique_shard_cnt_;
unique_shard_id_ = i;
for (size_t j = 0; j < si.args.size(); ++j) {
*next_arg = si.args[j];
if (needs_reverse_mapping) {
*rev_indx_it++ = si.original_index[j];
}
++next_arg;
}
}
CHECK(next_arg == args_.end());
DVLOG(1) << "InitByArgs " << DebugId() << " " << args_.front();
// validation
if (needs_reverse_mapping) {
for (size_t i = 0; i < args_.size(); ++i) {
DCHECK_EQ(args_[i], ArgS(args, 1 + reverse_index_[i])); // 1 for the command name.
}
}
// Compress shard data, if we occupy only one shard.
if (unique_shard_cnt_ == 1) {
PerShardData* sd;
if (multi_) {
@@ -266,6 +272,13 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
sd->arg_start = -1;
}
// Validation. Check reverse mapping was built correctly.
if (needs_reverse_mapping) {
for (size_t i = 0; i < args_.size(); ++i) {
DCHECK_EQ(args_[i], ArgS(args, 1 + reverse_index_[i])); // 1 for the command name.
}
}
// Validation.
for (const auto& sd : shard_data_) {
// sd.local_mask may be non-zero for multi transactions with instant locking.
@@ -275,7 +288,25 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
}
}
}
OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
InitBase(index, args);
if ((cid_->opt_mask() & CO::GLOBAL_TRANS) > 0) {
InitGlobal();
return OpStatus::OK;
}
CHECK_GT(args.size(), 1U); // first entry is the command name.
DCHECK_EQ(unique_shard_cnt_, 0u);
DCHECK(args_.empty());
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
if (!key_index)
return key_index.status();
InitByKeys(*key_index);
return OpStatus::OK;
}
@@ -1113,7 +1144,7 @@ inline uint32_t Transaction::DecreaseRunCnt() {
}
bool Transaction::IsGlobal() const {
return (cid_->opt_mask() & CO::GLOBAL_TRANS) != 0;
return global_;
}
// Runs only in the shard thread.

@@ -279,7 +279,39 @@ class Transaction {
COORD_OOO = 0x20,
};
struct PerShardCache {
std::vector<std::string_view> args;
std::vector<uint32_t> original_index;
void Clear() {
args.clear();
original_index.clear();
}
};
private:
// Init basic fields and reset the re-usable ones.
void InitBase(DbIndex dbid, CmdArgList args);
// Init as a global transaction.
void InitGlobal();
// Init with a set of keys.
void InitByKeys(KeyIndex keys);
// Build the shard index by distributing the arguments across shards based on the key index.
void BuildShardIndex(KeyIndex keys, bool rev_mapping, std::vector<PerShardCache>* out);
// Init shard data from shard index.
void InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
bool rev_mapping);
// Init multi. Record locks if needed.
void InitMultiData(KeyIndex keys);
// Store all key index keys in args_. Used only for single-shard initialization.
void StoreKeysInArgs(KeyIndex keys, bool rev_mapping);
// Generic schedule used from Schedule() and ScheduleSingleHop() on slow path.
void ScheduleInternal();
@@ -344,13 +376,12 @@ class Transaction {
// Iterate over shards and run function accepting (PerShardData&, ShardId) on all active ones.
template <typename F> void IterateActiveShards(F&& f) {
bool is_global = IsGlobal();
if (!is_global && unique_shard_cnt_ == 1) { // unique_shard_id_ is set only for non-global.
if (!global_ && unique_shard_cnt_ == 1) { // unique_shard_id_ is set only for non-global.
auto i = unique_shard_id_;
f(shard_data_[SidToId(i)], i);
} else {
for (ShardId i = 0; i < shard_data_.size(); ++i) {
if (auto& sd = shard_data_[i]; is_global || sd.arg_count > 0) {
if (auto& sd = shard_data_[i]; global_ || sd.arg_count > 0) {
f(sd, i);
}
}
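A standalone model of this iteration pattern, rewritten as a free function with explicit parameters (the SidToId() translation used by the real member function is omitted for brevity):

#include <cstdint>
#include <iostream>
#include <vector>

struct PerShardData {
  uint32_t arg_count = 0;
};
using ShardId = uint16_t;

// With a single unique shard we jump straight to it; otherwise we scan all
// shards and skip inactive ones (arg_count == 0), unless the transaction is global.
template <typename F>
void IterateActiveShards(std::vector<PerShardData>& shard_data, bool global,
                         uint32_t unique_shard_cnt, ShardId unique_shard_id, F&& f) {
  if (!global && unique_shard_cnt == 1) {
    f(shard_data[unique_shard_id], unique_shard_id);
  } else {
    for (ShardId i = 0; i < shard_data.size(); ++i)
      if (auto& sd = shard_data[i]; global || sd.arg_count > 0)
        f(sd, i);
  }
}

int main() {
  std::vector<PerShardData> shard_data{{2}, {0}, {1}};
  IterateActiveShards(shard_data, /*global=*/false, /*unique_shard_cnt=*/2, /*unique_shard_id=*/0,
                      [](PerShardData& sd, ShardId sid) {
                        std::cout << "shard " << sid << ": " << sd.arg_count << " args\n";
                      });  // visits shards 0 and 2 only
}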
@@ -370,8 +401,9 @@ class Transaction {
// Stores the full undivided command.
CmdArgList cmd_with_full_args_;
std::atomic<bool> renabled_auto_journal_ =
false; // True if NO_AUTOJOURNAL command asked to enable auto journal
// True if NO_AUTOJOURNAL command asked to enable auto journal
std::atomic<bool> renabled_auto_journal_ = false;
// Reverse argument mapping for ReverseArgIndex to convert from shard index to original index.
std::vector<uint32_t> reverse_index_;
@@ -381,8 +413,10 @@ class Transaction {
std::unique_ptr<MultiData> multi_; // Initialized when the transaction is multi/exec.
TxId txid_{0};
bool global_{false};
DbIndex db_index_{0};
uint64_t time_now_ms_{0};
std::atomic<TxId> notify_txid_{kuint64max};
std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0};
@@ -403,16 +437,6 @@ class Transaction {
OpStatus local_result_ = OpStatus::OK;
private:
struct PerShardCache {
std::vector<std::string_view> args;
std::vector<uint32_t> original_index;
void Clear() {
args.clear();
original_index.clear();
}
};
struct TLTmpSpace {
std::vector<PerShardCache> shard_cache;
absl::flat_hash_set<std::string_view> uniq_keys;