1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: fix search replication (#3547)

chore: fix search replcation

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-08-23 08:35:09 +03:00 committed by Roman Gershman
parent f54d755a5b
commit 0da4e2ae04
No known key found for this signature in database
GPG key ID: F25B77EAF8AEBA7A
3 changed files with 44 additions and 0 deletions

View file

@ -182,15 +182,22 @@ void ShardDocIndex::Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr)
auto cb = [this](string_view key, BaseAccessor* doc) { indices_.Add(key_index_.Add(key), doc); }; auto cb = [this](string_view key, BaseAccessor* doc) { indices_.Add(key_index_.Add(key), doc); };
TraverseAllMatching(*base_, op_args, cb); TraverseAllMatching(*base_, op_args, cb);
was_built_ = true;
VLOG(1) << "Indexed " << key_index_.Size() << " docs on " << base_->prefix; VLOG(1) << "Indexed " << key_index_.Size() << " docs on " << base_->prefix;
} }
void ShardDocIndex::AddDoc(string_view key, const DbContext& db_cntx, const PrimeValue& pv) { void ShardDocIndex::AddDoc(string_view key, const DbContext& db_cntx, const PrimeValue& pv) {
if (!was_built_)
return;
auto accessor = GetAccessor(db_cntx, pv); auto accessor = GetAccessor(db_cntx, pv);
indices_.Add(key_index_.Add(key), accessor.get()); indices_.Add(key_index_.Add(key), accessor.get());
} }
void ShardDocIndex::RemoveDoc(string_view key, const DbContext& db_cntx, const PrimeValue& pv) { void ShardDocIndex::RemoveDoc(string_view key, const DbContext& db_cntx, const PrimeValue& pv) {
if (!was_built_)
return;
auto accessor = GetAccessor(db_cntx, pv); auto accessor = GetAccessor(db_cntx, pv);
DocId id = key_index_.Remove(key); DocId id = key_index_.Remove(key);
indices_.Remove(id, accessor.get()); indices_.Remove(id, accessor.get());

View file

@ -143,6 +143,7 @@ class ShardDocIndex {
void Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr); void Rebuild(const OpArgs& op_args, PMR_NS::memory_resource* mr);
private: private:
bool was_built_ = false;
std::shared_ptr<const DocIndex> base_; std::shared_ptr<const DocIndex> base_;
search::FieldIndices indices_; search::FieldIndices indices_;
DocKeyIndex key_index_; DocKeyIndex key_index_;

View file

@ -1858,6 +1858,42 @@ async def test_search(df_factory):
].id == "k0" ].id == "k0"
@dfly_args({"proactor_threads": 4})
async def test_search_with_stream(df_factory: DflyInstanceFactory):
master = df_factory.create()
replica = df_factory.create()
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
# fill master with hsets and create index
p = c_master.pipeline(transaction=False)
for i in range(10_000):
p.hset(f"k{i}", mapping={"name": f"name of {i}"})
await p.execute()
await c_master.execute_command("FT.CREATE i1 SCHEMA name text")
# start replication and issue one add command and delete commands on master in parallel
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await c_master.hset("secret-key", mapping={"name": "new-secret"})
for i in range(1_000):
await c_master.delete(f"k{i}")
# expect replica to see only 10k - 1k + 1 = 9001 keys in it's index
await wait_available_async(c_replica)
assert await c_replica.execute_command("FT.SEARCH i1 * LIMIT 0 0") == [9_001]
assert await c_replica.execute_command('FT.SEARCH i1 "secret"') == [
1,
"secret-key",
["name", "new-secret"],
]
await close_clients(c_master, c_replica)
# @pytest.mark.slow # @pytest.mark.slow
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_client_pause_with_replica(df_factory, df_seeder_factory): async def test_client_pause_with_replica(df_factory, df_seeder_factory):