diff --git a/src/core/CMakeLists.txt b/src/core/CMakeLists.txt index 24c16cf92..edc2fb6a8 100644 --- a/src/core/CMakeLists.txt +++ b/src/core/CMakeLists.txt @@ -3,7 +3,7 @@ add_library(dfly_core compact_object.cc dragonfly_core.cc extent_tree.cc segment_allocator.cc simple_lru_counter.cc small_string.cc tx_queue.cc dense_set.cc string_set.cc string_map.cc detail/bitpacking.cc) -cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules +cxx_link(dfly_core base query_parser absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules fibers2 TRDP::jsoncons OpenSSL::Crypto) add_executable(dash_bench dash_bench.cc) diff --git a/src/core/search/CMakeLists.txt b/src/core/search/CMakeLists.txt index 50be4e583..f98bda960 100644 --- a/src/core/search/CMakeLists.txt +++ b/src/core/search/CMakeLists.txt @@ -3,6 +3,7 @@ gen_bison(parser) cur_gen_dir(gen_dir) -add_library(query_parser base.cc ast_expr.cc query_driver.cc ${gen_dir}/parser.cc ${gen_dir}/lexer.cc) +add_library(query_parser base.cc ast_expr.cc query_driver.cc search.cc + ${gen_dir}/parser.cc ${gen_dir}/lexer.cc) target_link_libraries(query_parser base absl::strings TRDP::reflex) cxx_test(search_parser_test query_parser LABELS DFLY) diff --git a/src/core/search/search.cc b/src/core/search/search.cc new file mode 100644 index 000000000..1e98b2b77 --- /dev/null +++ b/src/core/search/search.cc @@ -0,0 +1,26 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "core/search/search.h" + +#include "core/search/query_driver.h" + +using namespace std; + +namespace dfly::search { + +AstExpr ParseQuery(std::string_view query) { + QueryDriver driver{}; + driver.ResetScanner(); + driver.SetInput(std::string{query}); + try { + (void)Parser (&driver)(); + } catch (...) 
{ + // TODO: return detailed error info + return {}; + } + return driver.Get(); +} + +} // namespace dfly::search diff --git a/src/core/search/search.h b/src/core/search/search.h new file mode 100644 index 000000000..4cc956b4c --- /dev/null +++ b/src/core/search/search.h @@ -0,0 +1,13 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "core/search/ast_expr.h" + +namespace dfly::search { + +AstExpr ParseQuery(std::string_view query); + +} // namespace dfly::search diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index f94098188..1b2cec4f2 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -16,7 +16,7 @@ cxx_link(dfly_transaction dfly_core strings_lib) add_library(dragonfly_lib channel_store.cc command_registry.cc config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc - generic_family.cc hset_family.cc json_family.cc + generic_family.cc hset_family.cc json_family.cc search_family.cc list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc set_family.cc stream_family.cc string_family.cc @@ -49,6 +49,7 @@ cxx_test(journal_test dfly_test_lib LABELS DFLY) cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY) cxx_test(top_keys_test dfly_test_lib LABELS DFLY) cxx_test(hll_family_test dfly_test_lib LABELS DFLY) +cxx_test(search_family_test dfly_test_lib LABELS DFLY) add_custom_target(check_dfly WORKING_DIRECTORY .. 
COMMAND ctest -L DFLY) add_dependencies(check_dfly dragonfly_test json_family_test list_family_test diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 91c1b5672..234000792 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -4,6 +4,8 @@ #include "server/container_utils.h" #include "base/logging.h" +#include "core/string_map.h" +#include "core/string_set.h" extern "C" { #include "redis/intset.h" @@ -16,6 +18,8 @@ extern "C" { namespace dfly::container_utils { +using namespace std; + quicklistEntry QLEntry() { quicklistEntry res{.quicklist = NULL, .node = NULL, @@ -158,4 +162,30 @@ bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start, return false; } +StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context) { + DCHECK_EQ(pv.Encoding(), kEncodingStrMap2); + StringMap* res = static_cast(pv.RObjPtr()); + uint32_t map_time = MemberTimeSeconds(db_context.time_now_ms); + res->set_time(map_time); + return res; +} + +optional LpFind(uint8_t* lp, string_view key, uint8_t int_buf[]) { + uint8_t* fptr = lpFirst(lp); + DCHECK(fptr); + + fptr = lpFind(lp, fptr, (unsigned char*)key.data(), key.size(), 1); + if (!fptr) + return std::nullopt; + uint8_t* vptr = lpNext(lp, fptr); + return LpGetView(vptr, int_buf); +} + +string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) { + int64_t ele_len = 0; + uint8_t* elem = lpGet(lp_it, &ele_len, int_buf); + DCHECK(elem); + return std::string_view{reinterpret_cast(elem), size_t(ele_len)}; +} + } // namespace dfly::container_utils diff --git a/src/server/container_utils.h b/src/server/container_utils.h index 11bf5aaeb..c6ad8a0a7 100644 --- a/src/server/container_utils.h +++ b/src/server/container_utils.h @@ -3,11 +3,12 @@ // #pragma once +#include "base/logging.h" #include "core/compact_object.h" -#include "core/string_set.h" #include "server/table.h" extern "C" { +#include "redis/listpack.h" #include "redis/object.h" #include 
"redis/quicklist.h" } @@ -16,6 +17,8 @@ extern "C" { namespace dfly { +class StringMap; + namespace container_utils { // IsContainer returns true if the iterator points to a container type. @@ -69,6 +72,15 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func); bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start = 0, int32_t end = -1, bool reverse = false, bool use_score = false); +// Get StringMap pointer from primetable value. Sets expire time from db_context +StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context); + +// Get string_view from listpack pointer. Intbuf to store integer values as strings. +std::string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]); + +// Find value by key and return a string_view to it, otherwise nullopt. +std::optional LpFind(uint8_t* lp, std::string_view key, uint8_t int_buf[]); + }; // namespace container_utils } // namespace dfly diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 5c26d2d92..02370d025 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -16,6 +16,7 @@ extern "C" { #include "facade/error.h" #include "server/command_registry.h" #include "server/conn_context.h" +#include "server/container_utils.h" #include "server/engine_shard_set.h" #include "server/transaction.h" @@ -43,30 +44,9 @@ bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) { return lpBytes(const_cast(lp)) + sum < kMaxListPackLen; } -inline StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context) { - StringMap* res = (StringMap*)pv.RObjPtr(); - uint32_t map_time = MemberTimeSeconds(db_context.time_now_ms); - res->set_time(map_time); - return res; -} - -inline string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) { - int64_t ele_len = 0; - uint8_t* elem = lpGet(lp_it, &ele_len, int_buf); - DCHECK(elem); - return string_view{reinterpret_cast(elem), size_t(ele_len)}; -} - -optional LpFind(uint8_t* lp, string_view key, uint8_t 
int_buf[]) { - uint8_t* fptr = lpFirst(lp); - DCHECK(fptr); - - fptr = lpFind(lp, fptr, (unsigned char*)key.data(), key.size(), 1); - if (!fptr) - return nullopt; - uint8_t* vptr = lpNext(lp, fptr); - return LpGetView(vptr, int_buf); -} +using container_utils::GetStringMap; +using container_utils::LpFind; +using container_utils::LpGetView; pair LpDelete(uint8_t* lp, string_view field) { uint8_t* fptr = lpFirst(lp); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 702f927d2..02d6e2682 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -32,6 +32,7 @@ extern "C" { #include "server/list_family.h" #include "server/multi_command_squasher.h" #include "server/script_mgr.h" +#include "server/search_family.h" #include "server/server_state.h" #include "server/set_family.h" #include "server/stream_family.h" @@ -1772,6 +1773,7 @@ void Service::RegisterCommands() { JsonFamily::Register(®istry_); BitOpsFamily::Register(®istry_); HllFamily::Register(®istry_); + SearchFamily::Register(®istry_); server_family_.Register(®istry_); diff --git a/src/server/search_family.cc b/src/server/search_family.cc new file mode 100644 index 000000000..e26786173 --- /dev/null +++ b/src/server/search_family.cc @@ -0,0 +1,243 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/search_family.h" + +#include +#include + +#include "base/logging.h" +#include "core/search/search.h" +#include "facade/reply_builder.h" +#include "server/command_registry.h" +#include "server/conn_context.h" +#include "server/container_utils.h" +#include "server/engine_shard_set.h" +#include "server/transaction.h" + +extern "C" { +#include "redis/listpack.h" +#include "redis/object.h" +}; + +namespace dfly { + +using namespace std; +using namespace facade; + +namespace { + +string_view SdsToSafeSv(sds str) { + return str != nullptr ? 
string_view{str, sdslen(str)} : ""sv; +} + +using DocumentData = absl::flat_hash_map; +using SerializedDocument = pair; +using Query = search::AstExpr; + +struct BaseAccessor : public search::HSetAccessor { + using FieldConsumer = search::HSetAccessor::FieldConsumer; + + virtual DocumentData Serialize() const = 0; +}; + +struct ListPackAccessor : public BaseAccessor { + using LpPtr = uint8_t*; + + explicit ListPackAccessor(LpPtr ptr) : lp_{ptr} { + } + + bool Check(FieldConsumer f, string_view active_field) const override { + std::array intbuf; + + if (!active_field.empty()) { + return f(container_utils::LpFind(lp_, active_field, intbuf.data()).value_or("")); + } + + uint8_t* fptr = lpFirst(lp_); + DCHECK_NE(fptr, nullptr); + + while (fptr) { + fptr = lpNext(lp_, fptr); // skip key + string_view v = container_utils::LpGetView(fptr, intbuf.data()); + fptr = lpNext(lp_, fptr); + + if (f(v)) + return true; + } + + return false; + } + + DocumentData Serialize() const override { + std::array intbuf[2]; + DocumentData out{}; + + uint8_t* fptr = lpFirst(lp_); + DCHECK_NE(fptr, nullptr); + + while (fptr) { + string_view k = container_utils::LpGetView(fptr, intbuf[0].data()); + fptr = lpNext(lp_, fptr); // skip key + string_view v = container_utils::LpGetView(fptr, intbuf[1].data()); + fptr = lpNext(lp_, fptr); + + out[k] = v; + } + + return out; + } + + private: + LpPtr lp_; +}; + +struct StringMapAccessor : public BaseAccessor { + explicit StringMapAccessor(StringMap* hset) : hset_{hset} { + } + + bool Check(FieldConsumer f, string_view active_field) const override { + if (!active_field.empty()) { + return f(SdsToSafeSv(hset_->Find(active_field))); + } + + for (const auto& [k, v] : *hset_) { + if (f(SdsToSafeSv(v))) + return true; + } + + return false; + } + + DocumentData Serialize() const override { + DocumentData out{}; + for (const auto& [k, v] : *hset_) + out[SdsToSafeSv(k)] = SdsToSafeSv(v); + return out; + } + + private: + StringMap* hset_; +}; + +unique_ptr GetAccessor(const 
OpArgs& op_args, const PrimeValue& pv) { + if (pv.Encoding() == kEncodingListPack) { + auto ptr = reinterpret_cast(pv.RObjPtr()); + return make_unique(ptr); + } else { + auto* sm = container_utils::GetStringMap(pv, op_args.db_cntx); + return make_unique(sm); + } +} + +// Perform brute force search for all hashes in shard with specific prefix +// that match the query +void OpSearch(const OpArgs& op_args, string_view prefix, const Query& query, + vector* shard_out) { + auto& db_slice = op_args.shard->db_slice(); + DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index)); + auto [prime_table, _] = db_slice.GetTables(op_args.db_cntx.db_index); + + string scratch; + auto cb = [&](PrimeTable::iterator it) { + // Check entry is hash + const PrimeValue& pv = it->second; + if (pv.ObjType() != OBJ_HASH) + return; + + // Check key starts with prefix + string_view key = it->first.GetSlice(&scratch); + if (key.rfind(prefix, 0) != 0) + return; + + // Check entry matches filter + auto accessor = GetAccessor(op_args, pv); + if (query->Check(search::SearchInput{accessor.get()})) + shard_out->emplace_back(key, accessor->Serialize()); + }; + + PrimeTable::Cursor cursor; + do { + cursor = prime_table->Traverse(cursor, cb); + } while (cursor); +} + +} // namespace + +void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) { + string_view idx = ArgS(args, 0); + string prefix; + + if (args.size() > 1 && ArgS(args, 1) == "ON") { + if (args.size() < 6 || ArgS(args, 2) != "HASH" || ArgS(args, 3) != "PREFIX" || ArgS(args, 4) != "1") { + (*cntx)->SendError("Only simplest config supported"); + return; + } + prefix = ArgS(args, 5); + } + { + lock_guard lk{indices_mu_}; + indices_[idx] = prefix; + } + (*cntx)->SendOk(); +} + +void SearchFamily::FtSearch(CmdArgList args, ConnectionContext* cntx) { + string_view index = ArgS(args, 0); + string_view query_str = ArgS(args, 1); + + string prefix; + { + lock_guard lk{indices_mu_}; + auto it = indices_.find(index); + if (it == indices_.end()) { + 
(*cntx)->SendError(string{index} + ": no such index"); + return; + } + prefix = it->second; + } + + Query query = search::ParseQuery(query_str); + if (!query) { + (*cntx)->SendError("Syntax error"); + return; + } + + vector> docs(shard_set->size()); + cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* shard) { + OpSearch(t->GetOpArgs(shard), prefix, query, &docs[shard->shard_id()]); + return OpStatus::OK; + }); + + size_t total_count = 0; + for (const auto& shard_docs : docs) + total_count += shard_docs.size(); + + (*cntx)->StartArray(total_count * 2 + 1); + (*cntx)->SendLong(total_count); + for (const auto& shard_docs : docs) { + for (const auto& [key, doc] : shard_docs) { + (*cntx)->SendBulkString(key); + (*cntx)->StartCollection(doc.size(), RedisReplyBuilder::MAP); + for (const auto& [k, v] : doc) { + (*cntx)->SendBulkString(k); + (*cntx)->SendBulkString(v); + } + } + } +} + +#define HFUNC(x) SetHandler(&SearchFamily::x) + +void SearchFamily::Register(CommandRegistry* registry) { + using CI = CommandId; + + *registry << CI{"FT.CREATE", CO::FAST, -2, 0, 0, 0}.HFUNC(FtCreate) + << CI{"FT.SEARCH", CO::GLOBAL_TRANS, -3, 0, 0, 0}.HFUNC(FtSearch); +} + +Mutex SearchFamily::indices_mu_{}; +absl::flat_hash_map SearchFamily::indices_{}; + +} // namespace dfly diff --git a/src/server/search_family.h b/src/server/search_family.h new file mode 100644 index 000000000..0de3635bf --- /dev/null +++ b/src/server/search_family.h @@ -0,0 +1,30 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. 
+// + +#pragma once + +#include + +#include + +#include "base/mutex.h" +#include "server/common.h" + +namespace dfly { +class CommandRegistry; +class ConnectionContext; + +class SearchFamily { + static void FtCreate(CmdArgList args, ConnectionContext* cntx); + static void FtSearch(CmdArgList args, ConnectionContext* cntx); + + public: + static void Register(CommandRegistry* registry); + + private: + static Mutex indices_mu_; + static absl::flat_hash_map indices_; +}; + +} // namespace dfly diff --git a/src/server/search_family_test.cc b/src/server/search_family_test.cc new file mode 100644 index 000000000..391347308 --- /dev/null +++ b/src/server/search_family_test.cc @@ -0,0 +1,67 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/search_family.h" + +#include "base/gtest.h" +#include "base/logging.h" +#include "facade/facade_test.h" +#include "server/command_registry.h" +#include "server/test_utils.h" + +using namespace testing; +using namespace std; +using namespace util; + +namespace dfly { + +class SearchFamilyTest : public BaseFamilyTest { + protected: +}; + +const auto kNoResults = IntArg(0); // tests auto destruct single element arrays + +TEST_F(SearchFamilyTest, CreateIndex) { + EXPECT_EQ(Run({"ft.create", "idx", "ON", "HASH", "PREFIX", "1", "prefix"}), "OK"); +} + +TEST_F(SearchFamilyTest, Simple) { + EXPECT_EQ(Run({"ft.create", "i1", "ON", "HASH", "PREFIX", "1", "d:"}), "OK"); + Run({"hset", "d:1", "foo", "baz", "k", "v"}); + Run({"hset", "d:2", "foo", "bar", "k", "v"}); + Run({"hset", "d:3", "foo", "bad", "k", "v"}); + + { + auto resp = Run({"ft.search", "i1", "@foo:bar"}); + EXPECT_THAT(resp, ArrLen(1 + 2)); // single key-data pair of d:2 + + auto doc = resp.GetVec(); + EXPECT_THAT(doc[0], IntArg(1)); + EXPECT_EQ(doc[1], "d:2"); + EXPECT_THAT(doc[2], ArrLen(4)); // foo and k pairs + } + + EXPECT_THAT(Run({"ft.search", "i1", "@foo:bar | @foo:baz"}), ArrLen(1 + 2 * 2)); + 
EXPECT_THAT(Run({"ft.search", "i1", "@foo:(bar|baz|bad)"}), ArrLen(1 + 3 * 2)); + + EXPECT_THAT(Run({"ft.search", "i1", "@foo:none"}), kNoResults); + + EXPECT_THAT(Run({"ft.search", "iNone", "@foo:bar"}), ErrArg("iNone: no such index")); + EXPECT_THAT(Run({"ft.search", "i1", "@@NOTAQUERY@@"}), ErrArg("Syntax error")); + + // w: prefix is not part of index + Run({"hset", "w:2", "foo", "this", "k", "v"}); + EXPECT_THAT(Run({"ft.search", "i1", "@foo:this"}), kNoResults); +} + +TEST_F(SearchFamilyTest, NoPrefix) { + EXPECT_EQ(Run({"ft.create", "i1"}), "OK"); + Run({"hset", "d:1", "a", "one", "k", "v"}); + Run({"hset", "d:2", "b", "two", "k", "v"}); + Run({"hset", "d:3", "c", "three", "k", "v"}); + + EXPECT_THAT(Run({"ft.search", "i1", "one | three"}), ArrLen(1 + 2 * 2)); +} + +} // namespace dfly