diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 1b2cec4f2..9d9de40be 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -22,7 +22,8 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc set_family.cc stream_family.cc string_family.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc - top_keys.cc multi_command_squasher.cc hll_family.cc) + top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc) + cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib absl::random_random TRDP::jsoncons zstd TRDP::lz4) @@ -50,9 +51,12 @@ cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY) cxx_test(top_keys_test dfly_test_lib LABELS DFLY) cxx_test(hll_family_test dfly_test_lib LABELS DFLY) cxx_test(search_family_test dfly_test_lib LABELS DFLY) +cxx_test(cluster_config_test dfly_test_lib LABELS DFLY) + + add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY) add_dependencies(check_dfly dragonfly_test json_family_test list_family_test generic_family_test memcache_parser_test rdb_test journal_test redis_parser_test snapshot_test stream_family_test string_family_test - bitops_family_test set_family_test zset_family_test hll_family_test) + bitops_family_test set_family_test zset_family_test hll_family_test cluster_config_test) diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc new file mode 100644 index 000000000..f480a2577 --- /dev/null +++ b/src/server/cluster/cluster_config.cc @@ -0,0 +1,53 @@ +extern "C" { +#include "redis/crc16.h" +} + +#include +#include + +#include "cluster_config.h" + +namespace dfly { + +bool ClusterConfig::cluster_enabled = false; + +static constexpr SlotId kMaxSlotNum = 0x3FFF; + +std::string_view ClusterConfig::KeyTag(std::string_view key) { + size_t start = key.find('{'); + if (start == key.npos) { + return key; + } + size_t end = key.find('}', start + 1); + if (end == key.npos || end == start + 1) { + return key; + } + return key.substr(start + 1, end - start - 1); +} + +SlotId ClusterConfig::KeySlot(std::string_view key) { + std::string_view tag = KeyTag(key); + return crc16(tag.data(), tag.length()) & kMaxSlotNum; +} + +ClusterConfig::ClusterConfig() { + cluster_enabled = true; + AddSlots(); +} + +void ClusterConfig::AddSlots() { + // TODO update logic acording to config + // currently add all slots to owned slots + std::lock_guard lk{slots_mu_}; + for (SlotId slot_id = 0; slot_id <= kMaxSlotNum; ++slot_id) { + owned_slots_.emplace(slot_id); + } + return; +} + +bool ClusterConfig::IsMySlot(SlotId id) { + std::shared_lock sl(slots_mu_); + return owned_slots_.contains(id); +} + +} // namespace dfly diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h new file mode 100644 index 000000000..73b514ade --- /dev/null +++ b/src/server/cluster/cluster_config.h @@ -0,0 +1,38 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once +#include + +#include +#include + +#include "src/core/fibers.h" + +namespace dfly { + +typedef uint16_t SlotId; + +class ClusterConfig { + public: + ClusterConfig(); + static SlotId KeySlot(std::string_view key); + static bool IsClusterEnabled() { + return cluster_enabled; + } + // If the key contains the {...} pattern, return only the part between { and } + static std::string_view KeyTag(std::string_view key); + + // If key is in my slots ownership return true + bool IsMySlot(SlotId id); + + private: + void AddSlots(); + + util::SharedMutex slots_mu_; + absl::flat_hash_set owned_slots_; + static bool cluster_enabled; +}; + +} // namespace dfly diff --git a/src/server/cluster_config_test.cc b/src/server/cluster_config_test.cc new file mode 100644 index 000000000..3884c8b2f --- /dev/null +++ b/src/server/cluster_config_test.cc @@ -0,0 +1,30 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/cluster/cluster_config.h" + +#include "base/gtest.h" + +namespace dfly { + +class ClusterConfigTest : public ::testing::Test {}; + +TEST_F(ClusterConfigTest, KeyTagTest) { + std::string key = "{user1000}.following"; + ASSERT_EQ("user1000", ClusterConfig::KeyTag(key)); + + key = " foo{}{bar}"; + ASSERT_EQ(key, ClusterConfig::KeyTag(key)); + + key = "foo{{bar}}zap"; + ASSERT_EQ("{bar", ClusterConfig::KeyTag(key)); + + key = "foo{bar}{zap}"; + ASSERT_EQ("bar", ClusterConfig::KeyTag(key)); + + key = "{}foo{bar}{zap}"; + ASSERT_EQ(key, ClusterConfig::KeyTag(key)); +} + +} // namespace dfly diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index e1ca87bb6..77ed8c62d 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -21,6 +21,7 @@ extern "C" { #include "core/fibers.h" #include "core/mi_memory_resource.h" #include "core/tx_queue.h" +#include "server/cluster/cluster_config.h" #include "server/db_slice.h" namespace dfly { @@ -332,6 +333,9 @@ template void EngineShardSet::RunBlockingInParallel(U&& func) { } inline ShardId Shard(std::string_view v, ShardId shard_num) { + if (ClusterConfig::IsClusterEnabled()) { + v = ClusterConfig::KeyTag(v); + } XXH64_hash_t hash = XXH64(v.data(), v.size(), 120577240643ULL); return hash % shard_num; } diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 9f96697e3..f995e900b 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -589,6 +589,47 @@ void Service::Shutdown() { ThisFiber::SleepFor(10ms); } +bool Service::CheckKeysOwnership(const CommandId* cid, CmdArgList args, + ConnectionContext* dfly_cntx) { + if (cid->first_key_pos() == 0) { + return true; // No key command. + } + OpResult key_index_res = DetermineKeys(cid, args); + if (!key_index_res) { + (*dfly_cntx)->SendError(key_index_res.status()); + return false; + } + + const auto& key_index = *key_index_res; + SlotId keys_slot; + bool cross_slot = false; + // Iterate keys and check to which slot they belong. + for (unsigned i = key_index.start; i < key_index.end; ++i) { + string_view key = ArgS(args, i); + SlotId slot = ClusterConfig::KeySlot(key); + if (i == key_index.start) { + keys_slot = slot; + continue; + } + if (slot != keys_slot) { + // keys belong to diffent slots + cross_slot = true; + break; + } + } + if (cross_slot) { + (*dfly_cntx)->SendError("-CROSSSLOT Keys in request don't hash to the same slot"); + return false; + } + // Check keys slot is in my ownership + if (!server_family_.cluster_config()->IsMySlot(keys_slot)) { + (*dfly_cntx)->SendError("MOVED"); // TODO add more info to moved error. + return false; + } + + return true; +} + // Return OK if all keys are allowed to be accessed: either declared in EVAL or // transaction is running in global or non-atomic mode. OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const CommandId* cid, @@ -706,6 +747,10 @@ bool Service::VerifyCommand(const CommandId* cid, CmdArgList args, ConnectionCon } } + if (ClusterConfig::IsClusterEnabled() && !CheckKeysOwnership(cid, args.subspan(1), dfly_cntx)) { + return false; + } + if (under_script && cid->IsTransactional()) { OpStatus status = CheckKeysDeclared(*dfly_cntx->conn_state.script_info, cid, args.subspan(1), dfly_cntx->transaction); diff --git a/src/server/main_service.h b/src/server/main_service.h index e03ea23ba..e3da5c043 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -118,6 +118,10 @@ class Service : public facade::ServiceInterface { // Return false if command is invalid and reply with error. bool VerifyCommand(const CommandId* cid, CmdArgList args, ConnectionContext* cntx); + // Return false if not all keys are owned by the server when running in cluster mode. + // If false is returned error was sent to the client. + bool CheckKeysOwnership(const CommandId* cid, CmdArgList args, ConnectionContext* dfly_cntx); + const CommandId* FindCmd(CmdArgList args) const; void EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, ConnectionContext* cntx); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index ae338b5f7..2b7c5f5c6 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -60,7 +60,7 @@ ABSL_FLAG(string, save_schedule, "", ABSL_FLAG(bool, df_snapshot_format, true, "if true, save in dragonfly-specific snapshotting format"); ABSL_FLAG(string, cluster_mode, "", - "Cluster mode supported. Currently supports only `emulated`. " + "Cluster mode supported." "default: \"\""); ABSL_FLAG(string, cluster_announce_ip, "", "ip that cluster commands announce to the client"); @@ -395,11 +395,12 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) { } string cluster_mode = GetFlag(FLAGS_cluster_mode); - if (cluster_mode.empty()) { - is_emulated_cluster_ = false; - } else if (cluster_mode == "emulated") { + + if (cluster_mode == "emulated") { is_emulated_cluster_ = true; - } else { + } else if (cluster_mode == "yes") { + cluster_config_.reset(new ClusterConfig()); + } else if (!cluster_mode.empty()) { LOG(ERROR) << "invalid cluster_mode. Exiting..."; exit(1); } @@ -627,7 +628,8 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec& spec) { continue; } - // if it matches check the last save time, if it is the same minute don't save another snapshot + // if it matches check the last save time, if it is the same minute don't save another + // snapshot time_t last_save; { lock_guard lk(save_mu_); @@ -887,7 +889,8 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* } // Run callback for all active RdbSnapshots (passed as index). -// .dfs format contains always `shard_set->size() + 1` snapshots (for every shard and summary file). +// .dfs format contains always `shard_set->size() + 1` snapshots (for every shard and summary +// file). static void RunStage(bool new_version, std::function cb) { if (new_version) { shard_set->RunBlockingInParallel([&](EngineShard* es) { cb(es->shard_id()); }); @@ -1231,10 +1234,10 @@ void ServerFamily::Cluster(CmdArgList args, ConnectionContext* cntx) { // This command supports 2 sub options: // 1. HELP // 2. SLOTS: the slots are a mapping between sharding and hosts in the cluster. - // Note that as of the beginning of 2023 DF don't have cluster mode (i.e sharding across multiple - // hosts), as a results all shards are map to the same host (i.e. range is between and kEndSlot) - // and number of cluster sharding is thus == 1 (kClustersShardingCount). - // For more details https://redis.io/commands/cluster-slots/ + // Note that as of the beginning of 2023 DF don't have cluster mode (i.e sharding across + // multiple hosts), as a results all shards are map to the same host (i.e. range is between and + // kEndSlot) and number of cluster sharding is thus == 1 (kClustersShardingCount). For more + // details https://redis.io/commands/cluster-slots/ constexpr unsigned int kEndSlot = 16383; // see redis code (cluster.c CLUSTER_SLOTS). constexpr unsigned int kStartSlot = 0; constexpr unsigned int kClustersShardingCount = 1; @@ -1244,8 +1247,9 @@ void ServerFamily::Cluster(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[0]); string_view sub_cmd = ArgS(args, 0); - if (!is_emulated_cluster_) { - return (*cntx)->SendError("CLUSTER commands requires --cluster_mode=emulated"); + if (!is_emulated_cluster_ && !ClusterConfig::IsClusterEnabled()) { + return (*cntx)->SendError( + "CLUSTER commands requires --cluster_mode=emulated or --cluster_mode=yes"); } if (sub_cmd == "HELP") { @@ -1749,7 +1753,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { if (should_enter("CLUSTER")) { ADD_HEADER("# Cluster"); - append("cluster_enabled", is_emulated_cluster_); + append("cluster_enabled", is_emulated_cluster_ || ClusterConfig::IsClusterEnabled()); } (*cntx)->SendBulkString(info); diff --git a/src/server/server_family.h b/src/server/server_family.h index 431b91ccd..29597e69b 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -9,6 +9,7 @@ #include "facade/conn_context.h" #include "facade/redis_parser.h" #include "server/channel_store.h" +#include "server/cluster/cluster_config.h" #include "server/engine_shard_set.h" namespace util { @@ -86,6 +87,10 @@ class ServerFamily { return script_mgr_.get(); } + ClusterConfig* cluster_config() { + return cluster_config_.get(); + } + void StatsMC(std::string_view section, facade::ConnectionContext* cntx); // if new_version is true, saves DF specific, non redis compatible snapshot. @@ -187,6 +192,8 @@ class ServerFamily { time_t start_time_ = 0; // in seconds, epoch time. bool is_emulated_cluster_ = false; + std::unique_ptr cluster_config_; + std::shared_ptr last_save_info_; // protected by save_mu_; std::atomic_bool is_saving_{false};