1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Support COUNT option in SCAN

This commit is contained in:
Roman Gershman 2022-03-28 22:35:25 +03:00
parent 7595ff6236
commit a6808445cf
3 changed files with 33 additions and 7 deletions

View file

@ -219,7 +219,6 @@ API 2.0
- [ ] RESTORE
- [X] SCRIPT LOAD
- [ ] SCRIPT DEBUG/KILL/FLUSH/EXISTS
- [ ] TOUCH
- [X] Set Family
- [X] SSCAN
- [X] SMISMEMBER

View file

@ -414,10 +414,11 @@ void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) {
}
void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
std::string_view token = ArgS(args, 1);
string_view token = ArgS(args, 1);
uint64_t cursor = 0;
EngineShardSet* ess = cntx->shard_set;
unsigned shard_count = ess->size();
uint32_t limit = 10;
// Dash table returns a cursor with its right byte empty. We will use it
// for encoding shard index. For now scan has a limitation of 255 shards.
@ -427,6 +428,29 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError("invalid cursor");
}
for (unsigned i = 2; i < args.size(); i += 2) {
if (i + 1 == args.size()) {
return (*cntx)->SendError(kSyntaxErr);
}
ToUpper(&args[i]);
string_view opt = ArgS(args, i);
if (opt == "COUNT") {
if (!absl::SimpleAtoi(ArgS(args, i+ 1), &limit)) {
return (*cntx)->SendError(kInvalidIntErr);
}
if (limit == 0)
limit = 1;
else if (limit > 4096)
limit = 4096;
} else if (opt == "MATCH" || opt == "TYPE") {
return (*cntx)->SendError("Not supported"); // TODO
} else {
return (*cntx)->SendError(kSyntaxErr);
}
}
ShardId sid = cursor % 1024;
if (sid >= shard_count) {
return (*cntx)->SendError("invalid cursor");
@ -438,14 +462,14 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
do {
ess->Await(sid, [&] {
OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index};
OpScan(op_args, &cursor, &keys);
OpScan(op_args, limit, &cursor, &keys);
});
if (cursor == 0) {
++sid;
if (unsigned(sid) == shard_count)
break;
}
} while (keys.size() < 10);
} while (keys.size() < limit);
if (sid < shard_count) {
cursor = (cursor << 10) | sid;
@ -571,7 +595,8 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from, str
return OpStatus::OK;
}
void GenericFamily::OpScan(const OpArgs& op_args, uint64_t* cursor, vector<string>* vec) {
void GenericFamily::OpScan(const OpArgs& op_args, size_t limit, uint64_t* cursor,
vector<string>* vec) {
auto& db_slice = op_args.shard->db_slice();
DCHECK(db_slice.IsDbValid(op_args.db_ind));
@ -591,7 +616,7 @@ void GenericFamily::OpScan(const OpArgs& op_args, uint64_t* cursor, vector<strin
auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind);
do {
cur = prime_table->Traverse(cur, scan_cb);
} while (cur && cnt < 10);
} while (cur && cnt < limit);
VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur;
*cursor = cur;

View file

@ -65,7 +65,9 @@ class GenericFamily {
static OpResult<uint32_t> OpExists(const OpArgs& op_args, ArgSlice keys);
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
bool skip_exists);
static void OpScan(const OpArgs& op_args, uint64_t* cursor, std::vector<std::string>* vec);
static void OpScan(const OpArgs& op_args, size_t limit, uint64_t* cursor,
std::vector<std::string>* vec);
};
} // namespace dfly