mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: try
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
e16ef838e4
commit
c2c1f4ded3
5 changed files with 24 additions and 7 deletions
|
@ -1000,6 +1000,7 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||||
if (lock_args.fps.empty()) { // Can be empty for NO_KEY_TRANSACTIONAL commands.
|
if (lock_args.fps.empty()) { // Can be empty for NO_KEY_TRANSACTIONAL commands.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
DCHECK_LT(lock_args.db_index, db_array_size());
|
||||||
|
|
||||||
auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
||||||
bool lock_acquired = true;
|
bool lock_acquired = true;
|
||||||
|
|
|
@ -1607,6 +1607,21 @@ void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* buil
|
||||||
if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) {
|
if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) {
|
||||||
return builder->SendError(kDbIndOutOfRangeErr);
|
return builder->SendError(kDbIndOutOfRangeErr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (cntx->conn_state.db_index == index) {
|
||||||
|
// auto cb = [](EngineShard* shard) { return OpStatus::OK; };
|
||||||
|
auto& db_slice = cntx->ns->GetDbSlice(EngineShard::tlocal()->shard_id());
|
||||||
|
DCHECK_LT(index, db_slice.db_array_size());
|
||||||
|
DCHECK(db_slice.GetDBTable(index));
|
||||||
|
// shard_set->RunBriefInParallel(std::move(cb));
|
||||||
|
|
||||||
|
return builder->SendOk();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (cntx->conn_state.exec_info.IsRunning()) {
|
||||||
|
return builder->SendError("SELECT is not allowed in a transaction");
|
||||||
|
}
|
||||||
|
|
||||||
cntx->conn_state.db_index = index;
|
cntx->conn_state.db_index = index;
|
||||||
auto cb = [ns = cntx->ns, index](EngineShard* shard) {
|
auto cb = [ns = cntx->ns, index](EngineShard* shard) {
|
||||||
auto& db_slice = ns->GetDbSlice(shard->shard_id());
|
auto& db_slice = ns->GetDbSlice(shard->shard_id());
|
||||||
|
|
|
@ -76,8 +76,6 @@ void JournalExecutor::Execute(journal::ParsedEntry::CmdData& cmd) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void JournalExecutor::SelectDb(DbIndex dbid) {
|
void JournalExecutor::SelectDb(DbIndex dbid) {
|
||||||
conn_context_.conn_state.db_index = dbid;
|
|
||||||
|
|
||||||
if (ensured_dbs_.size() <= dbid)
|
if (ensured_dbs_.size() <= dbid)
|
||||||
ensured_dbs_.resize(dbid + 1);
|
ensured_dbs_.resize(dbid + 1);
|
||||||
|
|
||||||
|
@ -85,6 +83,8 @@ void JournalExecutor::SelectDb(DbIndex dbid) {
|
||||||
auto cmd = BuildFromParts("SELECT", dbid);
|
auto cmd = BuildFromParts("SELECT", dbid);
|
||||||
Execute(cmd);
|
Execute(cmd);
|
||||||
ensured_dbs_[dbid] = true;
|
ensured_dbs_[dbid] = true;
|
||||||
|
} else {
|
||||||
|
conn_context_.conn_state.db_index = dbid;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1084,7 +1084,7 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
|
||||||
return ErrorReply{"-READONLY You can't write against a read only replica."};
|
return ErrorReply{"-READONLY You can't write against a read only replica."};
|
||||||
|
|
||||||
if (multi_active) {
|
if (multi_active) {
|
||||||
if (cmd_name == "SELECT" || absl::EndsWith(cmd_name, "SUBSCRIBE"))
|
if (absl::EndsWith(cmd_name, "SUBSCRIBE"))
|
||||||
return ErrorReply{absl::StrCat("Can not call ", cmd_name, " within a transaction")};
|
return ErrorReply{absl::StrCat("Can not call ", cmd_name, " within a transaction")};
|
||||||
|
|
||||||
if (cmd_name == "WATCH" || cmd_name == "FLUSHALL" || cmd_name == "FLUSHDB")
|
if (cmd_name == "WATCH" || cmd_name == "FLUSHALL" || cmd_name == "FLUSHDB")
|
||||||
|
|
|
@ -123,7 +123,8 @@ async def test_replication_all(
|
||||||
async def check():
|
async def check():
|
||||||
await check_all_replicas_finished(c_replicas, c_master)
|
await check_all_replicas_finished(c_replicas, c_master)
|
||||||
hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master] + c_replicas))
|
hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master] + c_replicas))
|
||||||
assert len(set(hashes)) == 1
|
if len(set(hashes)) != 1:
|
||||||
|
breakpoint()
|
||||||
|
|
||||||
await check()
|
await check()
|
||||||
# Stream more data in stable state
|
# Stream more data in stable state
|
||||||
|
@ -1067,7 +1068,7 @@ async def assert_replica_reconnections(replica_inst, initial_reconnects_count):
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_replication_info(df_factory: DflyInstanceFactory, df_seeder_factory, n_keys=2000):
|
async def test_replication_info(df_factory: DflyInstanceFactory, df_seeder_factory, n_keys=2000):
|
||||||
master = df_factory.create()
|
master = df_factory.create()
|
||||||
replica = df_factory.create(logtostdout=True, replication_acks_interval=100)
|
replica = df_factory.create(replication_acks_interval=100)
|
||||||
df_factory.start_all([master, replica])
|
df_factory.start_all([master, replica])
|
||||||
c_master = master.client()
|
c_master = master.client()
|
||||||
c_replica = replica.client()
|
c_replica = replica.client()
|
||||||
|
@ -1157,8 +1158,8 @@ redis.call('SET', 'A', 'ErrroR')
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_readonly_script(df_factory):
|
async def test_readonly_script(df_factory):
|
||||||
master = df_factory.create(proactor_threads=2, logtostdout=True)
|
master = df_factory.create(proactor_threads=2)
|
||||||
replica = df_factory.create(proactor_threads=2, logtostdout=True)
|
replica = df_factory.create(proactor_threads=2)
|
||||||
|
|
||||||
df_factory.start_all([master, replica])
|
df_factory.start_all([master, replica])
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue