1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

Fix expiry bug in AddOrFind function

Also add more test coverage.
Fix a bug where iterators might get invalidated between Renamer hops.
Initial work on BLPOP transaction.
This commit is contained in:
Roman Gershman 2022-01-11 19:55:41 +02:00
parent a82c409006
commit 0cf2f57bf2
16 changed files with 249 additions and 74 deletions

View file

@ -14,6 +14,7 @@ enum class OpStatus : uint16_t {
KEY_NOTFOUND,
SKIPPED,
WRONG_TYPE,
TIMED_OUT,
};
class OpResultBase {

View file

@ -94,6 +94,8 @@ const char* OptName(CO::CommandOpt fl) {
return "admin";
case NOSCRIPT:
return "noscript";
case BLOCKING:
return "blocking";
case GLOBAL_TRANS:
return "global-trans";
}

View file

@ -28,6 +28,7 @@ enum CommandOpt : uint32_t {
RANDOM = 0x40,
ADMIN = 0x80, // implies NOSCRIPT,
NOSCRIPT = 0x100,
BLOCKING = 0x200,
GLOBAL_TRANS = 0x1000,
};

View file

@ -51,6 +51,7 @@ class ConnectionContext : public ReplyBuilder {
struct DebugInfo {
uint32_t shards_count = 0;
TxClock clock = 0;
bool is_ooo = false;
};
DebugInfo last_command_debug;

View file

@ -4,6 +4,10 @@
#include "server/db_slice.h"
extern "C" {
#include "redis/object.h"
}
#include <boost/fiber/fiber.hpp>
#include <boost/fiber/operations.hpp>
@ -42,12 +46,17 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
db->main_table.reserve(key_size);
}
auto DbSlice::Find(DbIndex db_index, std::string_view key, unsigned obj_type) const -> OpResult<MainIterator> {
auto DbSlice::Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const
-> OpResult<MainIterator> {
auto [it, expire_it] = FindExt(db_index, key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
if (it->second.ObjType() != req_obj_type) {
return OpStatus::WRONG_TYPE;
}
return it;
}
@ -78,19 +87,58 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_
return make_pair(it, expire_it);
}
OpResult<pair<MainIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, const ArgSlice& args) {
DCHECK(!args.empty());
for (unsigned i = 0; i < args.size(); ++i) {
string_view s = args[i];
OpResult<MainIterator> res = Find(db_index, s, OBJ_LIST);
if (res)
return make_pair(res.value(), i);
if (res.status() != OpStatus::KEY_NOTFOUND)
return res.status();
}
VLOG(1) << "FindFirst " << args.front() << " not found";
return OpStatus::KEY_NOTFOUND;
}
auto DbSlice::AddOrFind(DbIndex db_index, std::string_view key) -> pair<MainIterator, bool> {
DCHECK(IsDbValid(db_index));
auto& db = db_arr_[db_index];
MainIterator existing;
pair<MainIterator, bool> res = db->main_table.emplace(key, MainValue{});
if (res.second) { // new entry
db->stats.obj_memory_usage += res.first->first.capacity();
return make_pair(res.first, true);
}
existing = res.first;
return res;
DCHECK(IsValid(existing));
if (existing->second.HasExpire()) {
auto expire_it = db->expire_table.find(existing->first);
CHECK(IsValid(expire_it));
if (expire_it->second <= now_ms_) {
db->expire_table.erase(expire_it);
// Keep the entry but free the object.
db->stats.obj_memory_usage -= existing->second.str.capacity();
existing->second.obj_type = OBJ_STRING;
if (existing->second.robj) {
decrRefCountVoid(existing->second.robj);
existing->second.robj = nullptr;
}
return make_pair(existing, true);
}
}
return make_pair(existing, false);
}
void DbSlice::ActivateDb(DbIndex db_ind) {
@ -152,7 +200,6 @@ size_t DbSlice::FlushDb(DbIndex db_ind) {
return removed;
}
// Returns true if a state has changed, false otherwise.
bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
auto& db = db_arr_[db_ind];
@ -177,8 +224,7 @@ void DbSlice::AddNew(DbIndex db_ind, string_view key, MainValue obj, uint64_t ex
CHECK(AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms));
}
bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, MainValue obj,
uint64_t expire_at_ms) {
bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, MainValue obj, uint64_t expire_at_ms) {
auto& db = db_arr_[db_ind];
auto [new_entry, success] = db->main_table.emplace(key, obj);
@ -251,8 +297,7 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
}
}
void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, string_view key,
unsigned count) {
void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, string_view key, unsigned count) {
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key;
auto& lt = db_arr_[db_index]->lock_table;

View file

@ -47,6 +47,10 @@ class DbSlice {
// Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
std::pair<MainIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const;
// Returns dictEntry, args-index if found, KEY_NOTFOUND otherwise.
// If multiple keys are found, returns the first index in the ArgSlice.
OpResult<std::pair<MainIterator, unsigned>> FindFirst(DbIndex db_index, const ArgSlice& args);
// Return .second=true if insertion ocurred, false if we return the existing key.
std::pair<MainIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key);

View file

@ -70,15 +70,16 @@ TEST_F(DflyEngineTest, Multi) {
ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked());
}
TEST_F(DflyEngineTest, MultiEmpty) {
RespVec resp = Run({"multi"});
ASSERT_THAT(resp, RespEq("OK"));
resp = Run({"exec"});
ASSERT_THAT(resp[0], ArrLen(0));
ASSERT_FALSE(service_->IsShardSetLocked());
}
TEST_F(DflyEngineTest, MultiSeq) {
@ -95,6 +96,7 @@ TEST_F(DflyEngineTest, MultiSeq) {
ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked());
EXPECT_THAT(resp, ElementsAre(StrArg("OK"), StrArg("1"), ArrLen(2)));
const RespExpr::Vec& arr = *get<RespVec*>(resp[2].u);
@ -139,6 +141,7 @@ TEST_F(DflyEngineTest, MultiConsistent) {
fb.join();
ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked());
}
TEST_F(DflyEngineTest, MultiRename) {
@ -153,6 +156,65 @@ TEST_F(DflyEngineTest, MultiRename) {
EXPECT_THAT(resp, ElementsAre(StrArg("OK"), StrArg("OK")));
ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked());
}
TEST_F(DflyEngineTest, MultiHop) {
Run({"set", kKey1, "1"});
auto p1_fb = pp_->at(1)->LaunchFiber([&] {
for (int i = 0; i < 100; ++i) {
auto resp = Run({"rename", kKey1, kKey2});
ASSERT_THAT(resp, RespEq("OK"));
EXPECT_EQ(2, GetDebugInfo("IO1").shards_count);
resp = Run({"rename", kKey2, kKey1});
ASSERT_THAT(resp, RespEq("OK"));
}
});
// mset should be executed either as ooo or via tx-queue because previous transactions
// have been unblocked and executed as well. In other words, this mset should never block
// on serializability constraints.
auto p2_fb = pp_->at(2)->LaunchFiber([&] {
for (int i = 0; i < 100; ++i) {
Run({"mset", kKey3, "1", kKey4, "2"});
}
});
p1_fb.join();
p2_fb.join();
}
TEST_F(DflyEngineTest, FlushDb) {
Run({"mset", kKey1, "1", kKey4, "2"});
auto resp = Run({"flushdb"});
ASSERT_THAT(resp, RespEq("OK"));
auto fb0 = pp_->at(0)->LaunchFiber([&] {
for (unsigned i = 0; i < 100; ++i) {
Run({"flushdb"});
}
});
pp_->at(1)->AwaitBlocking([&] {
for (unsigned i = 0; i < 100; ++i) {
Run({"mset", kKey1, "1", kKey4, "2"});
auto resp = Run({"exists", kKey1, kKey4});
int64_t ival = get<int64_t>(resp[0].u);
ASSERT_TRUE(ival == 0 || ival == 2) << i << " " << ival;
}
});
fb0.join();
ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked());
}
// TODO: to test transactions with a single shard since then all transactions become local.
// To consider having a parameter in dragonfly engine controlling number of shards
// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
} // namespace dfly

View file

@ -135,12 +135,12 @@ class EngineShardSet {
}
// Runs a brief function on all shards. Waits for it to complete.
template <typename U> void RunBriefInParallel(U&& func) {
template <typename U> void RunBriefInParallel(U&& func) const {
RunBriefInParallel(std::forward<U>(func), [](auto i) { return true; });
}
// Runs a brief function on selected shards. Waits for it to complete.
template <typename U, typename P> void RunBriefInParallel(U&& func, P&& pred);
template <typename U, typename P> void RunBriefInParallel(U&& func, P&& pred) const;
template <typename U> void RunBlockingInParallel(U&& func);
@ -149,7 +149,8 @@ class EngineShardSet {
std::vector<util::fibers_ext::FiberQueue*> shard_queue_;
};
template <typename U, typename P> void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) {
template <typename U, typename P>
void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const {
util::fibers_ext::BlockingCounter bc{0};
for (uint32_t i = 0; i < size(); ++i) {

View file

@ -26,8 +26,7 @@ class Renamer {
Renamer(DbIndex dind, ShardId source_id) : db_indx_(dind), src_sid_(source_id) {
}
// TODO: to implement locking semantics.
OpResult<void> FindAndLock(ShardId shard_id, const ArgSlice& args);
OpResult<void> Find(ShardId shard_id, const ArgSlice& args);
OpResult<void> status() const {
return status_;
@ -36,75 +35,82 @@ class Renamer {
Transaction::RunnableType Finalize(bool skip_exist_dest);
private:
void SwapValues(EngineShard* shard, const ArgSlice& args);
void MoveValues(EngineShard* shard, const ArgSlice& args);
DbIndex db_indx_;
ShardId src_sid_;
pair<MainIterator, ExpireIterator> find_res_[2];
uint64_t expire_;
MainValue src_val_;
struct FindResult {
string_view key;
MainValue val;
uint64_t expire_ts;
bool found = false;
};
FindResult src_res_, dest_res_; // index 0 for source, 1 for destination
OpResult<void> status_;
};
OpResult<void> Renamer::FindAndLock(ShardId shard_id, const ArgSlice& args) {
OpResult<void> Renamer::Find(ShardId shard_id, const ArgSlice& args) {
CHECK_EQ(1u, args.size());
unsigned indx = (shard_id == src_sid_) ? 0 : 1;
FindResult* res = (shard_id == src_sid_) ? &src_res_ : &dest_res_;
find_res_[indx] = EngineShard::tlocal()->db_slice().FindExt(db_indx_, args.front());
res->key = args.front();
auto [it, exp_it] = EngineShard::tlocal()->db_slice().FindExt(db_indx_, res->key);
res->found = IsValid(it);
if (IsValid(it)) {
res->val = it->second; // TODO: won't work for robj because we copy pointers.
res->expire_ts = IsValid(exp_it) ? exp_it->second : 0;
}
return OpStatus::OK;
};
void Renamer::SwapValues(EngineShard* shard, const ArgSlice& args) {
auto& dest = find_res_[1];
void Renamer::MoveValues(EngineShard* shard, const ArgSlice& args) {
auto shard_id = shard->shard_id();
// NOTE: This object juggling between shards won't work if we want to maintain heap per shard
// model.
// TODO: when we want to maintain heap per shard model this code will require additional
// work
if (shard_id == src_sid_) { // Handle source key.
// delete the source entry.
CHECK(shard->db_slice().Del(db_indx_, find_res_[0].first));
auto it = shard->db_slice().FindExt(db_indx_, src_res_.key).first;
CHECK(shard->db_slice().Del(db_indx_, it));
return;
}
// Handle destination
MainIterator dest_it = dest.first;
string_view dest_key = dest_res_.key;
MainIterator dest_it = shard->db_slice().FindExt(db_indx_, dest_key).first;
if (IsValid(dest_it)) {
dest_it->second = std::move(src_val_); // we just move the source.
shard->db_slice().Expire(db_indx_, dest_it, expire_);
// we just move the source. We won't be able to do it with heap per shard model.
dest_it->second = std::move(src_res_.val);
shard->db_slice().Expire(db_indx_, dest_it, src_res_.expire_ts);
} else {
// we just add the key to destination with the source object.
string_view key = args.front(); // from key
shard->db_slice().AddNew(db_indx_, key, std::move(src_val_), expire_);
shard->db_slice().AddNew(db_indx_, dest_key, src_res_.val, src_res_.expire_ts);
}
}
Transaction::RunnableType Renamer::Finalize(bool skip_exist_dest) {
const auto& src = find_res_[0];
const auto& dest = find_res_[1];
auto cleanup = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
if (!IsValid(src.first)) {
if (!src_res_.found) {
status_ = OpStatus::KEY_NOTFOUND;
return cleanup;
}
if (IsValid(dest.first) && skip_exist_dest) {
if (dest_res_.found && skip_exist_dest) {
status_ = OpStatus::KEY_EXISTS;
return cleanup;
}
expire_ = IsValid(src.second) ? src.second->second : 0;
src_val_ = std::move(src.first->second);
// Src key exist and we need to override the destination.
return [this](Transaction* t, EngineShard* shard) {
this->SwapValues(shard, t->ShardArgsInShard(shard->shard_id()));
this->MoveValues(shard, t->ShardArgsInShard(shard->shard_id()));
return OpStatus::OK;
};
@ -295,7 +301,7 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
transaction->Execute(
[&renamer](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
return renamer.FindAndLock(shard->shard_id(), args).status();
return renamer.Find(shard->shard_id(), args).status();
},
false);

View file

@ -39,5 +39,18 @@ TEST_F(ListFamilyTest, Basic) {
ASSERT_THAT(resp[0], IntArg(1));
}
TEST_F(ListFamilyTest, Expire) {
auto resp = Run({"lpush", kKey1, "1"});
EXPECT_THAT(resp[0], IntArg(1));
constexpr uint64_t kNow = 232279092000;
UpdateTime(kNow);
resp = Run({"expire", kKey1, "1"});
EXPECT_THAT(resp[0], IntArg(1));
UpdateTime(kNow + 1000);
resp = Run({"lpush", kKey1, "1"});
EXPECT_THAT(resp[0], IntArg(1));
}
} // namespace dfly

View file

@ -50,7 +50,7 @@ constexpr size_t kMaxThreadSize = 1024;
} // namespace
Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) {
Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) {
CHECK(pp);
// We support less than 1024 threads.
@ -89,7 +89,7 @@ void Service::Shutdown() {
request_latency_usec.Shutdown();
ping_qps.Shutdown();
// to shutdown all the runtime components that depend on EngineShard.
// to shutdown all the runtime components that depend on EngineShard.
server_family_.Shutdown();
StringFamily::Shutdown();
GenericFamily::Shutdown();
@ -174,7 +174,9 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000);
if (dist_trans) {
cntx->last_command_debug.clock = dist_trans->txid();
cntx->last_command_debug.is_ooo = dist_trans->IsOOO();
}
cntx->transaction = nullptr;
}
void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
@ -231,6 +233,17 @@ bool Service::IsLocked(DbIndex db_index, std::string_view key) const {
return !is_open;
}
bool Service::IsShardSetLocked() const {
std::atomic_uint res{0};
shard_set_.RunBriefInParallel([&](EngineShard* shard) {
bool unlocked = shard->shard_lock()->Check(IntentLock::SHARED);
res.fetch_add(!unlocked, memory_order_relaxed);
});
return res.load() != 0;
}
void Service::RegisterHttp(HttpListenerBase* listener) {
CHECK_NOTNULL(listener);
}
@ -240,6 +253,15 @@ void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
cntx->CloseConnection();
}
void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) {
return cntx->SendError("MULTI calls can not be nested");
}
cntx->conn_state.exec_state = ConnectionState::EXEC_COLLECT;
// TODO: to protect against huge exec transactions.
return cntx->SendOk();
}
void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (cntx->conn_state.exec_state == ConnectionState::EXEC_INACTIVE) {
return cntx->SendError("EXEC without MULTI");
@ -280,15 +302,6 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Exec completed";
}
void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
if (cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE) {
return cntx->SendError("MULTI calls can not be nested");
}
cntx->conn_state.exec_state = ConnectionState::EXEC_COLLECT;
// TODO: to protect against huge exec transactions.
return cntx->SendOk();
}
VarzValue::Map Service::GetVarzStats() {
VarzValue::Map res;
@ -306,8 +319,7 @@ using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
void Service::RegisterCommands() {
using CI = CommandId;
constexpr auto kExecMask =
CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS;
constexpr auto kExecMask = CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS;
auto cb_exec = [this](CmdArgList sp, ConnectionContext* cntx) {
this->Exec(std::move(sp), cntx);

View file

@ -47,6 +47,7 @@ class Service {
// Used by tests.
bool IsLocked(DbIndex db_index, std::string_view key) const;
bool IsShardSetLocked() const;
EngineShardSet& shard_set() {
return shard_set_;

View file

@ -208,4 +208,18 @@ void ReplyBuilder::SendSimpleStrArr(const std::string_view* arr, uint32_t count)
serializer_->SendDirect(res);
}
void ReplyBuilder::SendNullArray() {
as_resp()->SendDirect("*-1\r\n");
}
void ReplyBuilder::SendStringArr(absl::Span<const std::string_view> arr) {
string res = absl::StrCat("*", arr.size(), kCRLF);
for (size_t i = 0; i < arr.size(); ++i) {
StrAppend(&res, "$", arr[i].size(), kCRLF);
res.append(arr[i]).append(kCRLF);
}
as_resp()->SendDirect(res);
}
} // namespace dfly

View file

@ -108,6 +108,9 @@ class ReplyBuilder {
// Resp specific.
// This one is prefixed with + and with clrf added automatically to each item..
void SendSimpleStrArr(const std::string_view* arr, uint32_t count);
void SendNullArray();
void SendStringArr(absl::Span<const std::string_view> arr);
void SendNull() {
as_resp()->SendNull();

View file

@ -264,13 +264,18 @@ bool Transaction::RunInShard(EngineShard* shard) {
// If it's a final hop we should release the locks.
if (should_release) {
KeyLockArgs largs = GetLockArgs(idx);
if (IsGlobal()) {
shard->shard_lock()->Release(Mode());
} else { // not global.
KeyLockArgs largs = GetLockArgs(idx);
// If a transaction has been suspended, we keep the lock so that future transaction
// touching those keys will be ordered via TxQueue. It's necessary because we preserve
// the atomicity of awaked transactions by halting the TxQueue.
shard->db_slice().Release(Mode(), largs);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
// If a transaction has been suspended, we keep the lock so that future transaction
// touching those keys will be ordered via TxQueue. It's necessary because we preserve
// the atomicity of awaked transactions by halting the TxQueue.
shard->db_slice().Release(Mode(), largs);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
sd.local_mask &= ~OUT_OF_ORDER;
}
}
CHECK_GE(DecreaseRunCnt(), 1u);
@ -405,7 +410,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// Runs in the coordinator fiber.
void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMulti";
VLOG(1) << "UnlockMulti " << DebugId();
DCHECK(multi_);
using KeyList = vector<pair<std::string_view, LockCnt>>;
@ -437,7 +442,8 @@ void Transaction::UnlockMulti() {
auto& sd = shard_data_[SidToId(shard->shard_id())];
// It does not have to be that all shards in multi transaction execute this tx.
// Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from there.
// Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
// there.
if (sd.pq_pos != TxQueue::kEnd) {
TxQueue* txq = shard->txq();
DCHECK(!txq->Empty());
@ -461,6 +467,8 @@ void Transaction::UnlockMulti() {
}
WaitForShardCallbacks();
DCHECK_GE(use_count(), 1u);
VLOG(1) << "UnlockMultiEnd " << DebugId();
}
// Runs in coordinator thread.
@ -517,21 +525,23 @@ void Transaction::ExecuteAsync(bool concluding_cb) {
// We verify seq lock has the same generation number. See below for more info.
auto cb = [seq, this] {
EngineShard* shard = EngineShard::tlocal();
DVLOG(2) << "EngineShard::Exec " << DebugId() << " sid:" << shard->shard_id() << " "
<< run_count_.load(memory_order_relaxed);
uint16_t local_mask = GetLocalMask(shard->shard_id());
// we use fetch_add with release trick to make sure that local_mask is loaded before
// we load seq_after. We could gain similar result with "atomic_thread_fence(acquire)"
uint32_t seq_after = seqlock_.fetch_add(0, memory_order_release);
bool should_poll = (seq_after == seq) && (local_mask & ARMED);
DVLOG(2) << "EngineShard::Exec " << DebugId() << " sid:" << shard->shard_id() << " "
<< run_count_.load(memory_order_relaxed) << ", should_poll: " << should_poll;
// We verify that this callback is still relevant.
// If we still have the same sequence number and local_mask is ARMED it means
// the coordinator thread has not crossed WaitForShardCallbacks barrier.
// Otherwise, this callback is redundant. We may still call PollExecution but
// we should not pass this to it since it can be in undefined state for this callback.
if (seq_after == seq && (local_mask & ARMED)) {
if (should_poll) {
// shard->PollExecution(this) does not necessarily execute this transaction.
// Therefore, everything that should be handled during the callback execution
// should go into RunInShard.
@ -561,6 +571,7 @@ void Transaction::RunQuickie(EngineShard* shard) {
DCHECK_EQ(0u, txid_);
shard->IncQuickRun();
auto& sd = shard_data_[0];
DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER));
@ -734,13 +745,14 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
}
inline uint32_t Transaction::DecreaseRunCnt() {
// to protect against cases where Transaction is destroyed before run_ec_.notify
// finishes running. We can not put it inside the (res == 1) block because then it's too late.
::boost::intrusive_ptr guard(this);
// We use release so that no stores will be reordered after.
uint32_t res = run_count_.fetch_sub(1, std::memory_order_release);
if (res == 1) {
// to protect against cases where Transaction is destroyed before run_ec_.notify
// finishes running.
::boost::intrusive_ptr guard(this);
run_ec_.notify();
}
return res;

View file

@ -93,10 +93,6 @@ class Transaction {
return shard_data_[SidToId(sid)].local_mask;
}
uint32_t GetStateMask() const {
return state_mask_.load(std::memory_order_relaxed);
}
// Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP.
// For single hop, use ScheduleSingleHop instead.
void Schedule() {
@ -148,6 +144,10 @@ class Transaction {
bool IsGlobal() const;
bool IsOOO() const {
return false;
}
EngineShardSet* shard_set() {
return ess_;
}
@ -257,9 +257,6 @@ class Transaction {
uint32_t trans_options_ = 0;
// Written by coordination thread but may be read by Shard threads.
// A mask of State values. Mostly used for debugging and for invariant checks.
std::atomic<uint16_t> state_mask_{0};
ShardId unique_shard_id_{kInvalidSid};
DbIndex db_index_ = 0;