1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

feat(cluster): check command keys ownership (#1194)

* feat(cluster): check command keys ownership on cluster mode

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-05-10 12:06:52 +03:00 committed by GitHub
parent 36cd15a196
commit 577472eb22
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 205 additions and 16 deletions

View file

@ -22,7 +22,8 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
top_keys.cc multi_command_squasher.cc hll_family.cc)
top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc)
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib
absl::random_random TRDP::jsoncons zstd TRDP::lz4)
@ -50,9 +51,12 @@ cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
cxx_test(top_keys_test dfly_test_lib LABELS DFLY)
cxx_test(hll_family_test dfly_test_lib LABELS DFLY)
cxx_test(search_family_test dfly_test_lib LABELS DFLY)
cxx_test(cluster_config_test dfly_test_lib LABELS DFLY)
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
add_dependencies(check_dfly dragonfly_test json_family_test list_family_test
generic_family_test memcache_parser_test rdb_test journal_test
redis_parser_test snapshot_test stream_family_test string_family_test
bitops_family_test set_family_test zset_family_test hll_family_test)
bitops_family_test set_family_test zset_family_test hll_family_test cluster_config_test)

View file

@ -0,0 +1,53 @@
extern "C" {
#include "redis/crc16.h"
}
#include <shared_mutex>
#include <string_view>
#include "cluster_config.h"
namespace dfly {
bool ClusterConfig::cluster_enabled = false;
static constexpr SlotId kMaxSlotNum = 0x3FFF;
std::string_view ClusterConfig::KeyTag(std::string_view key) {
size_t start = key.find('{');
if (start == key.npos) {
return key;
}
size_t end = key.find('}', start + 1);
if (end == key.npos || end == start + 1) {
return key;
}
return key.substr(start + 1, end - start - 1);
}
SlotId ClusterConfig::KeySlot(std::string_view key) {
std::string_view tag = KeyTag(key);
return crc16(tag.data(), tag.length()) & kMaxSlotNum;
}
ClusterConfig::ClusterConfig() {
cluster_enabled = true;
AddSlots();
}
void ClusterConfig::AddSlots() {
// TODO update logic acording to config
// currently add all slots to owned slots
std::lock_guard lk{slots_mu_};
for (SlotId slot_id = 0; slot_id <= kMaxSlotNum; ++slot_id) {
owned_slots_.emplace(slot_id);
}
return;
}
bool ClusterConfig::IsMySlot(SlotId id) {
std::shared_lock sl(slots_mu_);
return owned_slots_.contains(id);
}
} // namespace dfly

View file

@ -0,0 +1,38 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <absl/container/flat_hash_set.h>
#include <string_view>
#include <tuple>
#include "src/core/fibers.h"
namespace dfly {
typedef uint16_t SlotId;
class ClusterConfig {
public:
ClusterConfig();
static SlotId KeySlot(std::string_view key);
static bool IsClusterEnabled() {
return cluster_enabled;
}
// If the key contains the {...} pattern, return only the part between { and }
static std::string_view KeyTag(std::string_view key);
// If key is in my slots ownership return true
bool IsMySlot(SlotId id);
private:
void AddSlots();
util::SharedMutex slots_mu_;
absl::flat_hash_set<SlotId> owned_slots_;
static bool cluster_enabled;
};
} // namespace dfly

View file

@ -0,0 +1,30 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/cluster/cluster_config.h"
#include "base/gtest.h"
namespace dfly {
class ClusterConfigTest : public ::testing::Test {};
TEST_F(ClusterConfigTest, KeyTagTest) {
std::string key = "{user1000}.following";
ASSERT_EQ("user1000", ClusterConfig::KeyTag(key));
key = " foo{}{bar}";
ASSERT_EQ(key, ClusterConfig::KeyTag(key));
key = "foo{{bar}}zap";
ASSERT_EQ("{bar", ClusterConfig::KeyTag(key));
key = "foo{bar}{zap}";
ASSERT_EQ("bar", ClusterConfig::KeyTag(key));
key = "{}foo{bar}{zap}";
ASSERT_EQ(key, ClusterConfig::KeyTag(key));
}
} // namespace dfly

View file

@ -21,6 +21,7 @@ extern "C" {
#include "core/fibers.h"
#include "core/mi_memory_resource.h"
#include "core/tx_queue.h"
#include "server/cluster/cluster_config.h"
#include "server/db_slice.h"
namespace dfly {
@ -332,6 +333,9 @@ template <typename U> void EngineShardSet::RunBlockingInParallel(U&& func) {
}
inline ShardId Shard(std::string_view v, ShardId shard_num) {
if (ClusterConfig::IsClusterEnabled()) {
v = ClusterConfig::KeyTag(v);
}
XXH64_hash_t hash = XXH64(v.data(), v.size(), 120577240643ULL);
return hash % shard_num;
}

View file

@ -589,6 +589,47 @@ void Service::Shutdown() {
ThisFiber::SleepFor(10ms);
}
bool Service::CheckKeysOwnership(const CommandId* cid, CmdArgList args,
ConnectionContext* dfly_cntx) {
if (cid->first_key_pos() == 0) {
return true; // No key command.
}
OpResult<KeyIndex> key_index_res = DetermineKeys(cid, args);
if (!key_index_res) {
(*dfly_cntx)->SendError(key_index_res.status());
return false;
}
const auto& key_index = *key_index_res;
SlotId keys_slot;
bool cross_slot = false;
// Iterate keys and check to which slot they belong.
for (unsigned i = key_index.start; i < key_index.end; ++i) {
string_view key = ArgS(args, i);
SlotId slot = ClusterConfig::KeySlot(key);
if (i == key_index.start) {
keys_slot = slot;
continue;
}
if (slot != keys_slot) {
// keys belong to diffent slots
cross_slot = true;
break;
}
}
if (cross_slot) {
(*dfly_cntx)->SendError("-CROSSSLOT Keys in request don't hash to the same slot");
return false;
}
// Check keys slot is in my ownership
if (!server_family_.cluster_config()->IsMySlot(keys_slot)) {
(*dfly_cntx)->SendError("MOVED"); // TODO add more info to moved error.
return false;
}
return true;
}
// Return OK if all keys are allowed to be accessed: either declared in EVAL or
// transaction is running in global or non-atomic mode.
OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const CommandId* cid,
@ -706,6 +747,10 @@ bool Service::VerifyCommand(const CommandId* cid, CmdArgList args, ConnectionCon
}
}
if (ClusterConfig::IsClusterEnabled() && !CheckKeysOwnership(cid, args.subspan(1), dfly_cntx)) {
return false;
}
if (under_script && cid->IsTransactional()) {
OpStatus status = CheckKeysDeclared(*dfly_cntx->conn_state.script_info, cid, args.subspan(1),
dfly_cntx->transaction);

View file

@ -118,6 +118,10 @@ class Service : public facade::ServiceInterface {
// Return false if command is invalid and reply with error.
bool VerifyCommand(const CommandId* cid, CmdArgList args, ConnectionContext* cntx);
// Return false if not all keys are owned by the server when running in cluster mode.
// If false is returned error was sent to the client.
bool CheckKeysOwnership(const CommandId* cid, CmdArgList args, ConnectionContext* dfly_cntx);
const CommandId* FindCmd(CmdArgList args) const;
void EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, ConnectionContext* cntx);

View file

@ -60,7 +60,7 @@ ABSL_FLAG(string, save_schedule, "",
ABSL_FLAG(bool, df_snapshot_format, true,
"if true, save in dragonfly-specific snapshotting format");
ABSL_FLAG(string, cluster_mode, "",
"Cluster mode supported. Currently supports only `emulated`. "
"Cluster mode supported."
"default: \"\"");
ABSL_FLAG(string, cluster_announce_ip, "", "ip that cluster commands announce to the client");
@ -395,11 +395,12 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
}
string cluster_mode = GetFlag(FLAGS_cluster_mode);
if (cluster_mode.empty()) {
is_emulated_cluster_ = false;
} else if (cluster_mode == "emulated") {
if (cluster_mode == "emulated") {
is_emulated_cluster_ = true;
} else {
} else if (cluster_mode == "yes") {
cluster_config_.reset(new ClusterConfig());
} else if (!cluster_mode.empty()) {
LOG(ERROR) << "invalid cluster_mode. Exiting...";
exit(1);
}
@ -627,7 +628,8 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec& spec) {
continue;
}
// if it matches check the last save time, if it is the same minute don't save another snapshot
// if it matches check the last save time, if it is the same minute don't save another
// snapshot
time_t last_save;
{
lock_guard lk(save_mu_);
@ -887,7 +889,8 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
}
// Run callback for all active RdbSnapshots (passed as index).
// .dfs format contains always `shard_set->size() + 1` snapshots (for every shard and summary file).
// .dfs format contains always `shard_set->size() + 1` snapshots (for every shard and summary
// file).
static void RunStage(bool new_version, std::function<void(unsigned)> cb) {
if (new_version) {
shard_set->RunBlockingInParallel([&](EngineShard* es) { cb(es->shard_id()); });
@ -1231,10 +1234,10 @@ void ServerFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
// This command supports 2 sub options:
// 1. HELP
// 2. SLOTS: the slots are a mapping between sharding and hosts in the cluster.
// Note that as of the beginning of 2023 DF don't have cluster mode (i.e sharding across multiple
// hosts), as a results all shards are map to the same host (i.e. range is between and kEndSlot)
// and number of cluster sharding is thus == 1 (kClustersShardingCount).
// For more details https://redis.io/commands/cluster-slots/
// Note that as of the beginning of 2023 DF don't have cluster mode (i.e sharding across
// multiple hosts), as a results all shards are map to the same host (i.e. range is between and
// kEndSlot) and number of cluster sharding is thus == 1 (kClustersShardingCount). For more
// details https://redis.io/commands/cluster-slots/
constexpr unsigned int kEndSlot = 16383; // see redis code (cluster.c CLUSTER_SLOTS).
constexpr unsigned int kStartSlot = 0;
constexpr unsigned int kClustersShardingCount = 1;
@ -1244,8 +1247,9 @@ void ServerFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
if (!is_emulated_cluster_) {
return (*cntx)->SendError("CLUSTER commands requires --cluster_mode=emulated");
if (!is_emulated_cluster_ && !ClusterConfig::IsClusterEnabled()) {
return (*cntx)->SendError(
"CLUSTER commands requires --cluster_mode=emulated or --cluster_mode=yes");
}
if (sub_cmd == "HELP") {
@ -1749,7 +1753,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
if (should_enter("CLUSTER")) {
ADD_HEADER("# Cluster");
append("cluster_enabled", is_emulated_cluster_);
append("cluster_enabled", is_emulated_cluster_ || ClusterConfig::IsClusterEnabled());
}
(*cntx)->SendBulkString(info);

View file

@ -9,6 +9,7 @@
#include "facade/conn_context.h"
#include "facade/redis_parser.h"
#include "server/channel_store.h"
#include "server/cluster/cluster_config.h"
#include "server/engine_shard_set.h"
namespace util {
@ -86,6 +87,10 @@ class ServerFamily {
return script_mgr_.get();
}
ClusterConfig* cluster_config() {
return cluster_config_.get();
}
void StatsMC(std::string_view section, facade::ConnectionContext* cntx);
// if new_version is true, saves DF specific, non redis compatible snapshot.
@ -187,6 +192,8 @@ class ServerFamily {
time_t start_time_ = 0; // in seconds, epoch time.
bool is_emulated_cluster_ = false;
std::unique_ptr<ClusterConfig> cluster_config_;
std::shared_ptr<LastSaveInfo> last_save_info_; // protected by save_mu_;
std::atomic_bool is_saving_{false};