mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
bug(server): replicate scripts in stable state (#1114)
* bug(server): replicate scripts in stable state --------- Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
2d501111ea
commit
7f56a435c4
8 changed files with 81 additions and 8 deletions
|
@ -29,7 +29,7 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
|
|||
}
|
||||
|
||||
bool CommandId::IsTransactional() const {
|
||||
if (first_key_ > 0 || (opt_mask_ & CO::GLOBAL_TRANS))
|
||||
if (first_key_ > 0 || (opt_mask_ & CO::GLOBAL_TRANS) || (opt_mask_ & CO::NO_KEY_JOURNAL))
|
||||
return true;
|
||||
|
||||
string_view name{name_};
|
||||
|
@ -124,6 +124,8 @@ const char* OptName(CO::CommandOpt fl) {
|
|||
return "variadic-keys";
|
||||
case NO_AUTOJOURNAL:
|
||||
return "custom-journal";
|
||||
case NO_KEY_JOURNAL:
|
||||
return "no-key-journal";
|
||||
}
|
||||
return "unknown";
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ enum CommandOpt : uint32_t {
|
|||
GLOBAL_TRANS = 1U << 12,
|
||||
|
||||
NO_AUTOJOURNAL = 1U << 15, // Skip automatically logging command to journal inside transaction.
|
||||
NO_KEY_JOURNAL = 1U << 16, // Command with no keys that need to be journaled
|
||||
};
|
||||
|
||||
const char* OptName(CommandOpt fl);
|
||||
|
|
|
@ -674,6 +674,9 @@ size_t DbSlice::DbSize(DbIndex db_ind) const {
|
|||
}
|
||||
|
||||
bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||
if (lock_args.args.empty()) {
|
||||
return true;
|
||||
}
|
||||
DCHECK_GT(lock_args.key_step, 0u);
|
||||
|
||||
auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
||||
|
@ -700,6 +703,9 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
|||
}
|
||||
|
||||
void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||
if (lock_args.args.empty()) {
|
||||
return;
|
||||
}
|
||||
DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0];
|
||||
if (lock_args.args.size() == 1) {
|
||||
Release(mode, lock_args.db_index, lock_args.args.front(), 1);
|
||||
|
@ -729,7 +735,6 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, string_view key) co
|
|||
}
|
||||
|
||||
bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) const {
|
||||
DCHECK(!lock_args.args.empty());
|
||||
const auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
||||
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
||||
auto s = lock_args.args[i];
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "facade/error.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/server_state.h"
|
||||
#include "server/transaction.h"
|
||||
|
||||
ABSL_FLAG(std::string, default_lua_config, "",
|
||||
"Configure default mode for running Lua scripts: \n - Use 'allow-undeclared-keys' to "
|
||||
|
@ -126,6 +127,10 @@ void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (!res)
|
||||
return (*cntx)->SendError(res.error().Format());
|
||||
|
||||
// Schedule empty callback inorder to journal command via transaction framework.
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
|
||||
|
||||
cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
return (*cntx)->SendBulkString(res.value());
|
||||
}
|
||||
|
||||
|
|
|
@ -2111,7 +2111,7 @@ void ServerFamily::Register(CommandRegistry* registry) {
|
|||
<< CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0}.HFUNC(ReplConf)
|
||||
<< CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role)
|
||||
<< CI{"SLOWLOG", CO::ADMIN | CO::FAST, -2, 0, 0, 0}.SetHandler(SlowLog)
|
||||
<< CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script)
|
||||
<< CI{"SCRIPT", CO::NOSCRIPT | CO::NO_KEY_JOURNAL, -2, 0, 0, 0}.HFUNC(Script)
|
||||
<< CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, 0}.HFUNC(Dfly);
|
||||
}
|
||||
|
||||
|
|
|
@ -83,6 +83,14 @@ void Transaction::InitGlobal() {
|
|||
sd.local_mask = ACTIVE;
|
||||
}
|
||||
|
||||
void Transaction::InitNoKey() {
|
||||
// No key command will use the first shard.
|
||||
unique_shard_cnt_ = 1;
|
||||
unique_shard_id_ = 0;
|
||||
shard_data_.resize(1);
|
||||
shard_data_.front().local_mask |= ACTIVE;
|
||||
}
|
||||
|
||||
void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping,
|
||||
std::vector<PerShardCache>* out) {
|
||||
auto args = full_args_;
|
||||
|
@ -310,6 +318,11 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
if ((cid_->opt_mask() & CO::NO_KEY_JOURNAL) > 0) {
|
||||
InitNoKey();
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
DCHECK_EQ(unique_shard_cnt_, 0u);
|
||||
DCHECK(args_.empty());
|
||||
|
||||
|
@ -916,6 +929,7 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
|
|||
res.db_index = db_index_;
|
||||
res.key_step = cid_->key_arg_step();
|
||||
res.args = GetShardArgs(sid);
|
||||
DCHECK(!res.args.empty() || (cid_->opt_mask() & CO::NO_KEY_JOURNAL));
|
||||
|
||||
return res;
|
||||
}
|
||||
|
@ -947,6 +961,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
|
|||
sd.pq_pos = shard->txq()->Insert(this);
|
||||
|
||||
DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED);
|
||||
|
||||
shard->db_slice().Acquire(mode, lock_args);
|
||||
sd.local_mask |= KEYLOCK_ACQUIRED;
|
||||
|
||||
|
@ -1057,8 +1072,6 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
|
|||
|
||||
// runs in engine-shard thread.
|
||||
ArgSlice Transaction::GetShardArgs(ShardId sid) const {
|
||||
DCHECK(!args_.empty());
|
||||
|
||||
// We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard
|
||||
// barrier.
|
||||
if (unique_shard_cnt_ == 1) {
|
||||
|
@ -1289,10 +1302,18 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
|
|||
if (multi_ && multi_->role == SQUASHER)
|
||||
return;
|
||||
|
||||
// Ignore non-write commands or ones with disabled autojournal.
|
||||
if ((cid_->opt_mask() & CO::WRITE) == 0 || ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0 &&
|
||||
!renabled_auto_journal_.load(memory_order_relaxed)))
|
||||
bool journal_by_cmd_mask = true;
|
||||
if ((cid_->opt_mask() & CO::NO_KEY_JOURNAL) > 0) {
|
||||
journal_by_cmd_mask = true; // Enforce journaling for commands that dont change the db.
|
||||
} else if ((cid_->opt_mask() & CO::WRITE) == 0) {
|
||||
journal_by_cmd_mask = false; // Non-write command are not journaled.
|
||||
} else if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0 &&
|
||||
!renabled_auto_journal_.load(memory_order_relaxed)) {
|
||||
journal_by_cmd_mask = false; // Command disabled auto journal.
|
||||
}
|
||||
if (!journal_by_cmd_mask) {
|
||||
return;
|
||||
}
|
||||
|
||||
auto journal = shard->journal();
|
||||
if (journal == nullptr)
|
||||
|
|
|
@ -389,6 +389,9 @@ class Transaction {
|
|||
// Init as a global transaction.
|
||||
void InitGlobal();
|
||||
|
||||
// Init when command has no keys and it need to use transaction framework
|
||||
void InitNoKey();
|
||||
|
||||
// Init with a set of keys.
|
||||
void InitByKeys(KeyIndex keys);
|
||||
|
||||
|
|
|
@ -835,3 +835,39 @@ async def test_auth_master(df_local_factory, n_keys=20):
|
|||
assert all(v is not None for v in res)
|
||||
await c_master.connection_pool.disconnect()
|
||||
await c_replica.connection_pool.disconnect()
|
||||
|
||||
|
||||
SCRIPT_TEMPLATE = "return {}"
|
||||
|
||||
|
||||
@dfly_args({"proactor_threads": 2})
|
||||
async def test_script_transfer(df_local_factory):
|
||||
master = df_local_factory.create(port=BASE_PORT)
|
||||
replica = df_local_factory.create(port=BASE_PORT+1)
|
||||
|
||||
df_local_factory.start_all([master, replica])
|
||||
|
||||
c_master = aioredis.Redis(port=master.port)
|
||||
c_replica = aioredis.Redis(port=replica.port)
|
||||
|
||||
# Load some scripts into master ahead
|
||||
scripts = []
|
||||
for i in range(0, 10):
|
||||
sha = await c_master.script_load(SCRIPT_TEMPLATE.format(i))
|
||||
scripts.append(sha)
|
||||
|
||||
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||
await wait_available_async(c_replica)
|
||||
|
||||
# transfer in stable state
|
||||
for i in range(10, 20):
|
||||
sha = await c_master.script_load(SCRIPT_TEMPLATE.format(i))
|
||||
scripts.append(sha)
|
||||
|
||||
await check_all_replicas_finished([c_replica], c_master)
|
||||
await c_replica.execute_command("REPLICAOF NO ONE")
|
||||
|
||||
for i, sha in enumerate(scripts):
|
||||
assert await c_replica.evalsha(sha, 0) == i
|
||||
await c_master.connection_pool.disconnect()
|
||||
await c_replica.connection_pool.disconnect()
|
||||
|
|
Loading…
Reference in a new issue