Mirror of https://github.com/dragonflydb/dragonfly.git (synced 2024-12-14 11:58:02 +00:00)
fix: fix script replication (#2531)
* fix: fix script replication

Single-key script replication was previously broken because the EXEC entry wasn't sent. Send it manually.

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent 562aa151a7
commit e0f86697f9

5 changed files with 58 additions and 6 deletions
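For background on why a missing EXEC entry breaks replication: a replica treats MULTI_COMMAND journal entries as parts of a still-open transaction and applies them only once the closing EXEC for that transaction arrives. Below is a minimal C++ sketch of that replica-side buffering pattern; Op, JournalEntry, TxAssembler, and the apply callback are simplified illustrations, not Dragonfly's actual types.

#include <cstdint>
#include <functional>
#include <string>
#include <unordered_map>
#include <vector>

// Simplified stand-ins for journal types; names are illustrative only.
enum class Op { MULTI_COMMAND, EXEC };
struct JournalEntry {
  Op opcode;
  uint64_t txid;
  std::string command;
};

// Buffers MULTI_COMMAND entries per transaction id and applies them only
// when the closing EXEC arrives. If the master never sends that EXEC (the
// situation this commit fixes for single-key scripts), the buffered
// commands are never applied on the replica.
class TxAssembler {
 public:
  void OnEntry(const JournalEntry& e,
               const std::function<void(const std::string&)>& apply) {
    if (e.opcode == Op::MULTI_COMMAND) {
      pending_[e.txid].push_back(e.command);  // keep buffering
      return;
    }
    // EXEC: the transaction is complete, apply everything in order.
    for (const auto& cmd : pending_[e.txid])
      apply(cmd);
    pending_.erase(e.txid);
  }

 private:
  std::unordered_map<uint64_t, std::vector<std::string>> pending_;
};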
@@ -119,7 +119,7 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read
   // Otherwise, continue building multi command.
   DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
-  DCHECK(res->txid > 0);
+  DCHECK(res->txid > 0 || res->shard_cnt == 1);
 
   auto txid = res->txid;
   auto& txdata = current_[txid];
@@ -1850,6 +1850,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
     cntx->transaction = stub_tx.get();
 
     result = interpreter->RunFunction(eval_args.sha, &error);
+    cntx->transaction->FIX_ConcludeJournalExec();  // flush journal
 
     cntx->transaction = tx;
     return OpStatus::OK;
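For context: while the script runs under the squashed stub transaction, each command it executes is journaled as a MULTI_COMMAND entry; the added FIX_ConcludeJournalExec() call then appends the terminating EXEC on the stub's shard, so replicas can close and apply the transaction. Its implementation appears in a later hunk.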
@@ -188,7 +188,9 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optio
     // Use squashing mechanism for inline execution of single-shard EVAL
     multi_->mode = LOCK_AHEAD;
   }
 
   multi_->role = SQUASHED_STUB;
+  multi_->shard_journal_write.resize(1);
+
   time_now_ms_ = parent->time_now_ms_;
@@ -966,6 +968,16 @@ const absl::flat_hash_set<std::string_view>& Transaction::GetMultiKeys() const {
   return multi_->frozen_keys_set;
 }
 
+void Transaction::FIX_ConcludeJournalExec() {
+  if (!multi_->shard_journal_write.front())
+    return;
+
+  if (auto journal = EngineShard::tlocal()->journal(); journal != nullptr) {
+    journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, 1,
+                         unique_slot_checker_.GetUniqueSlotId(), {}, false);
+  }
+}
+
 void Transaction::EnableShard(ShardId sid) {
   unique_shard_cnt_ = 1;
   unique_shard_id_ = sid;
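The guard on shard_journal_write.front() means the closing EXEC is emitted only if the script actually journaled a write on this shard, so read-only scripts add no stray entries. A minimal sketch of that write-flag-then-flush idea, assuming hypothetical Journal and ScriptRun types (illustrative only, not Dragonfly's API):

#include <cstdio>

// Illustrative journal stub.
struct Journal {
  void Record(const char* what) { std::printf("journal: %s\n", what); }
};

class ScriptRun {
 public:
  explicit ScriptRun(Journal* journal) : journal_(journal) {}

  // Called for every command the script executes against the shard.
  void RecordCommand(const char* cmd) {
    journal_->Record(cmd);
    wrote_ = true;  // remember that this run produced journal entries
  }

  // Called once when the script finishes: emit the closing EXEC only if
  // something was actually written.
  void Conclude() {
    if (wrote_)
      journal_->Record("EXEC");
  }

 private:
  Journal* journal_;
  bool wrote_ = false;  // plays the role of shard_journal_write[0]
};

int main() {
  Journal j;
  ScriptRun run(&j);
  run.RecordCommand("INCR a");  // script writes one key
  run.Conclude();               // emits the EXEC that closes the transaction
}

The producer side of the real flag appears in the next hunk: LogJournalOnShard now also sets it for squashed stubs, at index 0, matching the single-element vector the stub constructor allocated in the previous hunk.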
@@ -1464,8 +1476,13 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&
                                     bool allow_await) const {
   auto journal = shard->journal();
   CHECK(journal);
-  if (multi_ && multi_->role != SQUASHED_STUB)
-    multi_->shard_journal_write[shard->shard_id()] = true;
+
+  if (multi_) {
+    if (multi_->role != SQUASHED_STUB)
+      multi_->shard_journal_write[shard->shard_id()] = true;
+    else
+      multi_->shard_journal_write[0] = true;
+  }
 
   bool is_multi = multi_commands || IsAtomicMulti();
@@ -1486,9 +1503,8 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
 
 void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
   // We're on the owning thread of this transaction, so we can safely access it's data below.
-  // We still need to claim the blocking barrier, but as this function is often called blindly, we
-  // want to check first if it makes sense to even proceed.
-  if (blocking_barrier_.IsClaimed())
+  // First, check if it makes sense to proceed.
+  if (blocking_barrier_.IsClaimed() || cid_ == nullptr || (cid_->opt_mask() & CO::BLOCKING) == 0)
     return;
 
   OpStatus status = OpStatus::CANCELLED;
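This hunk is an adjacent cleanup rather than part of the EXEC fix: instead of relying only on the blocking barrier, the guard now also returns early when no command is set or the command's option mask lacks CO::BLOCKING, so cancellation work is skipped entirely for commands that can never block.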
@@ -343,6 +343,9 @@ class Transaction {
   // Get keys multi transaction was initialized with, normalized and unique
   const absl::flat_hash_set<std::string_view>& GetMultiKeys() const;
 
+  // Send journal EXEC opcode after a series of MULTI commands on the currently active shard
+  void FIX_ConcludeJournalExec();
+
  private:
   // Holds number of locks for each IntentLock::Mode: shared and exlusive.
   struct LockCnt {
@@ -769,6 +769,38 @@ async def test_expiry(df_local_factory: DflyInstanceFactory, n_keys=1000):
     assert all(v is None for v in res)
 
 
+@dfly_args({"proactor_threads": 4})
+async def test_simple_scripts(df_local_factory: DflyInstanceFactory):
+    master = df_local_factory.create()
+    replicas = [df_local_factory.create() for _ in range(2)]
+    df_local_factory.start_all([master] + replicas)
+
+    c_replicas = [replica.client() for replica in replicas]
+    c_master = master.client()
+
+    # Connect replicas and wait for sync to finish
+    for c_replica in c_replicas:
+        await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
+        await check_all_replicas_finished([c_replica], c_master)
+
+    # Generate some scripts and run them
+    keys = ["a", "b", "c", "d", "e"]
+    for i in range(len(keys) + 1):
+        script = ""
+        subkeys = keys[:i]
+        for key in subkeys:
+            script += f"redis.call('INCR', '{key}')"
+            script += f"redis.call('INCR', '{key}')"
+
+        await c_master.eval(script, len(subkeys), *subkeys)
+
+    # Wait for replicas
+    await check_all_replicas_finished(c_replicas, c_master)
+
+    for c_replica in c_replicas:
+        assert (await c_replica.mget(keys)) == ["10", "8", "6", "4", "2"]
+
+
 """
 Test script replication.
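As a sanity check on the expected values: key keys[j] is included in subkeys for iterations i = j+1 through 5, and each script increments it twice, so it ends at 2 * (5 - j), i.e. 10, 8, 6, 4, 2 for a through e, which matches the assertion.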