diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index 0c5605a2c..ba1e477d6 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -1000,6 +1000,7 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
   if (lock_args.fps.empty()) {  // Can be empty for NO_KEY_TRANSACTIONAL commands.
     return true;
   }
+  DCHECK_LT(lock_args.db_index, db_array_size());

   auto& lt = db_arr_[lock_args.db_index]->trans_locks;
   bool lock_acquired = true;
diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc
index 5cfe94c5f..b9b1024ea 100644
--- a/src/server/generic_family.cc
+++ b/src/server/generic_family.cc
@@ -1607,6 +1607,21 @@ void GenericFamily::Select(CmdArgList args, Transaction*, SinkReplyBuilder* buil
   if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) {
     return builder->SendError(kDbIndOutOfRangeErr);
   }
+
+  if (cntx->conn_state.db_index == index) {
+    // auto cb = [](EngineShard* shard) { return OpStatus::OK; };
+    auto& db_slice = cntx->ns->GetDbSlice(EngineShard::tlocal()->shard_id());
+    DCHECK_LT(index, db_slice.db_array_size());
+    DCHECK(db_slice.GetDBTable(index));
+    // shard_set->RunBriefInParallel(std::move(cb));
+
+    return builder->SendOk();
+  }
+
+  if (cntx->conn_state.exec_info.IsRunning()) {
+    return builder->SendError("SELECT is not allowed in a transaction");
+  }
+
   cntx->conn_state.db_index = index;
   auto cb = [ns = cntx->ns, index](EngineShard* shard) {
     auto& db_slice = ns->GetDbSlice(shard->shard_id());
diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc
index ea1b2dd84..f8c85e8eb 100644
--- a/src/server/journal/executor.cc
+++ b/src/server/journal/executor.cc
@@ -76,8 +76,6 @@ void JournalExecutor::Execute(journal::ParsedEntry::CmdData& cmd) {
 }

 void JournalExecutor::SelectDb(DbIndex dbid) {
-  conn_context_.conn_state.db_index = dbid;
-
   if (ensured_dbs_.size() <= dbid)
     ensured_dbs_.resize(dbid + 1);

@@ -85,6 +83,8 @@ void JournalExecutor::SelectDb(DbIndex dbid) {
     auto cmd = BuildFromParts("SELECT", dbid);
     Execute(cmd);
     ensured_dbs_[dbid] = true;
+  } else {
+    conn_context_.conn_state.db_index = dbid;
   }
 }
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index b6c361bab..7d872d01c 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -1084,7 +1084,7 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
     return ErrorReply{"-READONLY You can't write against a read only replica."};

   if (multi_active) {
-    if (cmd_name == "SELECT" || absl::EndsWith(cmd_name, "SUBSCRIBE"))
+    if (absl::EndsWith(cmd_name, "SUBSCRIBE"))
       return ErrorReply{absl::StrCat("Can not call ", cmd_name, " within a transaction")};

     if (cmd_name == "WATCH" || cmd_name == "FLUSHALL" || cmd_name == "FLUSHDB")
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index d5229effb..f1d7ff8fa 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -123,7 +123,8 @@ async def test_replication_all(
     async def check():
         await check_all_replicas_finished(c_replicas, c_master)
         hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master] + c_replicas))
-        assert len(set(hashes)) == 1
+        if len(set(hashes)) != 1:
+            breakpoint()

     await check()

     # Stream more data in stable state
@@ -1067,7 +1068,7 @@ async def assert_replica_reconnections(replica_inst, initial_reconnects_count):
 @pytest.mark.asyncio
 async def test_replication_info(df_factory: DflyInstanceFactory, df_seeder_factory, n_keys=2000):
     master = df_factory.create()
-    replica = df_factory.create(logtostdout=True, replication_acks_interval=100)
+    replica = df_factory.create(replication_acks_interval=100)
     df_factory.start_all([master, replica])
     c_master = master.client()
     c_replica = replica.client()
@@ -1157,8 +1158,8 @@ redis.call('SET', 'A', 'ErrroR')


 @pytest.mark.asyncio
 async def test_readonly_script(df_factory):
-    master = df_factory.create(proactor_threads=2, logtostdout=True)
-    replica = df_factory.create(proactor_threads=2, logtostdout=True)
+    master = df_factory.create(proactor_threads=2)
+    replica = df_factory.create(proactor_threads=2)
     df_factory.start_all([master, replica])
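
Illustrative sketch, not part of the patch above: a minimal pytest-style test of the SELECT semantics this diff appears to introduce, reusing the df_factory fixture and the redis asyncio client conventions already present in tests/dragonfly. The reading is that VerifyCommandState no longer rejects SELECT when it is queued under MULTI, Select() replies OK when the target equals the connection's current database, and only an attempt to switch databases while a transaction is executing is refused. The test name, fixture usage, and assertions below are assumptions for illustration, not code from this change.

import pytest
from redis.exceptions import ResponseError


@pytest.mark.asyncio
async def test_select_inside_multi_sketch(df_factory):
    # Hypothetical test; it only sketches the behavior implied by the diff above.
    server = df_factory.create(proactor_threads=2)
    df_factory.start_all([server])
    client = server.client()

    # SELECT to the database the connection already uses is accepted inside
    # MULTI/EXEC and behaves as a no-op.
    async with client.pipeline(transaction=True) as pipe:
        pipe.execute_command("SELECT", 0)
        pipe.set("key", "value")
        res = await pipe.execute()
        assert res[-1]  # the queued SET ran normally

    # Switching to a *different* database mid-transaction should be rejected
    # with "SELECT is not allowed in a transaction".
    async with client.pipeline(transaction=True) as pipe:
        pipe.execute_command("SELECT", 1)
        pipe.set("other", "value")
        with pytest.raises(ResponseError):
            await pipe.execute()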