diff --git a/src/server/search/doc_index.cc b/src/server/search/doc_index.cc index 7daf0cca8..b3207859c 100644 --- a/src/server/search/doc_index.cc +++ b/src/server/search/doc_index.cc @@ -182,15 +182,22 @@ void ShardDocIndex::Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr) auto cb = [this](string_view key, BaseAccessor* doc) { indices_.Add(key_index_.Add(key), doc); }; TraverseAllMatching(*base_, op_args, cb); + was_built_ = true; VLOG(1) << "Indexed " << key_index_.Size() << " docs on " << base_->prefix; } void ShardDocIndex::AddDoc(string_view key, const DbContext& db_cntx, const PrimeValue& pv) { + if (!was_built_) + return; + auto accessor = GetAccessor(db_cntx, pv); indices_.Add(key_index_.Add(key), accessor.get()); } void ShardDocIndex::RemoveDoc(string_view key, const DbContext& db_cntx, const PrimeValue& pv) { + if (!was_built_) + return; + auto accessor = GetAccessor(db_cntx, pv); DocId id = key_index_.Remove(key); indices_.Remove(id, accessor.get()); diff --git a/src/server/search/doc_index.h b/src/server/search/doc_index.h index ebc689c4a..12b368335 100644 --- a/src/server/search/doc_index.h +++ b/src/server/search/doc_index.h @@ -143,6 +143,7 @@ class ShardDocIndex { void Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr); private: + bool was_built_ = false; std::shared_ptr base_; search::FieldIndices indices_; DocKeyIndex key_index_; diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index d60b54cb0..547eb7b67 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1858,6 +1858,42 @@ async def test_search(df_factory): ].id == "k0" +@dfly_args({"proactor_threads": 4}) +async def test_search_with_stream(df_factory: DflyInstanceFactory): + master = df_factory.create() + replica = df_factory.create() + + df_factory.start_all([master, replica]) + + c_master = master.client() + c_replica = replica.client() + + # fill master with hsets and create index + p = c_master.pipeline(transaction=False) + for i in range(10_000): + p.hset(f"k{i}", mapping={"name": f"name of {i}"}) + await p.execute() + + await c_master.execute_command("FT.CREATE i1 SCHEMA name text") + + # start replication and issue one add command and delete commands on master in parallel + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await c_master.hset("secret-key", mapping={"name": "new-secret"}) + for i in range(1_000): + await c_master.delete(f"k{i}") + + # expect replica to see only 10k - 1k + 1 = 9001 keys in it's index + await wait_available_async(c_replica) + assert await c_replica.execute_command("FT.SEARCH i1 * LIMIT 0 0") == [9_001] + assert await c_replica.execute_command('FT.SEARCH i1 "secret"') == [ + 1, + "secret-key", + ["name", "new-secret"], + ] + + await close_clients(c_master, c_replica) + + # @pytest.mark.slow @pytest.mark.asyncio async def test_client_pause_with_replica(df_factory, df_seeder_factory):