mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-15 17:51:06 +00:00
Implement MATCH and TYPE options for the SCAN command
This commit is contained in:
parent
b9c1288c67
commit
37f09f315e
13 changed files with 119 additions and 85 deletions
|
@ -170,7 +170,7 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
|
|||
}
|
||||
|
||||
auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const
|
||||
-> OpResult<MainIterator> {
|
||||
-> OpResult<PrimeIterator> {
|
||||
auto it = FindExt(db_index, key).first;
|
||||
|
||||
if (!IsValid(it))
|
||||
|
@ -183,11 +183,11 @@ auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) con
|
|||
return it;
|
||||
}
|
||||
|
||||
pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view key) const {
|
||||
pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view key) const {
|
||||
DCHECK(IsDbValid(db_ind));
|
||||
|
||||
auto& db = db_arr_[db_ind];
|
||||
MainIterator it = db->prime_table.Find(key);
|
||||
PrimeIterator it = db->prime_table.Find(key);
|
||||
|
||||
if (!IsValid(it)) {
|
||||
return make_pair(it, ExpireIterator{});
|
||||
|
@ -200,12 +200,12 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view
|
|||
return make_pair(it, ExpireIterator{});
|
||||
}
|
||||
|
||||
OpResult<pair<MainIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, const ArgSlice& args) {
|
||||
OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, const ArgSlice& args) {
|
||||
DCHECK(!args.empty());
|
||||
|
||||
for (unsigned i = 0; i < args.size(); ++i) {
|
||||
string_view s = args[i];
|
||||
OpResult<MainIterator> res = Find(db_index, s, OBJ_LIST);
|
||||
OpResult<PrimeIterator> res = Find(db_index, s, OBJ_LIST);
|
||||
if (res)
|
||||
return make_pair(res.value(), i);
|
||||
if (res.status() != OpStatus::KEY_NOTFOUND)
|
||||
|
@ -216,7 +216,7 @@ OpResult<pair<MainIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, cons
|
|||
return OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
|
||||
auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<MainIterator, bool> {
|
||||
auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator, bool> {
|
||||
DCHECK(IsDbValid(db_index));
|
||||
|
||||
auto& db = db_arr_[db_index];
|
||||
|
@ -293,7 +293,7 @@ void DbSlice::CreateDb(DbIndex index) {
|
|||
}
|
||||
}
|
||||
|
||||
bool DbSlice::Del(DbIndex db_ind, MainIterator it) {
|
||||
bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
|
||||
if (!IsValid(it)) {
|
||||
return false;
|
||||
}
|
||||
|
@ -345,7 +345,7 @@ size_t DbSlice::FlushDb(DbIndex db_ind) {
|
|||
}
|
||||
|
||||
// Returns true if a state has changed, false otherwise.
|
||||
bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
|
||||
bool DbSlice::Expire(DbIndex db_ind, PrimeIterator it, uint64_t at) {
|
||||
auto& db = *db_arr_[db_ind];
|
||||
if (at == 0 && it->second.HasExpire()) {
|
||||
CHECK_EQ(1u, db.expire_table.Erase(it->first));
|
||||
|
@ -383,7 +383,7 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const {
|
|||
return it.is_done() ? 0 : it->second;
|
||||
}
|
||||
|
||||
MainIterator DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj,
|
||||
PrimeIterator DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) {
|
||||
for (const auto& ccb : change_cb_) {
|
||||
ccb.second(db_ind, ChangeReq{key});
|
||||
|
@ -395,7 +395,7 @@ MainIterator DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj,
|
|||
return res;
|
||||
}
|
||||
|
||||
pair<MainIterator, bool> DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, PrimeValue obj,
|
||||
pair<PrimeIterator, bool> DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) {
|
||||
DCHECK(!obj.IsRef());
|
||||
|
||||
|
@ -510,7 +510,7 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) con
|
|||
return true;
|
||||
}
|
||||
|
||||
void DbSlice::PreUpdate(DbIndex db_ind, MainIterator it) {
|
||||
void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) {
|
||||
auto& db = db_arr_[db_ind];
|
||||
for (const auto& ccb : change_cb_) {
|
||||
ccb.second(db_ind, ChangeReq{it});
|
||||
|
@ -519,12 +519,12 @@ void DbSlice::PreUpdate(DbIndex db_ind, MainIterator it) {
|
|||
it.SetVersion(NextVersion());
|
||||
}
|
||||
|
||||
void DbSlice::PostUpdate(DbIndex db_ind, MainIterator it) {
|
||||
void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it) {
|
||||
auto& db = db_arr_[db_ind];
|
||||
db->stats.obj_memory_usage += it->second.MallocUsed();
|
||||
}
|
||||
|
||||
pair<MainIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind, MainIterator it) const {
|
||||
pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind, PrimeIterator it) const {
|
||||
DCHECK(it->second.HasExpire());
|
||||
auto& db = db_arr_[db_ind];
|
||||
|
||||
|
@ -545,7 +545,7 @@ pair<MainIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind, MainI
|
|||
db->prime_table.Erase(it);
|
||||
++events_.expired_keys;
|
||||
|
||||
return make_pair(MainIterator{}, ExpireIterator{});
|
||||
return make_pair(PrimeIterator{}, ExpireIterator{});
|
||||
}
|
||||
|
||||
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
|
||||
|
|
|
@ -110,21 +110,21 @@ class DbSlice {
|
|||
return now_ms_;
|
||||
}
|
||||
|
||||
OpResult<MainIterator> Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const;
|
||||
OpResult<PrimeIterator> Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const;
|
||||
|
||||
// Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
|
||||
std::pair<MainIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const;
|
||||
std::pair<PrimeIterator, ExpireIterator> FindExt(DbIndex db_ind, std::string_view key) const;
|
||||
|
||||
// Returns dictEntry, args-index if found, KEY_NOTFOUND otherwise.
|
||||
// If multiple keys are found, returns the first index in the ArgSlice.
|
||||
OpResult<std::pair<MainIterator, unsigned>> FindFirst(DbIndex db_index, const ArgSlice& args);
|
||||
OpResult<std::pair<PrimeIterator, unsigned>> FindFirst(DbIndex db_index, const ArgSlice& args);
|
||||
|
||||
// Return .second=true if insertion ocurred, false if we return the existing key.
|
||||
std::pair<MainIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key);
|
||||
std::pair<PrimeIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key);
|
||||
|
||||
// Either adds or removes (if at == 0) expiry. Returns true if a change was made.
|
||||
// Does not change expiry if at != 0 and expiry already exists.
|
||||
bool Expire(DbIndex db_ind, MainIterator main_it, uint64_t at);
|
||||
bool Expire(DbIndex db_ind, PrimeIterator main_it, uint64_t at);
|
||||
|
||||
void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag);
|
||||
uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const;
|
||||
|
@ -132,18 +132,18 @@ class DbSlice {
|
|||
// Adds a new entry. Requires: key does not exist in this slice.
|
||||
// Returns the iterator to the newly added entry.
|
||||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
MainIterator AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj, uint64_t expire_at_ms);
|
||||
PrimeIterator AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj, uint64_t expire_at_ms);
|
||||
|
||||
// Adds a new entry if a key does not exists. Returns true if insertion took place,
|
||||
// false otherwise. expire_at_ms equal to 0 - means no expiry.
|
||||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
std::pair<MainIterator, bool> AddIfNotExist(DbIndex db_ind, std::string_view key, PrimeValue obj,
|
||||
std::pair<PrimeIterator, bool> AddIfNotExist(DbIndex db_ind, std::string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms);
|
||||
|
||||
// Creates a database with index `db_ind`. If such database exists does nothing.
|
||||
void ActivateDb(DbIndex db_ind);
|
||||
|
||||
bool Del(DbIndex db_ind, MainIterator it);
|
||||
bool Del(DbIndex db_ind, PrimeIterator it);
|
||||
|
||||
constexpr static DbIndex kDbAll = 0xFFFF;
|
||||
|
||||
|
@ -183,16 +183,16 @@ class DbSlice {
|
|||
size_t DbSize(DbIndex db_ind) const;
|
||||
|
||||
// Callback functions called upon writing to the existing key.
|
||||
void PreUpdate(DbIndex db_ind, MainIterator it);
|
||||
void PostUpdate(DbIndex db_ind, MainIterator it);
|
||||
void PreUpdate(DbIndex db_ind, PrimeIterator it);
|
||||
void PostUpdate(DbIndex db_ind, PrimeIterator it);
|
||||
|
||||
InternalDbStats* MutableStats(DbIndex db_ind) {
|
||||
return &db_arr_[db_ind]->stats;
|
||||
}
|
||||
|
||||
// Check whether 'it' has not expired. Returns it if it's still valid. Otherwise, erases it
|
||||
// from both tables and return MainIterator{}.
|
||||
std::pair<MainIterator, ExpireIterator> ExpireIfNeeded(DbIndex db_ind, MainIterator it) const;
|
||||
// from both tables and return PrimeIterator{}.
|
||||
std::pair<PrimeIterator, ExpireIterator> ExpireIfNeeded(DbIndex db_ind, PrimeIterator it) const;
|
||||
|
||||
// Current version of this slice.
|
||||
// We maintain a shared versioning scheme for all databases in the slice.
|
||||
|
@ -200,10 +200,10 @@ class DbSlice {
|
|||
return version_;
|
||||
}
|
||||
|
||||
// ChangeReq - describes the change to the table. If MainIterator is defined then
|
||||
// ChangeReq - describes the change to the table. If PrimeIterator is defined then
|
||||
// it's an update on the existing entry, otherwise if string_view is defined then
|
||||
// it's a new key that is going to be added to the table.
|
||||
using ChangeReq = std::variant<MainIterator, std::string_view>;
|
||||
using ChangeReq = std::variant<PrimeIterator, std::string_view>;
|
||||
|
||||
using ChangeCallback = std::function<void(DbIndex, const ChangeReq&)>;
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@
|
|||
|
||||
extern "C" {
|
||||
#include "redis/object.h"
|
||||
#include "redis/util.h"
|
||||
}
|
||||
|
||||
#include "base/logging.h"
|
||||
|
@ -87,7 +88,7 @@ void Renamer::MoveValues(EngineShard* shard, const ArgSlice& args) {
|
|||
|
||||
// Handle destination
|
||||
string_view dest_key = dest_res_.key;
|
||||
MainIterator dest_it = shard->db_slice().FindExt(db_indx_, dest_key).first;
|
||||
PrimeIterator dest_it = shard->db_slice().FindExt(db_indx_, dest_key).first;
|
||||
if (IsValid(dest_it)) {
|
||||
// we just move the source. We won't be able to do it with heap per shard model.
|
||||
dest_it->second = std::move(src_res_.val);
|
||||
|
@ -419,6 +420,7 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
|
|||
EngineShardSet* ess = cntx->shard_set;
|
||||
unsigned shard_count = ess->size();
|
||||
uint32_t limit = 10;
|
||||
string_view pattern, type_filter;
|
||||
|
||||
// Dash table returns a cursor with its right byte empty. We will use it
|
||||
// for encoding shard index. For now scan has a limitation of 255 shards.
|
||||
|
@ -437,15 +439,20 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
string_view opt = ArgS(args, i);
|
||||
if (opt == "COUNT") {
|
||||
if (!absl::SimpleAtoi(ArgS(args, i+ 1), &limit)) {
|
||||
if (!absl::SimpleAtoi(ArgS(args, i + 1), &limit)) {
|
||||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
if (limit == 0)
|
||||
limit = 1;
|
||||
else if (limit > 4096)
|
||||
limit = 4096;
|
||||
} else if (opt == "MATCH" || opt == "TYPE") {
|
||||
return (*cntx)->SendError("Not supported"); // TODO
|
||||
} else if (opt == "MATCH") {
|
||||
pattern = ArgS(args, i + 1);
|
||||
if (pattern == "*")
|
||||
pattern = string_view{};
|
||||
} else if (opt == "TYPE") {
|
||||
ToLower(&args[i + 1]);
|
||||
type_filter = ArgS(args, i + 1);
|
||||
} else {
|
||||
return (*cntx)->SendError(kSyntaxErr);
|
||||
}
|
||||
|
@ -462,7 +469,7 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
|
|||
do {
|
||||
ess->Await(sid, [&] {
|
||||
OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index};
|
||||
OpScan(op_args, limit, &cursor, &keys);
|
||||
OpScan(op_args, pattern, type_filter, limit, &cursor, &keys);
|
||||
});
|
||||
if (cursor == 0) {
|
||||
++sid;
|
||||
|
@ -595,19 +602,12 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from, str
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
void GenericFamily::OpScan(const OpArgs& op_args, size_t limit, uint64_t* cursor,
|
||||
vector<string>* vec) {
|
||||
void GenericFamily::OpScan(const OpArgs& op_args, string_view pattern, string_view type_filter,
|
||||
size_t limit, uint64_t* cursor, StringVec* vec) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
DCHECK(db_slice.IsDbValid(op_args.db_ind));
|
||||
|
||||
unsigned cnt = 0;
|
||||
auto scan_cb = [&](MainIterator it) {
|
||||
if (it->second.HasExpire()) {
|
||||
it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first;
|
||||
}
|
||||
vec->push_back(it->first.ToString());
|
||||
++cnt;
|
||||
};
|
||||
|
||||
VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has "
|
||||
<< db_slice.DbSize(op_args.db_ind);
|
||||
|
@ -615,13 +615,42 @@ void GenericFamily::OpScan(const OpArgs& op_args, size_t limit, uint64_t* cursor
|
|||
uint64_t cur = *cursor;
|
||||
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind);
|
||||
do {
|
||||
cur = prime_table->Traverse(cur, scan_cb);
|
||||
cur = prime_table->Traverse(
|
||||
cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, pattern, type_filter, vec); });
|
||||
} while (cur && cnt < limit);
|
||||
|
||||
VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur;
|
||||
*cursor = cur;
|
||||
}
|
||||
|
||||
bool GenericFamily::ScanCb(const OpArgs& op_args, PrimeIterator it, string_view pattern,
|
||||
string_view type_filter, StringVec* res) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
if (it->second.HasExpire()) {
|
||||
it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first;
|
||||
}
|
||||
|
||||
if (!IsValid(it))
|
||||
return false;
|
||||
|
||||
bool matches = type_filter.empty() || ObjTypeName(it->second.ObjType()) == type_filter;
|
||||
|
||||
if (!matches)
|
||||
return false;
|
||||
|
||||
if (pattern.empty()) {
|
||||
res->push_back(it->first.ToString());
|
||||
} else {
|
||||
string str = it->first.ToString();
|
||||
if (stringmatchlen(pattern.data(), pattern.size(), str.data(), str.size(), 0) != 1)
|
||||
return false;
|
||||
|
||||
res->push_back(std::move(str));
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
using CI = CommandId;
|
||||
|
||||
#define HFUNC(x) SetHandler(&GenericFamily::x)
|
||||
|
|
|
@ -6,6 +6,8 @@
|
|||
|
||||
#include "facade/op_status.h"
|
||||
#include "server/common_types.h"
|
||||
#include "server/table.h"
|
||||
|
||||
|
||||
namespace util {
|
||||
class ProactorPool;
|
||||
|
@ -66,8 +68,11 @@ class GenericFamily {
|
|||
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
|
||||
bool skip_exists);
|
||||
|
||||
static void OpScan(const OpArgs& op_args, size_t limit, uint64_t* cursor,
|
||||
std::vector<std::string>* vec);
|
||||
static void OpScan(const OpArgs& op_args, std::string_view pattern, std::string_view type_filter,
|
||||
size_t limit, uint64_t* cursor, StringVec* vec);
|
||||
|
||||
static bool ScanCb(const OpArgs& op_args, PrimeIterator it, std::string_view pattern,
|
||||
std::string_view type_filter, StringVec* res);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -120,7 +120,7 @@ class BPopper {
|
|||
OpStatus Pop(Transaction* t, EngineShard* shard);
|
||||
|
||||
bool found_ = false;
|
||||
MainIterator find_it_;
|
||||
PrimeIterator find_it_;
|
||||
ShardId find_sid_ = std::numeric_limits<ShardId>::max();
|
||||
|
||||
string key_;
|
||||
|
@ -450,7 +450,7 @@ void ListFamily::PopGeneric(ListDir dir, const CmdArgList& args, ConnectionConte
|
|||
OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
|
||||
bool skip_notexist, absl::Span<std::string_view> vals) {
|
||||
EngineShard* es = op_args.shard;
|
||||
MainIterator it;
|
||||
PrimeIterator it;
|
||||
bool new_key = false;
|
||||
|
||||
if (skip_notexist) {
|
||||
|
@ -498,11 +498,11 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
|
|||
OpResult<StringVec> ListFamily::OpPop(const OpArgs& op_args, string_view key, ListDir dir,
|
||||
uint32_t count) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<MainIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
|
||||
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
|
||||
if (!it_res)
|
||||
return it_res.status();
|
||||
|
||||
MainIterator it = *it_res;
|
||||
PrimeIterator it = *it_res;
|
||||
quicklist* ql = GetQL(it->second);
|
||||
db_slice.PreUpdate(op_args.db_ind, it);
|
||||
|
||||
|
|
|
@ -374,7 +374,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const ArgS
|
|||
OpResult<uint32_t> OpRem(const OpArgs& op_args, std::string_view key, const ArgSlice& vals) {
|
||||
auto* es = op_args.shard;
|
||||
auto& db_slice = es->db_slice();
|
||||
OpResult<MainIterator> find_res = db_slice.Find(op_args.db_ind, key, OBJ_SET);
|
||||
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_ind, key, OBJ_SET);
|
||||
if (!find_res) {
|
||||
return find_res.status();
|
||||
}
|
||||
|
@ -420,7 +420,7 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) {
|
|||
|
||||
for (auto k : largs) {
|
||||
unsigned index = (k == src_) ? 0 : 1;
|
||||
OpResult<MainIterator> res = es->db_slice().Find(t->db_index(), k, OBJ_SET);
|
||||
OpResult<PrimeIterator> res = es->db_slice().Find(t->db_index(), k, OBJ_SET);
|
||||
if (res && index == 0) { // succesful src find.
|
||||
DCHECK(!res->is_done());
|
||||
const CompactObj& val = res.value()->second;
|
||||
|
@ -522,7 +522,7 @@ void SetFamily::SIsMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
std::string_view val = ArgS(args, 2);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpResult<MainIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET);
|
||||
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET);
|
||||
|
||||
if (find_res) {
|
||||
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
|
||||
|
@ -587,7 +587,7 @@ void SetFamily::SCard(CmdArgList args, ConnectionContext* cntx) {
|
|||
std::string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
|
||||
OpResult<MainIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET);
|
||||
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET);
|
||||
if (!find_res) {
|
||||
return find_res.status();
|
||||
}
|
||||
|
@ -902,7 +902,7 @@ OpResult<StringVec> SetFamily::OpUnion(const OpArgs& op_args, ArgSlice keys) {
|
|||
absl::flat_hash_set<string> uniques;
|
||||
|
||||
for (std::string_view key : keys) {
|
||||
OpResult<MainIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET);
|
||||
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET);
|
||||
if (find_res) {
|
||||
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
|
||||
FillSet(st, [&uniques](string s) { uniques.emplace(move(s)); });
|
||||
|
@ -921,7 +921,7 @@ OpResult<StringVec> SetFamily::OpDiff(const OpArgs& op_args, ArgSlice keys) {
|
|||
DCHECK(!keys.empty());
|
||||
DVLOG(1) << "OpDiff from " << keys.front();
|
||||
EngineShard* es = op_args.shard;
|
||||
OpResult<MainIterator> find_res = es->db_slice().Find(op_args.db_ind, keys.front(), OBJ_SET);
|
||||
OpResult<PrimeIterator> find_res = es->db_slice().Find(op_args.db_ind, keys.front(), OBJ_SET);
|
||||
|
||||
if (!find_res) {
|
||||
return find_res.status();
|
||||
|
@ -935,7 +935,7 @@ OpResult<StringVec> SetFamily::OpDiff(const OpArgs& op_args, ArgSlice keys) {
|
|||
DCHECK(!uniques.empty()); // otherwise the key would not exist.
|
||||
|
||||
for (size_t i = 1; i < keys.size(); ++i) {
|
||||
OpResult<MainIterator> diff_res = es->db_slice().Find(op_args.db_ind, keys[i], OBJ_SET);
|
||||
OpResult<PrimeIterator> diff_res = es->db_slice().Find(op_args.db_ind, keys[i], OBJ_SET);
|
||||
if (!diff_res) {
|
||||
if (diff_res.status() == OpStatus::WRONG_TYPE) {
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
@ -972,7 +972,7 @@ OpResult<StringVec> SetFamily::OpDiff(const OpArgs& op_args, ArgSlice keys) {
|
|||
|
||||
OpResult<StringVec> SetFamily::OpPop(const OpArgs& op_args, std::string_view key, unsigned count) {
|
||||
auto* es = op_args.shard;
|
||||
OpResult<MainIterator> find_res = es->db_slice().Find(op_args.db_ind, key, OBJ_SET);
|
||||
OpResult<PrimeIterator> find_res = es->db_slice().Find(op_args.db_ind, key, OBJ_SET);
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
|
@ -980,7 +980,7 @@ OpResult<StringVec> SetFamily::OpPop(const OpArgs& op_args, std::string_view key
|
|||
if (count == 0)
|
||||
return result;
|
||||
|
||||
MainIterator it = find_res.value();
|
||||
PrimeIterator it = find_res.value();
|
||||
size_t slen = it->second.Size();
|
||||
SetType st{it->second.RObjPtr(), it->second.Encoding()};
|
||||
|
||||
|
@ -1029,7 +1029,7 @@ OpResult<StringVec> SetFamily::OpInter(const Transaction* t, EngineShard* es, bo
|
|||
|
||||
StringVec result;
|
||||
if (keys.size() == 1) {
|
||||
OpResult<MainIterator> find_res = es->db_slice().Find(t->db_index(), keys.front(), OBJ_SET);
|
||||
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_index(), keys.front(), OBJ_SET);
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
|
@ -1043,7 +1043,7 @@ OpResult<StringVec> SetFamily::OpInter(const Transaction* t, EngineShard* es, bo
|
|||
vector<SetType> sets(keys.size());
|
||||
|
||||
for (size_t i = 0; i < keys.size(); ++i) {
|
||||
OpResult<MainIterator> find_res = es->db_slice().Find(t->db_index(), keys[i], OBJ_SET);
|
||||
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_index(), keys[i], OBJ_SET);
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
robj* sobj = find_res.value()->second.AsRObj();
|
||||
|
@ -1101,12 +1101,12 @@ OpResult<StringVec> SetFamily::OpInter(const Transaction* t, EngineShard* es, bo
|
|||
|
||||
OpResult<StringVec> SetFamily::OpScan(const OpArgs& op_args, std::string_view key,
|
||||
uint64_t* cursor) {
|
||||
OpResult<MainIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET);
|
||||
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET);
|
||||
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
MainIterator it = find_res.value();
|
||||
PrimeIterator it = find_res.value();
|
||||
StringVec res;
|
||||
uint32_t count = 10;
|
||||
|
||||
|
|
|
@ -38,7 +38,7 @@ void SliceSnapshot::Start(DbSlice* slice) {
|
|||
auto on_change = [this, slice](DbIndex db_index, const DbSlice::ChangeReq& req) {
|
||||
PrimeTable* table = slice->GetTables(db_index).first;
|
||||
|
||||
if (const MainIterator* it = get_if<MainIterator>(&req)) {
|
||||
if (const PrimeIterator* it = get_if<PrimeIterator>(&req)) {
|
||||
if (it->GetVersion() < snapshot_version_) {
|
||||
side_saved_ += SerializePhysicalBucket(table, *it);
|
||||
}
|
||||
|
@ -70,7 +70,7 @@ void SliceSnapshot::Join() {
|
|||
|
||||
static_assert(sizeof(PrimeTable::const_iterator) == 16);
|
||||
|
||||
void SliceSnapshot::SerializeSingleEntry(MainIterator it) {
|
||||
void SliceSnapshot::SerializeSingleEntry(PrimeIterator it) {
|
||||
error_code ec;
|
||||
|
||||
string tmp;
|
||||
|
@ -160,7 +160,7 @@ bool SliceSnapshot::FlushSfile(bool force) {
|
|||
//
|
||||
// It's important that cb will run atomically so we avoid anu I/O work inside it.
|
||||
// Instead, we flush our string file to disk in the traverse loop below.
|
||||
bool SliceSnapshot::SaveCb(MainIterator it) {
|
||||
bool SliceSnapshot::SaveCb(PrimeIterator it) {
|
||||
// if we touched that physical bucket - skip it.
|
||||
// We must to make sure we TraverseBucket exactly once for each physical bucket.
|
||||
// This test is the first one because it's likely to be the fastest one:
|
||||
|
@ -194,7 +194,7 @@ unsigned SliceSnapshot::SerializePhysicalBucket(PrimeTable* table, PrimeTable::c
|
|||
SerializeSingleEntry(move(entry_it));
|
||||
});
|
||||
|
||||
table->TraverseBucket(it, [this](MainIterator entry_it) {
|
||||
table->TraverseBucket(it, [this](PrimeIterator entry_it) {
|
||||
DCHECK_LE(entry_it.GetVersion(), snapshot_version_);
|
||||
DVLOG(3) << "Bumping up version " << entry_it.bucket_id() << ":" << entry_it.slot_id();
|
||||
|
||||
|
|
|
@ -33,8 +33,8 @@ class SliceSnapshot {
|
|||
private:
|
||||
void FiberFunc();
|
||||
bool FlushSfile(bool force);
|
||||
void SerializeSingleEntry(MainIterator it);
|
||||
bool SaveCb(MainIterator it);
|
||||
void SerializeSingleEntry(PrimeIterator it);
|
||||
bool SaveCb(PrimeIterator it);
|
||||
|
||||
// Returns number of entries serialized.
|
||||
unsigned SerializePhysicalBucket(PrimeTable* table, PrimeTable::const_iterator it);
|
||||
|
|
|
@ -178,7 +178,7 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
|
|||
std::string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<string> {
|
||||
OpResult<MainIterator> it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING);
|
||||
OpResult<PrimeIterator> it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING);
|
||||
if (!it_res.ok())
|
||||
return it_res.status();
|
||||
|
||||
|
@ -444,7 +444,7 @@ void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<size_t> {
|
||||
OpResult<MainIterator> it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING);
|
||||
OpResult<PrimeIterator> it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING);
|
||||
if (!it_res.ok())
|
||||
return it_res.status();
|
||||
|
||||
|
@ -472,7 +472,7 @@ void StringFamily::GetRange(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<string> {
|
||||
OpResult<MainIterator> it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING);
|
||||
OpResult<PrimeIterator> it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING);
|
||||
if (!it_res.ok())
|
||||
return it_res.status();
|
||||
|
||||
|
@ -571,11 +571,11 @@ auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction
|
|||
|
||||
auto& db_slice = shard->db_slice();
|
||||
for (size_t i = 0; i < args.size(); ++i) {
|
||||
OpResult<MainIterator> it_res = db_slice.Find(t->db_index(), args[i], OBJ_STRING);
|
||||
OpResult<PrimeIterator> it_res = db_slice.Find(t->db_index(), args[i], OBJ_STRING);
|
||||
if (!it_res)
|
||||
continue;
|
||||
|
||||
const MainIterator& it = *it_res;
|
||||
const PrimeIterator& it = *it_res;
|
||||
auto& dest = response[i].emplace();
|
||||
|
||||
it->second.GetString(&dest.value);
|
||||
|
@ -673,7 +673,7 @@ OpResult<uint32_t> StringFamily::ExtendOrSet(const OpArgs& op_args, std::string_
|
|||
OpResult<bool> StringFamily::ExtendOrSkip(const OpArgs& op_args, std::string_view key,
|
||||
std::string_view val, bool prepend) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<MainIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
|
||||
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING);
|
||||
if (!it_res) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -17,10 +17,10 @@ using ExpireTable = DashTable<PrimeKey, ExpirePeriod, detail::ExpireTablePolicy>
|
|||
|
||||
/// Iterators are invalidated when new keys are added to the table or some entries are deleted.
|
||||
/// Iterators are still valid if a different entry in the table was mutated.
|
||||
using MainIterator = PrimeTable::iterator;
|
||||
using PrimeIterator = PrimeTable::iterator;
|
||||
using ExpireIterator = ExpireTable::iterator;
|
||||
|
||||
inline bool IsValid(MainIterator it) {
|
||||
inline bool IsValid(PrimeIterator it) {
|
||||
return !it.is_done();
|
||||
}
|
||||
|
||||
|
|
|
@ -42,7 +42,7 @@ struct Transaction::FindFirstProcessor {
|
|||
// Holds Find results: (iterator to a found key, and its index in the passed arguments).
|
||||
// See DbSlice::FindFirst for more details.
|
||||
// spans all the shards for now.
|
||||
std::vector<OpResult<std::pair<MainIterator, unsigned>>> find_res_;
|
||||
std::vector<OpResult<std::pair<PrimeIterator, unsigned>>> find_res_;
|
||||
TxId notify_txid_;
|
||||
};
|
||||
|
||||
|
|
|
@ -191,7 +191,7 @@ class Transaction {
|
|||
// grows the table and invalidates find_res. We should return a key, unfortunately,
|
||||
// and not the iterator.
|
||||
struct FindFirstResult {
|
||||
MainIterator find_res;
|
||||
PrimeIterator find_res;
|
||||
ShardId sid = kInvalidSid;
|
||||
};
|
||||
|
||||
|
|
|
@ -30,7 +30,7 @@ static const char kNxXxErr[] = "XX and NX options at the same time are not compa
|
|||
static const char kScoreNaN[] = "resulting score is not a number (NaN)";
|
||||
constexpr unsigned kMaxListPackValue = 64;
|
||||
|
||||
OpResult<MainIterator> FindZEntry(unsigned flags, const OpArgs& op_args, string_view key,
|
||||
OpResult<PrimeIterator> FindZEntry(unsigned flags, const OpArgs& op_args, string_view key,
|
||||
size_t member_len) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
if (flags & ZADD_IN_XX) {
|
||||
|
@ -377,7 +377,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
|
||||
OpResult<MainIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_ZSET);
|
||||
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_ZSET);
|
||||
if (!find_res) {
|
||||
return find_res.status();
|
||||
}
|
||||
|
@ -733,7 +733,7 @@ void ZSetFamily::ZRangeGeneric(CmdArgList args, bool reverse, ConnectionContext*
|
|||
OpStatus ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string_view key,
|
||||
ScoredMemberSpan members, AddResult* add_result) {
|
||||
DCHECK(!members.empty());
|
||||
OpResult<MainIterator> res_it =
|
||||
OpResult<PrimeIterator> res_it =
|
||||
FindZEntry(zparams.flags, op_args, key, members.front().second.size());
|
||||
|
||||
if (!res_it)
|
||||
|
@ -788,7 +788,7 @@ OpStatus ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string
|
|||
}
|
||||
|
||||
OpResult<unsigned> ZSetFamily::OpRem(const OpArgs& op_args, string_view key, ArgSlice members) {
|
||||
OpResult<MainIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
|
@ -810,7 +810,7 @@ OpResult<unsigned> ZSetFamily::OpRem(const OpArgs& op_args, string_view key, Arg
|
|||
}
|
||||
|
||||
OpResult<double> ZSetFamily::OpScore(const OpArgs& op_args, string_view key, string_view member) {
|
||||
OpResult<MainIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
|
@ -827,7 +827,7 @@ OpResult<double> ZSetFamily::OpScore(const OpArgs& op_args, string_view key, str
|
|||
|
||||
auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, string_view key)
|
||||
-> OpResult<ScoredArray> {
|
||||
OpResult<MainIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
|
@ -841,7 +841,7 @@ auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, st
|
|||
|
||||
OpResult<unsigned> ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key,
|
||||
const ZRangeSpec& range_spec) {
|
||||
OpResult<MainIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
|
|
Loading…
Reference in a new issue