1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

Bug fixes.

1. Fix memory corruption bug in quicklist and listpack due to wrong assert usage.
2. Add HSETNX command and fix skip_exists semantics in OpSet function.
3. Add tests for hsetnx.
4. Add MEMORY USAGE decorator.
5. redis_parser accepts arrays upto 8192 elements.
6. Fix listpack replace call when passing an empty string.
7. Implement "debug object" command.
This commit is contained in:
Roman Gershman 2022-04-04 22:48:45 +03:00
parent 8a1396de31
commit cae1403191
14 changed files with 201 additions and 35 deletions

View file

@ -332,7 +332,8 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
// We wait for dispatch_fb to finish writing the previous replies before replying to the last
// offending request.
if (parse_status == ERROR) {
VLOG(1) << "Error stats " << parse_status;
VLOG(1) << "Error parser status " << parse_status;
if (redis_parser_) {
SendProtocolError(RedisParser::Result(parser_error_), peer);
} else {

View file

@ -13,7 +13,7 @@ using namespace std;
namespace {
constexpr int kMaxArrayLen = 1024;
constexpr int kMaxArrayLen = 8192;
constexpr int64_t kMaxBulkLen = 64 * (1ul << 20); // 64MB.
} // namespace
@ -237,8 +237,11 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> Result {
case BAD_INT:
return BAD_ARRAYLEN;
case OK:
if (len < -1 || len > kMaxArrayLen)
if (len < -1 || len > kMaxArrayLen) {
VLOG_IF(1, len > kMaxArrayLen) << "Milti bulk len is too big " << len;
return BAD_ARRAYLEN;
}
break;
default:
LOG(ERROR) << "Unexpected result " << res;

View file

@ -1381,7 +1381,8 @@ void lpRandomPair(unsigned char *lp, unsigned long total_count, listpackEntry *k
if (!val)
return;
assert((p = lpNext(lp, p)));
p = lpNext(lp, p);
assert(p);
val->sval = lpGetValue(p, &(val->slen), &(val->lval));
}
@ -1420,7 +1421,8 @@ void lpRandomPairs(unsigned char *lp, unsigned int count, listpackEntry *keys, l
p = lpSeek(lp, lpindex);
while (p && pickindex < count) {
key = lpGetValue(p, &klen, &klval);
assert((p = lpNext(lp, p)));
p = lpNext(lp, p);
assert(p);
value = lpGetValue(p, &vlen, &vlval);
while (pickindex < count && lpindex == picks[pickindex].index) {
int storeorder = picks[pickindex].order;
@ -1463,7 +1465,8 @@ unsigned int lpRandomPairsUnique(unsigned char *lp, unsigned int count, listpack
if (randomDouble <= threshold) {
key = lpGetValue(p, &klen, &klval);
lpSaveValue(key, klen, klval, &keys[picked]);
assert((p = lpNext(lp, p)));
p = lpNext(lp, p);
assert(p);
if (vals) {
key = lpGetValue(p, &klen, &klval);
lpSaveValue(key, klen, klval, &vals[picked]);
@ -1471,7 +1474,8 @@ unsigned int lpRandomPairsUnique(unsigned char *lp, unsigned int count, listpack
remaining--;
picked++;
} else {
assert((p = lpNext(lp, p)));
p = lpNext(lp, p);
assert(p);
}
p = lpNext(lp, p);
index++;

View file

@ -894,7 +894,9 @@ int getIntFromObjectOrReply(client *c, robj *o, int *target, const char *msg) {
return C_OK;
}
char *strEncoding(int encoding) {
#endif
const char *strEncoding(int encoding) {
switch(encoding) {
case OBJ_ENCODING_RAW: return "raw";
case OBJ_ENCODING_INT: return "int";
@ -910,7 +912,6 @@ char *strEncoding(int encoding) {
}
}
#endif
/* Given an object returns the min number of milliseconds the object was never
* requested, using an approximated LRU algorithm. */

View file

@ -192,7 +192,7 @@ int hashTypeGetFromListpack(robj *o, sds field,
unsigned char **vstr,
unsigned int *vlen,
long long *vll);
const char *strEncoding(int encoding);
/* Macro used to initialize a Redis object allocated on the stack.
* Note that this macro is taken near the structure definition to make sure

View file

@ -1428,7 +1428,7 @@ quicklistIter *quicklistGetIteratorEntryAtIdx(quicklist *quicklist, const long l
{
quicklistIter *iter = quicklistGetIteratorAtIdx(quicklist, AL_START_TAIL, idx);
if (!iter) return NULL;
assert(quicklistNext(iter, entry));
quicklistNext(iter, entry);
return iter;
}

View file

@ -25,7 +25,7 @@ using namespace std;
using namespace util;
namespace this_fiber = ::boost::this_fiber;
using boost::fibers::fiber;
using facade::kUintErr;
using namespace facade;
namespace fs = std::filesystem;
struct PopulateBatch {
@ -60,6 +60,8 @@ void DebugCmd::Run(CmdArgList args) {
if (subcmd == "HELP") {
std::string_view help_arr[] = {
"DEBUG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
"OBJECT <key>",
" Show low-level info about `key` and associated value.",
"RELOAD [option ...]",
" Save the RDB on disk (TBD) and reload it back to memory. Valid <option> values:",
" * NOSAVE: the database will be loaded from an existing RDB file.",
@ -85,6 +87,11 @@ void DebugCmd::Run(CmdArgList args) {
return Reload(args);
}
if (subcmd == "OBJECT" && args.size() == 3) {
string_view key = ArgS(args, 2);
return Inspect(key);
}
string reply = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
"'. Try DEBUG HELP.");
return (*cntx_)->SendError(reply, kSyntaxErr);
@ -171,9 +178,8 @@ void DebugCmd::Populate(CmdArgList args) {
auto range = ranges[i];
// whatever we do, we should not capture i by reference.
fb_arr[i] = ess_->pool()->at(i)->LaunchFiber([=] {
this->PopulateRangeFiber(range.first, range.second, prefix, val_size);
});
fb_arr[i] = ess_->pool()->at(i)->LaunchFiber(
[=] { this->PopulateRangeFiber(range.first, range.second, prefix, val_size); });
}
for (auto& fb : fb_arr)
fb.join();
@ -217,4 +223,26 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
});
}
void DebugCmd::Inspect(string_view key) {
ShardId sid = Shard(key, ess_->size());
using ObjInfo = pair<unsigned, unsigned>; // type, encoding.
auto cb = [&]() -> facade::OpResult<ObjInfo> {
auto& db_slice = EngineShard::tlocal()->db_slice();
PrimeIterator it = db_slice.FindExt(cntx_->db_index(), key).first;
if (IsValid(it)) {
return ObjInfo(it->second.ObjType(), it->second.Encoding());
}
return OpStatus::KEY_NOTFOUND;
};
OpResult<ObjInfo> res = ess_->Await(sid, cb);
if (res) {
string resp = absl::StrCat("Value encoding:", strEncoding(res->second));
(*cntx_)->SendSimpleString(resp);
} else {
(*cntx_)->SendError(res.status());
}
}
} // namespace dfly

View file

@ -20,6 +20,7 @@ class DebugCmd {
void Populate(CmdArgList args);
void PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix, unsigned value_len);
void Reload(CmdArgList args);
void Inspect(std::string_view key);
EngineShardSet* ess_;
ConnectionContext* cntx_;

View file

@ -48,24 +48,35 @@ string LpGetVal(uint8_t* lp_it) {
}
// returns a new pointer to lp. Returns true if field was inserted or false it it already existed.
pair<uint8_t*, bool> lpInsertElem(uint8_t* lp, string_view field, string_view val) {
// skip_exists controls what happens if the field already existed. If skip_exists = true,
// then val does not override the value and listpack is not changed. Otherwise, the corresponding
// value is overriden by val.
pair<uint8_t*, bool> LpInsert(uint8_t* lp, string_view field, string_view val, bool skip_exists) {
uint8_t* vptr;
uint8_t* fptr = lpFirst(lp);
uint8_t* fsrc = (uint8_t*)field.data();
uint8_t* vsrc = (uint8_t*)val.data();
// if we vsrc is NULL then lpReplace will delete the element, which is not what we want.
// therefore, for an empty val we set it to some other valid address so that lpReplace
// will do the right thing and encode empty string instead of deleting the element.
uint8_t* vsrc = val.empty() ? lp : (uint8_t*)val.data();
bool updated = false;
if (fptr) {
fptr = lpFind(lp, fptr, fsrc, field.size(), 1);
if (fptr) {
if (skip_exists) {
return make_pair(lp, false);
}
/* Grab pointer to the value (fptr points to the field) */
vptr = lpNext(lp, fptr);
updated = true;
/* Replace value */
lp = lpReplace(lp, &vptr, vsrc, val.size());
DCHECK_EQ(0u, lpLength(lp) % 2);
}
}
@ -256,16 +267,35 @@ void HSetFamily::HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t g
void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
ToLower(&args[0]);
string_view cmd = ArgS(args, 0);
args.remove_prefix(2);
if (args.size() % 2 != 0) {
return (*cntx)->SendError(facade::WrongNumArgsError("hset"), kSyntaxErr);
return (*cntx)->SendError(facade::WrongNumArgsError(cmd), kSyntaxErr);
}
args.remove_prefix(2);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSet(OpArgs{shard, t->db_index()}, key, args, false);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result && cmd == "hset") {
(*cntx)->SendLong(*result);
} else {
(*cntx)->SendError(result.status());
}
}
void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
args.remove_prefix(2);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSet(OpArgs{shard, t->db_index()}, key, args, true);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->SendLong(*result);
@ -274,12 +304,56 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
}
}
void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) {
void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) {
LOG(DFATAL) << "TBD";
}
void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) {
LOG(DFATAL) << "TBD";
void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringVec> {
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->db_index(), key, OBJ_HASH);
if (!it_res)
return it_res.status();
const PrimeValue& pv = it_res.value()->second;
StringVec str_vec;
if (pv.Encoding() == OBJ_ENCODING_HT) {
dict* this_dict = (dict*)pv.RObjPtr();
dictEntry* de = dictGetFairRandomKey(this_dict);
sds key = (sds)de->key;
str_vec.emplace_back(key, sdslen(key));
} else if (pv.Encoding() == OBJ_ENCODING_LISTPACK) {
uint8_t* lp = (uint8_t*)pv.RObjPtr();
size_t lplen = lpLength(lp);
CHECK(lplen > 0 && lplen % 2 == 0);
size_t hlen = lplen / 2;
listpackEntry key;
lpRandomPair(lp, hlen, &key, NULL);
if (key.sval) {
str_vec.emplace_back(reinterpret_cast<char*>(key.sval), key.slen);
} else {
str_vec.emplace_back(absl::StrCat(key.lval));
}
} else {
LOG(ERROR) << "Invalid encoding " << pv.Encoding();
return OpStatus::INVALID_VALUE;
}
return str_vec;
};
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
CHECK_EQ(1u, result->size()); // TBD: to support count and withvalues.
(*cntx)->SendBulkString(result->front());
} else {
(*cntx)->SendError(result.status());
}
}
OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, CmdArgList values,
@ -323,22 +397,33 @@ OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd
if (lp) {
bool inserted;
for (size_t i = 0; i < values.size(); i += 2) {
tie(lp, inserted) = lpInsertElem(lp, ArgS(values, i), ArgS(values, i + 1));
tie(lp, inserted) = LpInsert(lp, ArgS(values, i), ArgS(values, i + 1), skip_if_exists);
created += inserted;
}
hset->ptr = lp;
stats->listpack_bytes += lpBytes(lp);
} else {
DCHECK_EQ(OBJ_ENCODING_HT, hset->encoding);
dict* this_dict = (dict*)hset->ptr;
// Dictionary
for (size_t i = 0; i < values.size(); i += 2) {
sds fs = sdsnewlen(values[i].data(), values[i].size());
sds vs = sdsnewlen(values[i + 1].data(), values[i + 1].size());
dictEntry* existing;
dictEntry* de = dictAddRaw(this_dict, fs, &existing);
if (de) {
++created;
} else { // already exists
sdsfree(fs);
if (skip_if_exists)
continue;
// hashTypeSet checks for hash_max_listpack_entries and converts into dictionary
// if it goes beyond.
created += !hashTypeSet(hset, fs, vs, HASH_SET_TAKE_FIELD | HASH_SET_TAKE_VALUE);
de = existing;
dictFreeVal(this_dict, existing);
}
sds vs = sdsnewlen(values[i + 1].data(), values[i + 1].size());
dictSetVal(this_dict, de, vs);
}
}
it->second.SyncRObj();
@ -419,6 +504,8 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList
int64_t ele_len;
string_view key;
DCHECK(lp_elem); // empty containers are not allowed.
size_t lplen = lpLength(lp);
DCHECK(lplen > 0 && lplen % 2 == 0);
// We do single pass on listpack for this operation.
do {
@ -449,7 +536,7 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList
dictEntry* de = dictFind(d, op_args.shard->tmp_str1);
if (de) {
sds val = (sds)dictGetVal(de);
result[i]->assign(val, sdslen(val));
result[i].emplace(val, sdslen(val));
}
}
}
@ -616,7 +703,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie
string_view sval{buf, size_t(next - buf)};
uint8_t* lp = (uint8_t*)hset->ptr;
lp = lpInsertElem(lp, field, sval).first;
lp = LpInsert(lp, field, sval, false).first;
hset->ptr = lp;
stats->listpack_bytes += lpBytes(lp);
} else {
@ -645,10 +732,13 @@ void HSetFamily::Register(CommandRegistry* registry) {
<< CI{"HMSET", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(HSet)
<< CI{"HINCRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HIncrBy)
<< CI{"HKEYS", CO::READONLY, 2, 1, 1, 1}.HFUNC(HKeys)
<< CI{"HVALS", CO::READONLY, 2, 1, 1, 1}.HFUNC(HVals)
// TODO: add options support
<< CI{"HRANDFIELD", CO::READONLY, 2, 1, 1, 1}.HFUNC(HRandField)
<< CI{"HSET", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(HSet)
<< CI{"HSETNX", CO::WRITE | CO::DENYOOM | CO::FAST, 4, 1, 1, 1}.HFUNC(HSetNx)
<< CI{"HSTRLEN", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(HStrLen);
<< CI{"HSTRLEN", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(HStrLen)
<< CI{"HVALS", CO::READONLY, 2, 1, 1, 1}.HFUNC(HVals);
}
} // namespace dfly

View file

@ -36,6 +36,7 @@ class HSetFamily {
static void HSet(CmdArgList args, ConnectionContext* cntx);
static void HSetNx(CmdArgList args, ConnectionContext* cntx);
static void HStrLen(CmdArgList args, ConnectionContext* cntx);
static void HRandField(CmdArgList args, ConnectionContext* cntx);
static void HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t getall_mask);

View file

@ -52,6 +52,8 @@ TEST_F(HSetFamilyTest, Basic) {
EXPECT_EQ(0, CheckedInt({"hset", "x", "a", "b"}));
EXPECT_EQ(0, CheckedInt({"hset", "x", "a", "c"}));
EXPECT_EQ(0, CheckedInt({"hset", "x", "a", ""}));
EXPECT_EQ(2, CheckedInt({"hset", "y", "a", "c", "d", "e"}));
EXPECT_EQ(2, CheckedInt({"hdel", "y", "a", "d"}));
}
@ -85,4 +87,19 @@ TEST_F(HSetFamilyTest, Get) {
EXPECT_THAT(resp, ElementsAre("a", "1", "b", "2", "c", "3"));
}
TEST_F(HSetFamilyTest, HSetNx) {
EXPECT_EQ(1, CheckedInt({"hsetnx", "key", "field", "val"}));
EXPECT_THAT(Run({"hget", "key", "field"}), RespEq("val"));
EXPECT_EQ(0, CheckedInt({"hsetnx", "key", "field", "val2"}));
EXPECT_THAT(Run({"hget", "key", "field"}), RespEq("val"));
EXPECT_EQ(1, CheckedInt({"hsetnx", "key", "field2", "val2"}));
EXPECT_THAT(Run({"hget", "key", "field2"}), RespEq("val2"));
// check dict path
EXPECT_EQ(0, CheckedInt({"hsetnx", "key", "field2", string(512, 'a')}));
EXPECT_THAT(Run({"hget", "key", "field2"}), RespEq("val2"));
}
} // namespace dfly

View file

@ -265,6 +265,18 @@ void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
return dbg_cmd.Run(args);
}
void ServerFamily::Memory(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[1]);
string_view sub_cmd = ArgS(args, 1);
if (sub_cmd == "USAGE") {
return (*cntx)->SendLong(1);
}
string err = absl::StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd,
"'. Try MEMORY HELP.");
return (*cntx)->SendError(err, kSyntaxErr);
}
void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
static unsigned fl_index = 1;
@ -629,6 +641,8 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs,
void ServerFamily::Register(CommandRegistry* registry) {
constexpr auto kReplicaOpts = CO::ADMIN | CO::GLOBAL_TRANS;
constexpr auto kMemOpts = CO::LOADING | CO::READONLY | CO::FAST | CO::NOSCRIPT;
*registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth)
<< CI{"BGSAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save)
<< CI{"CONFIG", CO::ADMIN, -2, 0, 0, 0}.HFUNC(Config)
@ -638,6 +652,7 @@ void ServerFamily::Register(CommandRegistry* registry) {
<< CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll)
<< CI{"INFO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Info)
<< CI{"LASTSAVE", CO::LOADING | CO::RANDOM | CO::FAST, 1, 0, 0, 0}.HFUNC(LastSave)
<< CI{"MEMORY", kMemOpts, -2, 0, 0, 0}.HFUNC(Memory)
<< CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save)
<< CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING, 1, 0, 0, 0}.HFUNC(_Shutdown)
<< CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)

View file

@ -63,6 +63,7 @@ class ServerFamily {
void Config(CmdArgList args, ConnectionContext* cntx);
void DbSize(CmdArgList args, ConnectionContext* cntx);
void Debug(CmdArgList args, ConnectionContext* cntx);
void Memory(CmdArgList args, ConnectionContext* cntx);
void FlushDb(CmdArgList args, ConnectionContext* cntx);
void FlushAll(CmdArgList args, ConnectionContext* cntx);
void Info(CmdArgList args, ConnectionContext* cntx);

View file

@ -223,11 +223,15 @@ CmdArgVec BaseFamilyTest::TestConnWrapper::Args(std::initializer_list<std::strin
CmdArgVec res;
for (auto v : list) {
if (v.empty()) {
res.push_back(MutableSlice{});
} else {
tmp_str_vec.emplace_back(new string{v});
auto& s = *tmp_str_vec.back();
res.emplace_back(s.data(), s.size());
}
}
return res;
}