1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Basic search (#1187)

Basic search (ft.create & ft.search)
This commit is contained in:
Vladislav 2023-05-09 10:09:41 +03:00 committed by GitHub
parent 91c25c6d61
commit c3dc05a571
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 433 additions and 28 deletions

View file

@ -3,7 +3,7 @@ add_library(dfly_core compact_object.cc dragonfly_core.cc extent_tree.cc
segment_allocator.cc simple_lru_counter.cc small_string.cc tx_queue.cc dense_set.cc segment_allocator.cc simple_lru_counter.cc small_string.cc tx_queue.cc dense_set.cc
string_set.cc string_map.cc detail/bitpacking.cc) string_set.cc string_map.cc detail/bitpacking.cc)
cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules cxx_link(dfly_core base query_parser absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules
fibers2 TRDP::jsoncons OpenSSL::Crypto) fibers2 TRDP::jsoncons OpenSSL::Crypto)
add_executable(dash_bench dash_bench.cc) add_executable(dash_bench dash_bench.cc)

View file

@ -3,6 +3,7 @@ gen_bison(parser)
cur_gen_dir(gen_dir) cur_gen_dir(gen_dir)
add_library(query_parser base.cc ast_expr.cc query_driver.cc ${gen_dir}/parser.cc ${gen_dir}/lexer.cc) add_library(query_parser base.cc ast_expr.cc query_driver.cc search.cc
${gen_dir}/parser.cc ${gen_dir}/lexer.cc)
target_link_libraries(query_parser base absl::strings TRDP::reflex) target_link_libraries(query_parser base absl::strings TRDP::reflex)
cxx_test(search_parser_test query_parser LABELS DFLY) cxx_test(search_parser_test query_parser LABELS DFLY)

26
src/core/search/search.cc Normal file
View file

@ -0,0 +1,26 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/search/search.h"
#include "core/search/query_driver.h"
using namespace std;
namespace dfly::search {
AstExpr ParseQuery(std::string_view query) {
QueryDriver driver{};
driver.ResetScanner();
driver.SetInput(std::string{query});
try {
(void)Parser (&driver)();
} catch (...) {
// TODO: return detailed error info
return {};
}
return driver.Get();
}
} // namespace dfly::search

13
src/core/search/search.h Normal file
View file

@ -0,0 +1,13 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "core/search/ast_expr.h"
namespace dfly::search {
AstExpr ParseQuery(std::string_view query);
} // namespace dfly::search

View file

@ -16,7 +16,7 @@ cxx_link(dfly_transaction dfly_core strings_lib)
add_library(dragonfly_lib channel_store.cc command_registry.cc add_library(dragonfly_lib channel_store.cc command_registry.cc
config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc
generic_family.cc hset_family.cc json_family.cc generic_family.cc hset_family.cc json_family.cc search_family.cc
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
set_family.cc stream_family.cc string_family.cc set_family.cc stream_family.cc string_family.cc
@ -49,6 +49,7 @@ cxx_test(journal_test dfly_test_lib LABELS DFLY)
cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY) cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
cxx_test(top_keys_test dfly_test_lib LABELS DFLY) cxx_test(top_keys_test dfly_test_lib LABELS DFLY)
cxx_test(hll_family_test dfly_test_lib LABELS DFLY) cxx_test(hll_family_test dfly_test_lib LABELS DFLY)
cxx_test(search_family_test dfly_test_lib LABELS DFLY)
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY) add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
add_dependencies(check_dfly dragonfly_test json_family_test list_family_test add_dependencies(check_dfly dragonfly_test json_family_test list_family_test

View file

@ -4,6 +4,8 @@
#include "server/container_utils.h" #include "server/container_utils.h"
#include "base/logging.h" #include "base/logging.h"
#include "core/string_map.h"
#include "core/string_set.h"
extern "C" { extern "C" {
#include "redis/intset.h" #include "redis/intset.h"
@ -16,6 +18,8 @@ extern "C" {
namespace dfly::container_utils { namespace dfly::container_utils {
using namespace std;
quicklistEntry QLEntry() { quicklistEntry QLEntry() {
quicklistEntry res{.quicklist = NULL, quicklistEntry res{.quicklist = NULL,
.node = NULL, .node = NULL,
@ -158,4 +162,30 @@ bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start,
return false; return false;
} }
StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context) {
DCHECK_EQ(pv.Encoding(), kEncodingStrMap2);
StringMap* res = static_cast<StringMap*>(pv.RObjPtr());
uint32_t map_time = MemberTimeSeconds(db_context.time_now_ms);
res->set_time(map_time);
return res;
}
optional<string_view> LpFind(uint8_t* lp, string_view key, uint8_t int_buf[]) {
uint8_t* fptr = lpFirst(lp);
DCHECK(fptr);
fptr = lpFind(lp, fptr, (unsigned char*)key.data(), key.size(), 1);
if (!fptr)
return std::nullopt;
uint8_t* vptr = lpNext(lp, fptr);
return LpGetView(vptr, int_buf);
}
string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) {
int64_t ele_len = 0;
uint8_t* elem = lpGet(lp_it, &ele_len, int_buf);
DCHECK(elem);
return std::string_view{reinterpret_cast<char*>(elem), size_t(ele_len)};
}
} // namespace dfly::container_utils } // namespace dfly::container_utils

View file

@ -3,11 +3,12 @@
// //
#pragma once #pragma once
#include "base/logging.h"
#include "core/compact_object.h" #include "core/compact_object.h"
#include "core/string_set.h"
#include "server/table.h" #include "server/table.h"
extern "C" { extern "C" {
#include "redis/listpack.h"
#include "redis/object.h" #include "redis/object.h"
#include "redis/quicklist.h" #include "redis/quicklist.h"
} }
@ -16,6 +17,8 @@ extern "C" {
namespace dfly { namespace dfly {
class StringMap;
namespace container_utils { namespace container_utils {
// IsContainer returns true if the iterator points to a container type. // IsContainer returns true if the iterator points to a container type.
@ -69,6 +72,15 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func);
bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start = 0, bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start = 0,
int32_t end = -1, bool reverse = false, bool use_score = false); int32_t end = -1, bool reverse = false, bool use_score = false);
// Get StringMap pointer from primetable value. Sets expire time from db_context
StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context);
// Get string_view from listpack poiner. Intbuf to store integer values as strings.
std::string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]);
// Find value by key and return stringview to it, otherwise nullopt.
std::optional<std::string_view> LpFind(uint8_t* lp, std::string_view key, uint8_t int_buf[]);
}; // namespace container_utils }; // namespace container_utils
} // namespace dfly } // namespace dfly

View file

@ -16,6 +16,7 @@ extern "C" {
#include "facade/error.h" #include "facade/error.h"
#include "server/command_registry.h" #include "server/command_registry.h"
#include "server/conn_context.h" #include "server/conn_context.h"
#include "server/container_utils.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/transaction.h" #include "server/transaction.h"
@ -43,30 +44,9 @@ bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) {
return lpBytes(const_cast<uint8_t*>(lp)) + sum < kMaxListPackLen; return lpBytes(const_cast<uint8_t*>(lp)) + sum < kMaxListPackLen;
} }
inline StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context) { using container_utils::GetStringMap;
StringMap* res = (StringMap*)pv.RObjPtr(); using container_utils::LpFind;
uint32_t map_time = MemberTimeSeconds(db_context.time_now_ms); using container_utils::LpGetView;
res->set_time(map_time);
return res;
}
inline string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) {
int64_t ele_len = 0;
uint8_t* elem = lpGet(lp_it, &ele_len, int_buf);
DCHECK(elem);
return string_view{reinterpret_cast<char*>(elem), size_t(ele_len)};
}
optional<string_view> LpFind(uint8_t* lp, string_view key, uint8_t int_buf[]) {
uint8_t* fptr = lpFirst(lp);
DCHECK(fptr);
fptr = lpFind(lp, fptr, (unsigned char*)key.data(), key.size(), 1);
if (!fptr)
return nullopt;
uint8_t* vptr = lpNext(lp, fptr);
return LpGetView(vptr, int_buf);
}
pair<uint8_t*, bool> LpDelete(uint8_t* lp, string_view field) { pair<uint8_t*, bool> LpDelete(uint8_t* lp, string_view field) {
uint8_t* fptr = lpFirst(lp); uint8_t* fptr = lpFirst(lp);

View file

@ -32,6 +32,7 @@ extern "C" {
#include "server/list_family.h" #include "server/list_family.h"
#include "server/multi_command_squasher.h" #include "server/multi_command_squasher.h"
#include "server/script_mgr.h" #include "server/script_mgr.h"
#include "server/search_family.h"
#include "server/server_state.h" #include "server/server_state.h"
#include "server/set_family.h" #include "server/set_family.h"
#include "server/stream_family.h" #include "server/stream_family.h"
@ -1772,6 +1773,7 @@ void Service::RegisterCommands() {
JsonFamily::Register(&registry_); JsonFamily::Register(&registry_);
BitOpsFamily::Register(&registry_); BitOpsFamily::Register(&registry_);
HllFamily::Register(&registry_); HllFamily::Register(&registry_);
SearchFamily::Register(&registry_);
server_family_.Register(&registry_); server_family_.Register(&registry_);

243
src/server/search_family.cc Normal file
View file

@ -0,0 +1,243 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/search_family.h"
#include <variant>
#include <vector>
#include "base/logging.h"
#include "core/search/search.h"
#include "facade/reply_builder.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/transaction.h"
extern "C" {
#include "redis/listpack.h"
#include "redis/object.h"
};
namespace dfly {
using namespace std;
using namespace facade;
namespace {
string_view SdsToSafeSv(sds str) {
return str != nullptr ? string_view{str, sdslen(str)} : ""sv;
}
using DocumentData = absl::flat_hash_map<std::string, std::string>;
using SerializedDocument = pair<std::string /*key*/, DocumentData>;
using Query = search::AstExpr;
struct BaseAccessor : public search::HSetAccessor {
using FieldConsumer = search::HSetAccessor::FieldConsumer;
virtual DocumentData Serialize() const = 0;
};
struct ListPackAccessor : public BaseAccessor {
using LpPtr = uint8_t*;
ListPackAccessor(LpPtr ptr) : lp_{ptr} {
}
bool Check(FieldConsumer f, string_view active_field) const override {
std::array<uint8_t, LP_INTBUF_SIZE> intbuf;
if (!active_field.empty()) {
return f(container_utils::LpFind(lp_, active_field, intbuf.data()).value_or(""));
}
uint8_t* fptr = lpFirst(lp_);
DCHECK_NE(fptr, nullptr);
while (fptr) {
fptr = lpNext(lp_, fptr); // skip key
string_view v = container_utils::LpGetView(fptr, intbuf.data());
fptr = lpNext(lp_, fptr);
if (f(v))
return true;
}
return false;
}
DocumentData Serialize() const override {
std::array<uint8_t, LP_INTBUF_SIZE> intbuf[2];
DocumentData out{};
uint8_t* fptr = lpFirst(lp_);
DCHECK_NE(fptr, nullptr);
while (fptr) {
string_view k = container_utils::LpGetView(fptr, intbuf[0].data());
fptr = lpNext(lp_, fptr); // skip key
string_view v = container_utils::LpGetView(fptr, intbuf[1].data());
fptr = lpNext(lp_, fptr);
out[k] = v;
}
return out;
}
private:
LpPtr lp_;
};
struct StringMapAccessor : public BaseAccessor {
StringMapAccessor(StringMap* hset) : hset_{hset} {
}
bool Check(FieldConsumer f, string_view active_field) const override {
if (!active_field.empty()) {
return f(SdsToSafeSv(hset_->Find(active_field)));
}
for (const auto& [k, v] : *hset_) {
if (f(SdsToSafeSv(v)))
return true;
}
return false;
}
DocumentData Serialize() const override {
DocumentData out{};
for (const auto& [k, v] : *hset_)
out[SdsToSafeSv(k)] = SdsToSafeSv(v);
return out;
}
private:
StringMap* hset_;
};
unique_ptr<BaseAccessor> GetAccessor(const OpArgs& op_args, const PrimeValue& pv) {
if (pv.Encoding() == kEncodingListPack) {
auto ptr = reinterpret_cast<ListPackAccessor::LpPtr>(pv.RObjPtr());
return make_unique<ListPackAccessor>(ptr);
} else {
auto* sm = container_utils::GetStringMap(pv, op_args.db_cntx);
return make_unique<StringMapAccessor>(sm);
}
}
// Perform brute force search for all hashes in shard with specific prefix
// that match the query
void OpSearch(const OpArgs& op_args, string_view prefix, const Query& query,
vector<SerializedDocument>* shard_out) {
auto& db_slice = op_args.shard->db_slice();
DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index));
auto [prime_table, _] = db_slice.GetTables(op_args.db_cntx.db_index);
string scratch;
auto cb = [&](PrimeTable::iterator it) {
// Check entry is hash
const PrimeValue& pv = it->second;
if (pv.ObjType() != OBJ_HASH)
return;
// Check key starts with prefix
string_view key = it->first.GetSlice(&scratch);
if (key.rfind(prefix, 0) != 0)
return;
// Check entry matches filter
auto accessor = GetAccessor(op_args, pv);
if (query->Check(search::SearchInput{accessor.get()}))
shard_out->emplace_back(key, accessor->Serialize());
};
PrimeTable::Cursor cursor;
do {
cursor = prime_table->Traverse(cursor, cb);
} while (cursor);
}
} // namespace
void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) {
string_view idx = ArgS(args, 0);
string prefix;
if (args.size() > 1 && ArgS(args, 1) == "ON") {
if (ArgS(args, 2) != "HASH" || ArgS(args, 3) != "PREFIX" || ArgS(args, 4) != "1") {
(*cntx)->SendError("Only simplest config supported");
return;
}
prefix = ArgS(args, 5);
}
{
lock_guard lk{indices_mu_};
indices_[idx] = prefix;
}
(*cntx)->SendOk();
}
void SearchFamily::FtSearch(CmdArgList args, ConnectionContext* cntx) {
string_view index = ArgS(args, 0);
string_view query_str = ArgS(args, 1);
string prefix;
{
lock_guard lk{indices_mu_};
auto it = indices_.find(index);
if (it == indices_.end()) {
(*cntx)->SendError(string{index} + ": no such index");
return;
}
prefix = it->second;
}
Query query = search::ParseQuery(query_str);
if (!query) {
(*cntx)->SendError("Syntax error");
return;
}
vector<vector<SerializedDocument>> docs(shard_set->size());
cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* shard) {
OpSearch(t->GetOpArgs(shard), prefix, query, &docs[shard->shard_id()]);
return OpStatus::OK;
});
size_t total_count = 0;
for (const auto& shard_docs : docs)
total_count += shard_docs.size();
(*cntx)->StartArray(total_count * 2 + 1);
(*cntx)->SendLong(total_count);
for (const auto& shard_docs : docs) {
for (const auto& [key, doc] : shard_docs) {
(*cntx)->SendBulkString(key);
(*cntx)->StartCollection(doc.size(), RedisReplyBuilder::MAP);
for (const auto& [k, v] : doc) {
(*cntx)->SendBulkString(k);
(*cntx)->SendBulkString(v);
}
}
}
}
#define HFUNC(x) SetHandler(&SearchFamily::x)
void SearchFamily::Register(CommandRegistry* registry) {
using CI = CommandId;
*registry << CI{"FT.CREATE", CO::FAST, -2, 0, 0, 0}.HFUNC(FtCreate)
<< CI{"FT.SEARCH", CO::GLOBAL_TRANS, -3, 0, 0, 0}.HFUNC(FtSearch);
}
Mutex SearchFamily::indices_mu_{};
absl::flat_hash_map<std::string, std::string> SearchFamily::indices_{};
} // namespace dfly

View file

@ -0,0 +1,30 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <absl/container/flat_hash_map.h>
#include <string>
#include "base/mutex.h"
#include "server/common.h"
namespace dfly {
class CommandRegistry;
class ConnectionContext;
class SearchFamily {
static void FtCreate(CmdArgList args, ConnectionContext* cntx);
static void FtSearch(CmdArgList args, ConnectionContext* cntx);
public:
static void Register(CommandRegistry* registry);
private:
static Mutex indices_mu_;
static absl::flat_hash_map<std::string, std::string> indices_;
};
} // namespace dfly

View file

@ -0,0 +1,67 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/search_family.h"
#include "base/gtest.h"
#include "base/logging.h"
#include "facade/facade_test.h"
#include "server/command_registry.h"
#include "server/test_utils.h"
using namespace testing;
using namespace std;
using namespace util;
namespace dfly {
class SearchFamilyTest : public BaseFamilyTest {
protected:
};
const auto kNoResults = IntArg(0); // tests auto destruct single element arrays
TEST_F(SearchFamilyTest, CreateIndex) {
EXPECT_EQ(Run({"ft.create", "idx", "ON", "HASH", "PREFIX", "1", "prefix"}), "OK");
}
TEST_F(SearchFamilyTest, Simple) {
EXPECT_EQ(Run({"ft.create", "i1", "ON", "HASH", "PREFIX", "1", "d:"}), "OK");
Run({"hset", "d:1", "foo", "baz", "k", "v"});
Run({"hset", "d:2", "foo", "bar", "k", "v"});
Run({"hset", "d:3", "foo", "bad", "k", "v"});
{
auto resp = Run({"ft.search", "i1", "@foo:bar"});
EXPECT_THAT(resp, ArrLen(1 + 2)); // single key-data pair of d:2
auto doc = resp.GetVec();
EXPECT_THAT(doc[0], IntArg(1));
EXPECT_EQ(doc[1], "d:2");
EXPECT_THAT(doc[2], ArrLen(4)); // foo and k pairs
}
EXPECT_THAT(Run({"ft.search", "i1", "@foo:bar | @foo:baz"}), ArrLen(1 + 2 * 2));
EXPECT_THAT(Run({"ft.search", "i1", "@foo:(bar|baz|bad)"}), ArrLen(1 + 3 * 2));
EXPECT_THAT(Run({"ft.search", "i1", "@foo:none"}), kNoResults);
EXPECT_THAT(Run({"ft.search", "iNone", "@foo:bar"}), ErrArg("iNone: no such index"));
EXPECT_THAT(Run({"ft.search", "i1", "@@NOTAQUERY@@"}), ErrArg("Syntax error"));
// w: prefix is not part of index
Run({"hset", "w:2", "foo", "this", "k", "v"});
EXPECT_THAT(Run({"ft.search", "i1", "@foo:this"}), kNoResults);
}
TEST_F(SearchFamilyTest, NoPrefix) {
EXPECT_EQ(Run({"ft.create", "i1"}), "OK");
Run({"hset", "d:1", "a", "one", "k", "v"});
Run({"hset", "d:2", "b", "two", "k", "v"});
Run({"hset", "d:3", "c", "three", "k", "v"});
EXPECT_THAT(Run({"ft.search", "i1", "one | three"}), ArrLen(1 + 2 * 2));
}
} // namespace dfly