
fix(search): Fix replication (#2159)

* fix(search): Support replication

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav 2023-11-13 11:58:54 +03:00 committed by GitHub
parent cc064a2582
commit 46292968ad
15 changed files with 178 additions and 68 deletions
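At a high level, the change moves search-index serialization out of SaveStagesController into RdbSaver and adds pre/post-load hooks to RdbLoader: index definitions are written as "search-index" aux fields of the summary snapshot (and of the full sync header), while the loading side first drops any stale local indices, replays each stored definition as an FT.CREATE, and finally rebuilds the indices over the loaded keyspace. Below is a minimal standalone sketch of that round trip; the types and helpers are simplified stand-ins, not Dragonfly's actual API.

#include <iostream>
#include <string>
#include <utility>
#include <vector>

using AuxField = std::pair<std::string, std::string>;

// Saver side: one "search-index" aux field per index definition
// (cf. RdbSaver::GetGlobalData and SaveAuxFieldStrStr in the diff).
std::vector<AuxField> SaveAuxFields(const std::vector<std::string>& index_defs) {
  std::vector<AuxField> aux;
  for (const auto& def : index_defs)
    aux.emplace_back("search-index", def);
  return aux;
}

// Loader side: drop stale indices, replay definitions, rebuild afterwards
// (cf. PerformPreLoad, LoadSearchIndexDefFromAux, PerformPostLoad).
void LoadSnapshot(const std::vector<AuxField>& aux, std::vector<std::string>* indices) {
  indices->clear();  // PerformPreLoad: FT.DROPINDEX for every local index
  for (const auto& [key, value] : aux)
    if (key == "search-index")
      indices->push_back("FT.CREATE " + value);  // definition only, no docs indexed yet
  // PerformPostLoad would now rebuild all indices over the loaded keyspace.
}

int main() {
  auto aux = SaveAuxFields({"idx-m SCHEMA f2 numeric"});
  std::vector<std::string> replica_indices{"FT.CREATE idx-r SCHEMA f1 numeric"};
  LoadSnapshot(aux, &replica_indices);
  for (const auto& cmd : replica_indices)
    std::cout << cmd << '\n';  // prints: FT.CREATE idx-m SCHEMA f2 numeric
}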

src/server/detail/save_stages_controller.cc

@@ -11,7 +11,6 @@
#include "base/logging.h"
#include "server/main_service.h"
#include "server/script_mgr.h"
-#include "server/search/doc_index.h"
#include "server/transaction.h"
#include "strings/human_readable.h"
@@ -236,7 +235,7 @@ void SaveStagesController::SaveDfsSingle(EngineShard* shard) {
auto& [snapshot, filename] = snapshots_[shard ? shard->shard_id() : shard_set->size()];
SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
-auto glob_data = shard == nullptr ? GetGlobalData() : RdbSaver::GlobalData{};
+auto glob_data = shard == nullptr ? RdbSaver::GetGlobalData(service_) : RdbSaver::GlobalData{};
if (auto err = snapshot->Start(mode, filename, glob_data); err) {
shared_err_ = err;
@@ -258,7 +257,7 @@ void SaveStagesController::SaveRdb() {
if (!is_cloud_)
filename += ".tmp";
-if (auto err = snapshot->Start(SaveMode::RDB, filename, GetGlobalData()); err) {
+if (auto err = snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_)); err) {
snapshot.reset();
return;
}
@@ -375,31 +374,5 @@ void SaveStagesController::RunStage(void (SaveStagesController::*cb)(unsigned))
}
}
-RdbSaver::GlobalData SaveStagesController::GetGlobalData() const {
-StringVec script_bodies, search_indices;
-{
-auto scripts = service_->script_mgr()->GetAll();
-script_bodies.reserve(scripts.size());
-for (auto& [sha, data] : scripts)
-script_bodies.push_back(move(data.body));
-}
-#ifndef __APPLE__
-{
-shard_set->Await(0, [&] {
-auto* indices = EngineShard::tlocal()->search_indices();
-for (auto index_name : indices->GetIndexNames()) {
-auto index_info = indices->GetIndex(index_name)->GetInfo();
-search_indices.emplace_back(
-absl::StrCat(index_name, " ", index_info.BuildRestoreCommand()));
-}
-});
-}
-#endif
-return RdbSaver::GlobalData{move(script_bodies), move(search_indices)};
-}
} // namespace detail
} // namespace dfly

src/server/detail/save_stages_controller.h

@@ -103,8 +103,6 @@ struct SaveStagesController : public SaveStagesInputs {
void RunStage(void (SaveStagesController::*cb)(unsigned));
-RdbSaver::GlobalData GetGlobalData() const;
size_t GetSaveBuffersSize();
private:

src/server/dflycmd.cc

@@ -563,15 +563,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
RdbSaver* saver = flow->saver.get();
if (saver->Mode() == SaveMode::SUMMARY || saver->Mode() == SaveMode::SINGLE_SHARD_WITH_SUMMARY) {
-auto scripts = sf_->script_mgr()->GetAll();
-StringVec script_bodies;
-for (auto& [sha, data] : scripts) {
-// Always send original body (with header & without auto async calls) that determines the sha,
-// It's stored only if it's different from the post-processed version.
-string& body = data.orig_body.empty() ? data.body : data.orig_body;
-script_bodies.push_back(std::move(body));
-}
-ec = saver->SaveHeader({script_bodies, {}});
+ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service()));
} else {
ec = saver->SaveHeader({});
}

src/server/main_service.h

@@ -113,6 +113,10 @@ class Service : public facade::ServiceInterface {
return server_family_.script_mgr();
}
+const ScriptMgr* script_mgr() const {
+return server_family_.script_mgr();
+}
ServerFamily& server_family() {
return server_family_;
}

src/server/rdb_load.cc

@@ -43,9 +43,11 @@ extern "C" {
#include "server/main_service.h"
#include "server/rdb_extensions.h"
#include "server/script_mgr.h"
+#include "server/search/doc_index.h"
#include "server/serializer_commons.h"
#include "server/server_state.h"
#include "server/set_family.h"
+#include "server/transaction.h"
#include "strings/human_readable.h"
ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size);
@@ -1913,8 +1915,11 @@ error_code RdbLoader::Load(io::Source* src) {
VLOG(1) << "Read RDB_OPCODE_FULLSYNC_END";
RETURN_ON_ERR(EnsureRead(8));
mem_buf_->ConsumeInput(8); // ignore 8 bytes
-if (full_sync_cut_cb)
+if (full_sync_cut_cb) {
+FlushAllShards(); // Flush as the handler awakes post load handlers
full_sync_cut_cb();
+}
continue;
}
@@ -1985,10 +1990,7 @@
}
if (type == RDB_OPCODE_JOURNAL_BLOB) {
// We should flush all changes on the current db before applying incremental changes.
-for (unsigned i = 0; i < shard_set->size(); ++i) {
-FlushShardAsync(i);
-}
+FlushAllShards(); // Always flush before applying incremental on top
RETURN_ON_ERR(HandleJournalBlob(service_));
continue;
}
@@ -2224,7 +2226,7 @@ error_code RdbLoader::HandleAux() {
} else if (auxkey == "redis-bits") {
/* Just ignored. */
} else if (auxkey == "search-index") {
-LoadSearchIndexDefFromAux(move(auxval));
+LoadSearchIndexDefFromAux(std::move(auxval));
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
@@ -2258,6 +2260,11 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
shard_set->Add(sid, std::move(cb));
}
+void RdbLoader::FlushAllShards() {
+for (ShardId i = 0; i < shard_set->size(); i++)
+FlushShardAsync(i);
+}
std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* pv) {
OpaqueObjLoader visitor(opaque.rdb_type, pv);
std::visit(visitor, opaque.obj);
@@ -2369,9 +2376,11 @@ void RdbLoader::LoadScriptFromAux(string&& body) {
void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
facade::CapturingReplyBuilder crb{};
ConnectionContext cntx{nullptr, nullptr, &crb};
+cntx.is_replicating = true;
+cntx.journal_emulated = true;
cntx.skip_acl_validation = true;
// Avoid deleting local crb
absl::Cleanup cntx_clean = [&cntx] { cntx.Inject(nullptr); };
uint32_t consumed = 0;
@@ -2379,7 +2388,7 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
facade::RedisParser parser;
def += "\r\n"; // RESP terminator
-absl::Span<uint8_t> buffer{reinterpret_cast<uint8_t*>(def.data()), def.size()};
+io::MutableBytes buffer{reinterpret_cast<uint8_t*>(def.data()), def.size()};
auto res = parser.Parse(buffer, &consumed, &resp_vec);
if (res != facade::RedisParser::Result::OK) {
@@ -2387,9 +2396,9 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
return;
}
// Prepend FT.CREATE to index definition
CmdArgVec arg_vec;
facade::RespExpr::VecToArgList(resp_vec, &arg_vec);
string ft_create = "FT.CREATE";
arg_vec.insert(arg_vec.begin(), MutableSlice{ft_create.data(), ft_create.size()});
@@ -2401,4 +2410,28 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
}
}
+void RdbLoader::PerformPreLoad(Service* service) {
+const CommandId* cmd = service->FindCmd("FT.DROPINDEX");
+if (cmd == nullptr)
+return; // MacOS
+Transaction::RunOnceAsCommand(cmd, [](auto* trans, auto* es) {
+for (const auto& name : es->search_indices()->GetIndexNames())
+es->search_indices()->DropIndex(name);
+return OpStatus::OK;
+});
+}
+void RdbLoader::PerformPostLoad(Service* service) {
+const CommandId* cmd = service->FindCmd("FT.CREATE");
+if (cmd == nullptr) // On MacOS we don't include search so FT.CREATE won't exist.
+return;
+// Rebuild all search indices as only their definitions are extracted from the snapshot
+Transaction::RunOnceAsCommand(cmd, [](auto* trans, auto* es) {
+es->search_indices()->RebuildAllIndices(trans->GetOpArgs(es));
+return OpStatus::OK;
+});
+}
} // namespace dfly
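For reference, here is the replay step above in isolation: the value stored under the search-index aux key is the index name followed by its restore command (built by BuildRestoreCommand on the save side), so the loader only has to prepend FT.CREATE before dispatching. A rough sketch, using plain whitespace tokenization where the real code parses RESP via facade::RedisParser:

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Turn a stored aux value like "idx-m SCHEMA f2 numeric" into the argument
// vector of an FT.CREATE call (simplified; not the loader's actual parsing).
std::vector<std::string> BuildFtCreateArgs(std::string def) {
  std::vector<std::string> args{"FT.CREATE"};  // prepended, as in the loader
  std::istringstream in(std::move(def));
  for (std::string tok; in >> tok;)
    args.push_back(tok);
  return args;  // dispatched on a journal-emulated, replicating context
}

int main() {
  for (const auto& arg : BuildFtCreateArgs("idx-m SCHEMA f2 numeric"))
    std::cout << arg << ' ';  // FT.CREATE idx-m SCHEMA f2 numeric
}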

src/server/rdb_load.h

@@ -200,6 +200,13 @@ class RdbLoader : protected RdbLoaderBase {
full_sync_cut_cb = std::move(cb);
}
+// Perform pre load procedures after transitioning into the global LOADING state.
+static void PerformPreLoad(Service* service);
+// Performs post load procedures while still remaining in global LOADING state.
+// Called once immediately after loading the snapshot / full sync succeeded from the coordinator.
+static void PerformPostLoad(Service* service);
private:
struct Item {
std::string key;
@@ -229,9 +236,14 @@
void FinishLoad(absl::Time start_time, size_t* keys_loaded);
void FlushShardAsync(ShardId sid);
+void FlushAllShards();
void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);
void LoadScriptFromAux(std::string&& value);
+// Load index definition from RESP string describing it in FT.CREATE format,
+// issues an FT.CREATE call, but does not start indexing
+void LoadSearchIndexDefFromAux(std::string&& value);
private:

src/server/rdb_save.cc

@@ -32,7 +32,9 @@ extern "C" {
#include "core/string_set.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
+#include "server/main_service.h"
#include "server/rdb_extensions.h"
+#include "server/search/doc_index.h"
#include "server/serializer_commons.h"
#include "server/snapshot.h"
#include "util/fibers/simple_channel.h"
@@ -1130,6 +1132,32 @@ size_t RdbSaver::Impl::GetTotalBuffersSize() const {
return total_bytes;
}
+RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) {
+StringVec script_bodies, search_indices;
+{
+auto scripts = service->script_mgr()->GetAll();
+script_bodies.reserve(scripts.size());
+for (auto& [sha, data] : scripts)
+script_bodies.push_back(move(data.body));
+}
+#ifndef __APPLE__
+{
+shard_set->Await(0, [&] {
+auto* indices = EngineShard::tlocal()->search_indices();
+for (auto index_name : indices->GetIndexNames()) {
+auto index_info = indices->GetIndex(index_name)->GetInfo();
+search_indices.emplace_back(
+absl::StrCat(index_name, " ", index_info.BuildRestoreCommand()));
+}
+});
+}
+#endif
+return RdbSaver::GlobalData{std::move(script_bodies), std::move(search_indices)};
+}
void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const {
for (auto& ptr : shard_snapshots_) {
const RdbTypeFreqMap& src_map = ptr->freq_map();
@@ -1265,7 +1293,7 @@ error_code RdbSaver::SaveAux(const GlobalData& glob_state) {
if (!glob_state.search_indices.empty())
LOG(WARNING) << "Dragonfly search index data is incompatible with the RDB format";
} else {
// Search index definitions are not tied to shards and are saved in the summary file
DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_indices.empty());
for (const string& s : glob_state.search_indices)
RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-index", s));

src/server/rdb_save.h

@@ -28,6 +28,7 @@ namespace dfly {
uint8_t RdbObjectType(unsigned type, unsigned encoding);
class EngineShard;
+class Service;
class AlignedBuffer : public ::io::Sink {
public:
@@ -111,6 +112,9 @@ class RdbSaver {
// Get total size of all rdb serializer buffers and items currently placed in channel
size_t GetTotalBuffersSize() const;
+// Fetch global data to be serialized in summary part of a snapshot / full sync.
+static GlobalData GetGlobalData(const Service* service);
private:
class Impl;

src/server/replica.cc

@@ -489,6 +489,7 @@ error_code Replica::InitiateDflySync() {
if (num_full_flows == num_df_flows_) {
JournalExecutor{&service_}.FlushAll();
+RdbLoader::PerformPreLoad(&service_);
} else if (num_full_flows == 0) {
sync_type = "partial";
} else {
@@ -516,15 +517,17 @@
if (cntx_.IsCancelled())
return cntx_.GetError();
+RdbLoader::PerformPostLoad(&service_);
// Send DFLY STARTSTABLE.
if (auto ec = SendNextPhaseRequest("STARTSTABLE"); ec) {
return cntx_.ReportError(ec);
}
// Joining flows and resetting state is done by cleanup.
double seconds = double(absl::ToInt64Milliseconds(absl::Now() - start_time)) / 1000;
LOG(INFO) << sync_type << " sync finished in " << strings::HumanReadableElapsedTime(seconds);
return cntx_.GetError();
}

src/server/search/search_family.cc

@@ -586,8 +586,10 @@ void SearchFamily::Register(CommandRegistry* registry) {
CO::NO_KEY_TRANSACTIONAL | CO::NO_KEY_TX_SPAN_ALL | CO::NO_AUTOJOURNAL;
registry->StartFamily();
-*registry << CI{"FT.CREATE", CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(FtCreate)
-<< CI{"FT.DROPINDEX", CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(FtDropIndex)
+*registry << CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(
+FtCreate)
+<< CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(
+FtDropIndex)
<< CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo)
// Underscore same as in RediSearch because it's "temporary" (long time already)
<< CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList)

src/server/server_family.cc

@@ -367,21 +367,6 @@ bool IsReplicatingNoOne(string_view host, string_view port) {
return absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port, "one");
}
-void RebuildAllSearchIndices(Service* service) {
-const CommandId* cmd = service->FindCmd("FT.CREATE");
-if (cmd == nullptr) {
-// On MacOS we don't include search so FT.CREATE won't exist.
-return;
-}
-boost::intrusive_ptr<Transaction> trans{new Transaction{cmd}};
-trans->InitByArgs(0, {});
-trans->ScheduleSingleHop([](auto* trans, auto* es) {
-es->search_indices()->RebuildAllIndices(trans->GetOpArgs(es));
-return OpStatus::OK;
-});
-}
template <typename T> void UpdateMax(T* maxv, T current) {
*maxv = std::max(*maxv, current);
}
@@ -693,6 +678,8 @@ Future<GenericError> ServerFamily::Load(const std::string& load_path) {
return {};
}
+RdbLoader::PerformPreLoad(&service_);
auto& pool = service_.proactor_pool();
vector<Fiber> load_fibers;
@@ -729,11 +716,14 @@
for (auto& fiber : load_fibers) {
fiber.Join();
}
if (aggregated_result->first_error) {
LOG(ERROR) << "Rdb load failed. " << (*aggregated_result->first_error).message();
exit(1);
}
-RebuildAllSearchIndices(&service_);
+RdbLoader::PerformPostLoad(&service_);
LOG(INFO) << "Load finished, num keys read: " << aggregated_result->keys_read;
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
ec_promise.set_value(*(aggregated_result->first_error));

src/server/server_family.h

@@ -137,6 +137,10 @@ class ServerFamily {
return script_mgr_.get();
}
+const ScriptMgr* script_mgr() const {
+return script_mgr_.get();
+}
void StatsMC(std::string_view section, facade::ConnectionContext* cntx);
// if new_version is true, saves DF specific, non redis compatible snapshot.

src/server/transaction.cc

@@ -1409,6 +1409,22 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}, false);
}
+void Transaction::RunOnceAsCommand(const CommandId* cid, RunnableType cb) {
+if (!ProactorBase::IsProactorThread())
+return shard_set->pool()->at(0)->Await([cid, cb] { return RunOnceAsCommand(cid, cb); });
+DCHECK(cid);
+DCHECK(cid->opt_mask() & (CO::GLOBAL_TRANS | CO::NO_KEY_TRANSACTIONAL));
+DCHECK(ProactorBase::IsProactorThread());
+boost::intrusive_ptr<Transaction> trans{new Transaction{cid}};
+trans->InitByArgs(0, {});
+trans->ScheduleSingleHop([cb](auto* trans, auto* es) {
+cb(trans, es);
+return OpStatus::OK;
+});
+}
void Transaction::CancelBlocking() {
if (coordinator_state_ & COORD_BLOCKED) {
coordinator_state_ |= COORD_CANCELLED;
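RunOnceAsCommand bundles the schedule-single-hop boilerplate that was previously inlined in RebuildAllSearchIndices, plus a hop onto a proactor thread when the caller is not on one. A stand-in sketch of that hop-and-await control flow; std::thread here replaces the proactor pool, so this is not the real dispatch code:

#include <functional>
#include <future>
#include <iostream>
#include <thread>

static thread_local bool is_loop_thread = false;

// If the caller is not on the "loop" thread, re-dispatch there and block
// until the callback has run, mirroring shard_set->pool()->at(0)->Await(...).
void RunOnLoopThread(std::function<void()> cb) {
  if (!is_loop_thread) {
    std::packaged_task<void()> task([&cb] {
      is_loop_thread = true;  // the spawned thread plays the proactor role
      cb();
    });
    auto done = task.get_future();
    std::thread worker(std::move(task));
    done.wait();  // block the caller, like Await()
    worker.join();
    return;
  }
  cb();  // already on the right thread: run inline
}

int main() {
  RunOnLoopThread([] { std::cout << "ran on the loop thread\n"; });
}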

src/server/transaction.h

@@ -320,6 +320,9 @@ class Transaction {
bool multi_commands, bool allow_await) const;
void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;
+// Utility to run a single hop on a no-key command
+static void RunOnceAsCommand(const CommandId* cid, RunnableType cb);
private:
// Holds number of locks for each IntentLock::Mode: shared and exclusive.
struct LockCnt {

tests/dragonfly/replication_test.py

@@ -1713,3 +1713,51 @@ async def test_network_disconnect_small_buffer(df_local_factory, df_seeder_facto
    master.stop()
    replica.stop()
    assert master.is_in_logs("Partial sync requested from stale LSN")
+async def test_search(df_local_factory):
+    master = df_local_factory.create(proactor_threads=4)
+    replica = df_local_factory.create(proactor_threads=4)
+    df_local_factory.start_all([master, replica])
+    c_master = master.client()
+    c_replica = replica.client()
+    # First, create an index on replica
+    await c_replica.execute_command("FT.CREATE", "idx-r", "SCHEMA", "f1", "numeric")
+    for i in range(0, 10):
+        await c_replica.hset(f"k{i}", mapping={"f1": i})
+    assert (await c_replica.ft("idx-r").search("@f1:[5 9]")).total == 5
+    # Second, create an index on master
+    await c_master.execute_command("FT.CREATE", "idx-m", "SCHEMA", "f2", "numeric")
+    for i in range(0, 10):
+        await c_master.hset(f"k{i}", mapping={"f2": i * 2})
+    assert (await c_master.ft("idx-m").search("@f2:[6 10]")).total == 3
+    # Replicate
+    await c_replica.execute_command("REPLICAOF", "localhost", master.port)
+    await wait_available_async(c_replica)
+    # Check master index was picked up and original index was deleted
+    assert (await c_replica.execute_command("FT._LIST")) == ["idx-m"]
+    # Check query from master runs on replica
+    assert (await c_replica.ft("idx-m").search("@f2:[6 10]")).total == 3
+    # Set a new key
+    await c_master.hset("kNEW", mapping={"f2": 100})
+    await asyncio.sleep(0.1)
+    assert (await c_replica.ft("idx-m").search("@f2:[100 100]")).docs[0].id == "kNEW"
+    # Create a new aux index on master
+    await c_master.execute_command("FT.CREATE", "idx-m2", "SCHEMA", "f2", "numeric", "sortable")
+    await asyncio.sleep(0.1)
+    from redis.commands.search.query import Query
+    assert (await c_replica.ft("idx-m2").search(Query("*").sort_by("f2").paging(0, 1))).docs[
+        0
+    ].id == "k0"