1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-15 17:51:06 +00:00

feat(server): Integrate set_map into hset_family

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-12-23 18:08:03 +02:00 committed by Roman Gershman
parent 5d34f271e9
commit 69b00b9cdd
7 changed files with 383 additions and 272 deletions

View file

@ -24,6 +24,7 @@ extern "C" {
#include "base/logging.h"
#include "base/pod_array.h"
#include "core/detail/bitpacking.h"
#include "core/string_map.h"
#include "core/string_set.h"
ABSL_FLAG(bool, use_set2, true, "If true use DenseSet for an optimized set data structure");
@ -91,8 +92,10 @@ size_t MallocUsedHSet(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingListPack:
return lpBytes(reinterpret_cast<uint8_t*>(ptr));
case kEncodingStrMap:
return DictMallocSize((dict*)ptr);
case kEncodingStrMap2: {
StringMap* sm = (StringMap*)ptr;
return sm->ObjMallocUsed() + sm->SetMallocUsed();
}
}
LOG(DFATAL) << "Unknown set encoding type " << encoding;
return 0;
@ -118,8 +121,8 @@ size_t MallocUsedStream(unsigned encoding, void* streamv) {
inline void FreeObjHash(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap:
dictRelease((dict*)ptr);
case kEncodingStrMap2:
delete ((StringMap*)ptr);
break;
case kEncodingListPack:
lpFree((uint8_t*)ptr);
@ -546,7 +549,7 @@ void CompactObj::ImportRObj(robj* o) {
}
} else if (o->type == OBJ_HASH) {
if (o->encoding == OBJ_ENCODING_HT) {
enc = kEncodingStrMap;
enc = kEncodingStrMap2;
} else {
enc = kEncodingListPack;
}
@ -561,17 +564,14 @@ robj* CompactObj::AsRObj() const {
CHECK_EQ(ROBJ_TAG, taglen_);
robj* res = &tl.tmp_robj;
unsigned enc = u_.r_obj.encoding();
res->type = u_.r_obj.type();
if (res->type == OBJ_SET) {
LOG(FATAL) << "Should not call AsRObj for type " << res->type;
LOG(DFATAL) << "Should not call AsRObj for type " << res->type;
}
if (res->type == OBJ_HASH) {
res->encoding = (enc == kEncodingListPack) ? OBJ_ENCODING_LISTPACK : OBJ_ENCODING_HT;
} else {
res->encoding = enc;
LOG(DFATAL) << "Should not call AsRObj for type " << res->type;
}
res->lru = 0; // u_.r_obj.unneeded;
res->ptr = u_.r_obj.inner_obj();
@ -593,9 +593,6 @@ void CompactObj::SyncRObj() {
CHECK_NE(OBJ_SET, obj->type) << "sets should be handled without robj";
unsigned enc = obj->encoding;
if (obj->type == OBJ_HASH) {
enc = (obj->encoding == OBJ_ENCODING_LISTPACK) ? kEncodingListPack : kEncodingStrMap;
}
u_.r_obj.Init(obj->type, enc, obj->ptr);
}

View file

@ -30,7 +30,7 @@ StringMap::~StringMap() {
Clear();
}
bool StringMap::AddOrSet(string_view field, string_view value, uint32_t ttl_sec) {
bool StringMap::AddOrUpdate(string_view field, string_view value, uint32_t ttl_sec) {
CHECK_EQ(ttl_sec, UINT32_MAX); // TBD
// 8 additional bytes for a pointer to value.
@ -60,9 +60,32 @@ bool StringMap::AddOrSet(string_view field, string_view value, uint32_t ttl_sec)
return true;
}
bool StringMap::AddOrSkip(std::string_view field, std::string_view value, uint32_t ttl_sec) {
CHECK_EQ(ttl_sec, UINT32_MAX); // TBD
void* obj = FindInternal(&field, 1); // 1 - string_view
if (obj)
return false;
// 8 additional bytes for a pointer to value.
sds newkey = AllocSdsWithSpace(field.size(), 8);
if (!field.empty()) {
memcpy(newkey, field.data(), field.size());
}
sds val = sdsnewlen(value.data(), value.size());
absl::little_endian::Store64(newkey + field.size() + 1, uint64_t(val));
bool has_ttl = false;
sds prev_entry = (sds)AddOrFind(newkey, has_ttl);
DCHECK(!prev_entry);
return true;
}
bool StringMap::Erase(string_view key) {
LOG(FATAL) << "TBD";
return false;
return EraseInternal(&key, 1);
}
bool StringMap::Contains(string_view field) const {

View file

@ -76,11 +76,19 @@ class StringMap : public DenseSet {
// Returns true if field was added
// otherwise updates its value and returns false.
bool AddOrSet(std::string_view field, std::string_view value, uint32_t ttl_sec = UINT32_MAX);
bool AddOrUpdate(std::string_view field, std::string_view value, uint32_t ttl_sec = UINT32_MAX);
// Returns true if field was added
// false, if already exists. In that case no update is done.
bool AddOrSkip(std::string_view field, std::string_view value, uint32_t ttl_sec = UINT32_MAX);
bool Erase(std::string_view s1);
bool Contains(std::string_view s1) const;
/// @brief Returns value of the key or nullptr if key not found.
/// @param key
/// @return sds
sds Find(std::string_view key);
void Clear();

View file

@ -70,7 +70,7 @@ class StringMapTest : public ::testing::Test {
};
TEST_F(StringMapTest, Basic) {
EXPECT_TRUE(sm_->AddOrSet("foo", "bar"));
EXPECT_TRUE(sm_->AddOrUpdate("foo", "bar"));
EXPECT_TRUE(sm_->Contains("foo"));
EXPECT_STREQ("bar", sm_->Find("foo"));
@ -86,7 +86,7 @@ TEST_F(StringMapTest, Basic) {
}
size_t sz = sm_->ObjMallocUsed();
EXPECT_FALSE(sm_->AddOrSet("foo", "baraaaaaaaaaaaa2"));
EXPECT_FALSE(sm_->AddOrUpdate("foo", "baraaaaaaaaaaaa2"));
EXPECT_GT(sm_->ObjMallocUsed(), sz);
it = sm_->begin();
EXPECT_STREQ("baraaaaaaaaaaaa2", it->second);

View file

@ -12,6 +12,7 @@ extern "C" {
}
#include "base/logging.h"
#include "core/string_map.h"
#include "facade/error.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
@ -42,23 +43,37 @@ bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) {
return lpBytes(const_cast<uint8_t*>(lp)) + sum < kMaxListPackLen;
}
string LpGetVal(uint8_t* lp_it) {
int64_t ele_len;
uint8_t* ptr = lpGet(lp_it, &ele_len, NULL);
if (ptr) {
return string(reinterpret_cast<char*>(ptr), ele_len);
}
return absl::StrCat(ele_len);
}
string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) {
inline string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) {
int64_t ele_len = 0;
uint8_t* elem = lpGet(lp_it, &ele_len, int_buf);
DCHECK(elem);
return string_view{reinterpret_cast<char*>(elem), size_t(ele_len)};
}
optional<string_view> LpFind(uint8_t* lp, string_view key, uint8_t int_buf[]) {
uint8_t* fptr = lpFirst(lp);
DCHECK(fptr);
fptr = lpFind(lp, fptr, (unsigned char*)key.data(), key.size(), 1);
if (!fptr)
return nullopt;
uint8_t* vptr = lpNext(lp, fptr);
return LpGetView(vptr, int_buf);
}
pair<uint8_t*, bool> LpDelete(uint8_t* lp, string_view field) {
uint8_t* fptr = lpFirst(lp);
DCHECK(fptr);
fptr = lpFind(lp, fptr, (unsigned char*)field.data(), field.size(), 1);
if (fptr == NULL) {
return make_pair(lp, false);
}
/* Delete both of the key and the value. */
lp = lpDeleteRangeWithEntry(lp, &fptr, 2);
return make_pair(lp, true);
}
// returns a new pointer to lp. Returns true if field was inserted or false it it already existed.
// skip_exists controls what happens if the field already existed. If skip_exists = true,
// then val does not override the value and listpack is not changed. Otherwise, the corresponding
@ -102,122 +117,170 @@ pair<uint8_t*, bool> LpInsert(uint8_t* lp, string_view field, string_view val, b
return make_pair(lp, !updated);
}
StringMap* ConvertToStrMap(uint8_t* lp) {
StringMap* sm = new StringMap(CompactObj::memory_resource());
size_t lplen = lpLength(lp);
if (lplen == 0)
return sm;
sm->Reserve(lplen / 2);
uint8_t* lp_elem = lpFirst(lp);
uint8_t intbuf[LP_INTBUF_SIZE];
DCHECK(lp_elem); // empty containers are not allowed.
do {
string_view key = LpGetView(lp_elem, intbuf);
lp_elem = lpNext(lp, lp_elem); // switch to value
DCHECK(lp_elem);
string_view value = LpGetView(lp_elem, intbuf);
lp_elem = lpNext(lp, lp_elem); // switch to next key
CHECK(sm->AddOrUpdate(key, value)); // must be unique
} while (lp_elem);
return sm;
}
size_t HMapLength(const CompactObj& co) {
void* ptr = co.RObjPtr();
if (co.Encoding() == kEncodingStrMap2) {
StringMap* sm = (StringMap*)ptr;
return sm->Size();
}
DCHECK_EQ(kEncodingListPack, co.Encoding());
return lpLength((uint8_t*)ptr) / 2;
}
OpStatus IncrementValue(optional<string_view> prev_val, IncrByParam* param) {
if (holds_alternative<double>(*param)) {
double incr = get<double>(*param);
long double value = 0;
if (prev_val) {
if (!string2ld(prev_val->data(), prev_val->size(), &value)) {
return OpStatus::INVALID_VALUE;
}
}
value += incr;
if (isnan(value) || isinf(value)) {
return OpStatus::INVALID_FLOAT;
}
param->emplace<double>(value);
return OpStatus::OK;
}
// integer increment
long long old_val = 0;
if (prev_val) {
if (!string2ll(prev_val->data(), prev_val->size(), &old_val)) {
return OpStatus::INVALID_VALUE;
}
}
int64_t incr = get<int64_t>(*param);
if ((incr < 0 && old_val < 0 && incr < (LLONG_MIN - old_val)) ||
(incr > 0 && old_val > 0 && incr > (LLONG_MAX - old_val))) {
return OpStatus::OUT_OF_RANGE;
}
int64_t new_val = old_val + incr;
param->emplace<int64_t>(new_val);
return OpStatus::OK;
};
OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, IncrByParam* param) {
auto& db_slice = op_args.shard->db_slice();
const auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key);
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
robj* hset = nullptr;
size_t lpb = 0;
PrimeValue& pv = it->second;
if (inserted) {
pv.InitRobj(OBJ_HASH, kEncodingListPack, lpNew(0));
stats->listpack_blob_cnt++;
hset = it->second.AsRObj();
} else {
if (pv.ObjType() != OBJ_HASH)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
hset = it->second.AsRObj();
if (pv.Encoding() == kEncodingListPack) {
lpb = lpBytes((uint8_t*)pv.RObjPtr());
uint8_t* lp = (uint8_t*)pv.RObjPtr();
lpb = lpBytes(lp);
stats->listpack_bytes -= lpb;
if (lpb >= kMaxListPackLen) {
stats->listpack_blob_cnt--;
hashTypeConvert(hset, OBJ_ENCODING_HT);
StringMap* sm = ConvertToStrMap(lp);
pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
}
}
}
int enc = pv.Encoding();
uint8_t* vstr = NULL;
unsigned int vlen = UINT_MAX;
long long old_val = 0;
unsigned enc = pv.Encoding();
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, field.data(), field.size());
if (enc == kEncodingListPack) {
uint8_t intbuf[LP_INTBUF_SIZE];
uint8_t* lp = (uint8_t*)pv.RObjPtr();
optional<string_view> res;
int exist_res = hashTypeGetValue(hset, op_args.shard->tmp_str1, &vstr, &vlen, &old_val);
if (!inserted)
res = LpFind(lp, field, intbuf);
if (holds_alternative<double>(*param)) {
long double value;
double incr = get<double>(*param);
if (exist_res == C_OK) {
if (vstr) {
const char* exist_val = reinterpret_cast<char*>(vstr);
if (!string2ld(exist_val, vlen, &value)) {
stats->listpack_bytes += lpb;
return OpStatus::INVALID_VALUE;
}
} else {
value = old_val;
}
value += incr;
if (isnan(value) || isinf(value)) {
return OpStatus::INVALID_FLOAT;
}
} else {
value = incr;
}
char buf[128];
char* str = RedisReplyBuilder::FormatDouble(value, buf, sizeof(buf));
string_view sval{str};
if (enc == kEncodingListPack) {
uint8_t* lp = (uint8_t*)pv.RObjPtr();
lp = LpInsert(lp, field, sval, false).first;
pv.SetRObjPtr(lp);
stats->listpack_bytes += lpBytes(lp);
} else {
sds news = sdsnewlen(str, sval.size());
hashTypeSet(hset, op_args.shard->tmp_str1, news, HASH_SET_TAKE_VALUE);
pv.SyncRObj();
}
param->emplace<double>(value);
} else { // integer increment
if (exist_res == C_OK && vstr) {
const char* exist_val = reinterpret_cast<char*>(vstr);
if (!string2ll(exist_val, vlen, &old_val)) {
stats->listpack_bytes += lpb;
return OpStatus::INVALID_VALUE;
}
}
int64_t incr = get<int64_t>(*param);
if ((incr < 0 && old_val < 0 && incr < (LLONG_MIN - old_val)) ||
(incr > 0 && old_val > 0 && incr > (LLONG_MAX - old_val))) {
OpStatus status = IncrementValue(res, param);
if (status != OpStatus::OK) {
stats->listpack_bytes += lpb;
return OpStatus::OUT_OF_RANGE;
return status;
}
int64_t new_val = old_val + incr;
char buf[32];
char* next = absl::numbers_internal::FastIntToBuffer(new_val, buf);
string_view sval{buf, size_t(next - buf)};
if (enc == kEncodingListPack) {
uint8_t* lp = (uint8_t*)pv.RObjPtr();
lp = LpInsert(lp, field, sval, false).first;
pv.SetRObjPtr(lp);
stats->listpack_bytes += lpBytes(lp);
} else {
sds news = sdsfromlonglong(new_val);
hashTypeSet(hset, op_args.shard->tmp_str1, news, HASH_SET_TAKE_VALUE);
pv.SyncRObj();
if (holds_alternative<double>(*param)) {
double new_val = get<double>(*param);
char buf[128];
char* str = RedisReplyBuilder::FormatDouble(new_val, buf, sizeof(buf));
lp = LpInsert(lp, field, str, false).first;
} else { // integer increment
int64_t new_val = get<int64_t>(*param);
absl::AlphaNum an(new_val);
lp = LpInsert(lp, field, an.Piece(), false).first;
}
pv.SetRObjPtr(lp);
stats->listpack_bytes += lpBytes(lp);
} else {
DCHECK_EQ(enc, kEncodingStrMap2);
StringMap* sm = (StringMap*)pv.RObjPtr();
sds val = nullptr;
if (!inserted) {
val = sm->Find(field);
}
optional<string_view> sv;
if (val) {
sv.emplace(val, sdslen(val));
}
OpStatus status = IncrementValue(sv, param);
if (status != OpStatus::OK) {
return status;
}
if (holds_alternative<double>(*param)) {
double new_val = get<double>(*param);
char buf[128];
char* str = RedisReplyBuilder::FormatDouble(new_val, buf, sizeof(buf));
sm->AddOrUpdate(field, str);
} else { // integer increment
int64_t new_val = get<int64_t>(*param);
absl::AlphaNum an(new_val);
sm->AddOrUpdate(field, an.Piece());
}
param->emplace<int64_t>(new_val);
}
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
@ -227,7 +290,6 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor,
const ScanOpts& scan_op) {
using PrivateDataRef = std::tuple<StringVec&, const ScanOpts&>;
constexpr size_t HASH_TABLE_ENTRIES_FACTOR = 2; // return key/value
/* We set the max number of iterations to ten times the specified
@ -246,15 +308,15 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t
PrimeIterator it = find_res.value();
StringVec res;
uint32_t count = scan_op.limit * HASH_TABLE_ENTRIES_FACTOR;
robj* hset = it->second.AsRObj();
PrimeValue& pv = it->second;
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* lp = (uint8_t*)hset->ptr;
if (pv.Encoding() == kEncodingListPack) {
uint8_t* lp = (uint8_t*)pv.RObjPtr();
uint8_t* lp_elem = lpFirst(lp);
DCHECK(lp_elem); // empty containers are not allowed.
unsigned char intbuf[LP_INTBUF_SIZE];
uint8_t intbuf[LP_INTBUF_SIZE];
// We do single pass on listpack for this operation - ignore any limits.
do {
@ -271,26 +333,24 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t
*cursor = 0;
} else {
dict* ht = (dict*)hset->ptr;
DCHECK_EQ(pv.Encoding(), kEncodingStrMap2);
StringMap* sm = (StringMap*)pv.RObjPtr();
long max_iterations = count * INTERATION_FACTOR;
PrivateDataRef private_data_ref(res, scan_op);
void* private_data = &private_data_ref;
// note about this lambda - don't capture here! it should be convertible to C function!
auto scanCb = [](void* private_data, const dictEntry* de) {
StringVec& res = std::get<0>(*(PrivateDataRef*)private_data);
const ScanOpts& scan_op = std::get<1>(*(PrivateDataRef*)private_data);
sds val = (sds)de->key;
auto len = sdslen(val);
if (scan_op.Matches(std::string_view(val, len))) {
auto scanCb = [&](const void* obj) {
sds val = (sds)obj;
size_t len = sdslen(val);
if (scan_op.Matches(string_view(val, len))) {
res.emplace_back(val, len);
val = (sds)de->v.val;
val += (len + 1);
val = (sds)absl::little_endian::Load64(val);
res.emplace_back(val, sdslen(val));
}
};
do {
*cursor = dictScan(ht, *cursor, scanCb, NULL, private_data);
*cursor = sm->Scan(*cursor, scanCb);
} while (*cursor && max_iterations-- && res.size() < count);
}
@ -307,38 +367,50 @@ OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, CmdArgList valu
return it_res.status();
db_slice.PreUpdate(op_args.db_cntx.db_index, *it_res);
CompactObj& co = (*it_res)->second;
robj* hset = co.AsRObj();
PrimeValue& pv = (*it_res)->second;
unsigned deleted = 0;
bool key_remove = false;
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
unsigned enc = pv.Encoding();
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
stats->listpack_bytes -= lpBytes((uint8_t*)hset->ptr);
}
for (auto s : values) {
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, s.data(), s.size());
if (hashTypeDelete(hset, op_args.shard->tmp_str1)) {
++deleted;
if (hashTypeLength(hset) == 0) {
key_remove = true;
break;
if (enc == kEncodingListPack) {
uint8_t* lp = (uint8_t*)pv.RObjPtr();
stats->listpack_bytes -= lpBytes(lp);
for (auto s : values) {
auto res = LpDelete(lp, ToSV(s));
if (res.second) {
++deleted;
lp = res.first;
if (lpLength(lp) == 0) {
key_remove = true;
break;
}
}
}
pv.SetRObjPtr(lp);
} else {
DCHECK_EQ(enc, kEncodingStrMap2);
StringMap* sm = (StringMap*)pv.RObjPtr();
for (auto s : values) {
bool res = sm->Erase(ToSV(s));
if (res) {
++deleted;
if (sm->Size() == 0) {
key_remove = true;
break;
}
}
}
}
co.SyncRObj();
db_slice.PostUpdate(op_args.db_cntx.db_index, *it_res, key);
if (key_remove) {
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
if (enc == kEncodingListPack) {
stats->listpack_blob_cnt--;
}
db_slice.Del(op_args.db_cntx.db_index, *it_res);
} else if (hset->encoding == OBJ_ENCODING_LISTPACK) {
stats->listpack_bytes += lpBytes((uint8_t*)hset->ptr);
} else if (enc == kEncodingListPack) {
stats->listpack_bytes += lpBytes((uint8_t*)pv.RObjPtr());
}
return deleted;
@ -353,56 +425,49 @@ OpResult<vector<OptStr>> OpMGet(const OpArgs& op_args, std::string_view key, Cmd
if (!it_res)
return it_res.status();
CompactObj& co = (*it_res)->second;
robj* hset = co.AsRObj();
PrimeValue& pv = (*it_res)->second;
std::vector<OptStr> result(fields.size());
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* lp = (uint8_t*)hset->ptr;
if (pv.Encoding() == kEncodingListPack) {
uint8_t* lp = (uint8_t*)pv.RObjPtr();
absl::flat_hash_map<string_view, unsigned> reverse;
reverse.reserve(fields.size() + 1);
for (size_t i = 0; i < fields.size(); ++i) {
reverse.emplace(ArgS(fields, i), i); // map fields to their index.
}
char ibuf[32];
uint8_t* lp_elem = lpFirst(lp);
int64_t ele_len;
string_view key;
DCHECK(lp_elem); // empty containers are not allowed.
size_t lplen = lpLength(lp);
DCHECK(lplen > 0 && lplen % 2 == 0);
uint8_t ibuf[32];
string_view key;
uint8_t* lp_elem = lpFirst(lp);
DCHECK(lp_elem); // empty containers are not allowed.
// We do single pass on listpack for this operation.
do {
uint8_t* elem = lpGet(lp_elem, &ele_len, NULL);
if (elem) {
key = string_view{reinterpret_cast<char*>(elem), size_t(ele_len)};
} else {
char* next = absl::numbers_internal::FastIntToBuffer(ele_len, ibuf);
key = string_view{ibuf, size_t(next - ibuf)};
}
key = LpGetView(lp_elem, ibuf);
lp_elem = lpNext(lp, lp_elem); // switch to value
DCHECK(lp_elem);
auto it = reverse.find(key);
if (it != reverse.end()) {
DCHECK_LT(it->second, result.size());
result[it->second].emplace(LpGetVal(lp_elem)); // populate found items.
result[it->second].emplace(LpGetView(lp_elem, ibuf)); // populate found items.
}
lp_elem = lpNext(lp, lp_elem); // switch to the next key
} while (lp_elem);
} else {
DCHECK_EQ(OBJ_ENCODING_HT, hset->encoding);
dict* d = (dict*)hset->ptr;
DCHECK_EQ(kEncodingStrMap2, pv.Encoding());
StringMap* sm = (StringMap*)pv.RObjPtr();
for (size_t i = 0; i < fields.size(); ++i) {
op_args.shard->tmp_str1 =
sdscpylen(op_args.shard->tmp_str1, fields[i].data(), fields[i].size());
dictEntry* de = dictFind(d, op_args.shard->tmp_str1);
if (de) {
sds val = (sds)dictGetVal(de);
sds val = sm->Find(ToSV(fields[i]));
if (val) {
result[i].emplace(val, sdslen(val));
}
}
@ -416,48 +481,60 @@ OpResult<uint32_t> OpLen(const OpArgs& op_args, string_view key) {
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
if (it_res) {
robj* hset = (*it_res)->second.AsRObj();
return hashTypeLength(hset);
return HMapLength((*it_res)->second);
}
if (it_res.status() == OpStatus::KEY_NOTFOUND)
return 0;
return it_res.status();
}
OpResult<int> OpExist(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
if (!it_res) {
if (it_res.status() == OpStatus::KEY_NOTFOUND)
return 0;
return it_res.status();
}
const PrimeValue& pv = (*it_res)->second;
void* ptr = pv.RObjPtr();
if (pv.Encoding() == kEncodingListPack) {
uint8_t intbuf[LP_INTBUF_SIZE];
optional<string_view> res = LpFind((uint8_t*)ptr, field, intbuf);
return res.has_value();
}
DCHECK_EQ(kEncodingStrMap2, pv.Encoding());
StringMap* sm = (StringMap*)ptr;
return sm->Find(field) ? 1 : 0;
};
OpResult<string> OpGet(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH);
if (!it_res)
return it_res.status();
robj* hset = (*it_res)->second.AsRObj();
const PrimeValue& pv = (*it_res)->second;
void* ptr = pv.RObjPtr();
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, field.data(), field.size());
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char* vstr = NULL;
unsigned int vlen = UINT_MAX;
long long vll = LLONG_MAX;
int ret = hashTypeGetFromListpack(hset, op_args.shard->tmp_str1, &vstr, &vlen, &vll);
if (ret < 0) {
if (pv.Encoding() == kEncodingListPack) {
uint8_t intbuf[LP_INTBUF_SIZE];
optional<string_view> res = LpFind((uint8_t*)ptr, field, intbuf);
if (!res) {
return OpStatus::KEY_NOTFOUND;
}
if (vstr) {
const char* src = reinterpret_cast<const char*>(vstr);
return string{src, vlen};
}
return absl::StrCat(vll);
return string(*res);
}
DCHECK_EQ(hset->encoding, OBJ_ENCODING_HT);
dictEntry* de = dictFind((dict*)hset->ptr, op_args.shard->tmp_str1);
if (!de)
DCHECK_EQ(pv.Encoding(), kEncodingStrMap2);
StringMap* sm = (StringMap*)ptr;
sds val = sm->Find(field);
if (!val)
return OpStatus::KEY_NOTFOUND;
sds val = (sds)dictGetVal(de);
return string(val, sdslen(val));
}
@ -470,41 +547,44 @@ OpResult<vector<string>> OpGetAll(const OpArgs& op_args, string_view key, uint8_
return it_res.status();
}
robj* hset = (*it_res)->second.AsRObj();
hashTypeIterator* hi = hashTypeInitIterator(hset);
const PrimeValue& pv = (*it_res)->second;
vector<string> res;
bool keyval = (mask == (FIELDS | VALUES));
size_t len = hashTypeLength(hset);
res.resize(keyval ? len * 2 : len);
unsigned index = 0;
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
while (hashTypeNext(hi) != C_ERR) {
if (mask & FIELDS) {
res[index++] = LpGetVal(hi->fptr);
}
if (pv.Encoding() == kEncodingListPack) {
uint8_t* lp = (uint8_t*)pv.RObjPtr();
res.resize(lpLength(lp) / (keyval ? 1 : 2));
if (mask & VALUES) {
res[index++] = LpGetVal(hi->vptr);
uint8_t* fptr = lpFirst(lp);
uint8_t intbuf[LP_INTBUF_SIZE];
while (fptr) {
if (mask & FIELDS) {
res[index++] = LpGetView(fptr, intbuf);
}
fptr = lpNext(lp, fptr);
if (mask & VALUES) {
res[index++] = LpGetView(fptr, intbuf);
}
fptr = lpNext(lp, fptr);
}
} else {
while (hashTypeNext(hi) != C_ERR) {
DCHECK_EQ(pv.Encoding(), kEncodingStrMap2);
StringMap* sm = (StringMap*)pv.RObjPtr();
res.resize(sm->Size() * (keyval ? 2 : 1));
for (const auto& k_v : *sm) {
if (mask & FIELDS) {
sds key = (sds)dictGetKey(hi->de);
res[index++].assign(key, sdslen(key));
res[index++].assign(k_v.first, sdslen(k_v.first));
}
if (mask & VALUES) {
sds val = (sds)dictGetVal(hi->de);
res[index++].assign(val, sdslen(val));
res[index++].assign(k_v.second, sdslen(k_v.second));
}
}
}
hashTypeReleaseIterator(hi);
return res;
}
@ -518,25 +598,20 @@ OpResult<size_t> OpStrLen(const OpArgs& op_args, string_view key, string_view fi
return it_res.status();
}
robj* hset = (*it_res)->second.AsRObj();
size_t field_len = 0;
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, field.data(), field.size());
const PrimeValue& pv = (*it_res)->second;
void* ptr = pv.RObjPtr();
if (pv.Encoding() == kEncodingListPack) {
uint8_t intbuf[LP_INTBUF_SIZE];
optional<string_view> res = LpFind((uint8_t*)ptr, field, intbuf);
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char* vstr = NULL;
unsigned int vlen = UINT_MAX;
long long vll = LLONG_MAX;
if (hashTypeGetFromListpack(hset, op_args.shard->tmp_str1, &vstr, &vlen, &vll) == 0)
field_len = vstr ? vlen : sdigits10(vll);
return field_len;
return res ? res->size() : 0;
}
DCHECK_EQ(hset->encoding, OBJ_ENCODING_HT);
DCHECK_EQ(pv.Encoding(), kEncodingStrMap2);
StringMap* sm = (StringMap*)ptr;
sds res = sm->Find(field);
dictEntry* de = dictFind((dict*)hset->ptr, op_args.shard->tmp_str1);
return de ? sdslen((sds)de->v.val) : 0;
return res ? sdslen(res) : 0;
}
OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList values,
@ -555,28 +630,29 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
uint8_t* lp = nullptr;
PrimeIterator& it = add_res.first;
PrimeValue& pv = it->second;
if (add_res.second) { // new key
lp = lpNew(0);
it->second.InitRobj(OBJ_HASH, kEncodingListPack, lp);
pv.InitRobj(OBJ_HASH, kEncodingListPack, lp);
stats->listpack_blob_cnt++;
stats->listpack_bytes += lpBytes(lp);
} else {
if (it->second.ObjType() != OBJ_HASH)
if (pv.ObjType() != OBJ_HASH)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
}
robj* hset = it->second.AsRObj();
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
lp = (uint8_t*)hset->ptr;
if (pv.Encoding() == kEncodingListPack) {
lp = (uint8_t*)pv.RObjPtr();
stats->listpack_bytes -= lpBytes(lp);
if (!IsGoodForListpack(values, lp)) {
stats->listpack_blob_cnt--;
hashTypeConvert(hset, OBJ_ENCODING_HT);
StringMap* sm = ConvertToStrMap(lp);
pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
lp = nullptr;
}
}
@ -589,33 +665,22 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
tie(lp, inserted) = LpInsert(lp, ArgS(values, i), ArgS(values, i + 1), skip_if_exists);
created += inserted;
}
hset->ptr = lp;
pv.SetRObjPtr(lp);
stats->listpack_bytes += lpBytes(lp);
} else {
DCHECK_EQ(OBJ_ENCODING_HT, hset->encoding);
dict* this_dict = (dict*)hset->ptr;
DCHECK_EQ(kEncodingStrMap2, pv.Encoding()); // Dictionary
StringMap* sm = (StringMap*)pv.RObjPtr();
bool added;
// Dictionary
for (size_t i = 0; i < values.size(); i += 2) {
sds fs = sdsnewlen(values[i].data(), values[i].size());
dictEntry* existing;
dictEntry* de = dictAddRaw(this_dict, fs, &existing);
if (de) {
++created;
} else { // already exists
sdsfree(fs);
if (skip_if_exists)
continue;
if (skip_if_exists)
added = sm->AddOrSkip(ToSV(values[i]), ToSV(values[i + 1]));
else
added = sm->AddOrUpdate(ToSV(values[i]), ToSV(values[i + 1]));
de = existing;
dictFreeVal(this_dict, existing);
}
sds vs = sdsnewlen(values[i + 1].data(), values[i + 1].size());
dictSetVal(this_dict, de, vs);
created += unsigned(added);
}
}
it->second.SyncRObj();
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
return created;
@ -657,18 +722,7 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) {
string_view field = ArgS(args, 2);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<int> {
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->db_context(), key, OBJ_HASH);
if (it_res) {
robj* hset = (*it_res)->second.AsRObj();
shard->tmp_str1 = sdscpylen(shard->tmp_str1, field.data(), field.size());
return hashTypeExists(hset, shard->tmp_str1);
}
if (it_res.status() == OpStatus::KEY_NOTFOUND)
return 0;
return it_res.status();
return OpExist(t->GetOpArgs(shard), key, field);
};
OpResult<int> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -932,12 +986,13 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
const PrimeValue& pv = it_res.value()->second;
StringVec str_vec;
if (pv.Encoding() == OBJ_ENCODING_HT) {
dict* this_dict = (dict*)pv.RObjPtr();
dictEntry* de = dictGetFairRandomKey(this_dict);
sds key = (sds)de->key;
if (pv.Encoding() == kEncodingStrMap2) {
// TODO: to create real random logic.
StringMap* string_map = (StringMap*)pv.RObjPtr();
sds key = string_map->begin()->first;
str_vec.emplace_back(key, sdslen(key));
} else if (pv.Encoding() == OBJ_ENCODING_LISTPACK) {
} else if (pv.Encoding() == kEncodingListPack) {
uint8_t* lp = (uint8_t*)pv.RObjPtr();
size_t lplen = lpLength(lp);
CHECK(lplen > 0 && lplen % 2 == 0);

View file

@ -153,7 +153,7 @@ TEST_F(HSetFamilyTest, HScan) {
vec = StrArray(resp.GetVec()[1]);
// See https://redis.io/commands/scan/ --> "The COUNT option", for why this cannot be exact
EXPECT_GT(vec.size(), 40); // This should be larger than (20 * 2) and less than about 50
EXPECT_GE(vec.size(), 40); // This should be larger than (20 * 2) and less than about 50
EXPECT_LT(vec.size(), 60);
}
@ -163,4 +163,32 @@ TEST_F(HSetFamilyTest, HScanLpMatchBug) {
EXPECT_THAT(resp, ArrLen(2));
}
TEST_F(HSetFamilyTest, HincrbyFloat) {
Run({"hincrbyfloat", "k", "a", "1.5"});
EXPECT_EQ(Run({"hget", "k", "a"}), "1.5");
Run({"hincrbyfloat", "k", "a", "1.5"});
EXPECT_EQ(Run({"hget", "k", "a"}), "3");
for (size_t i = 0; i < 500; ++i) {
Run({"hincrbyfloat", "k", absl::StrCat("v", i), "1.5"});
}
for (size_t i = 0; i < 500; ++i) {
EXPECT_EQ(Run({"hget", "k", absl::StrCat("v", i)}), "1.5");
}
}
TEST_F(HSetFamilyTest, HRandFloat) {
Run({"HSET", "k", "1", "2"});
EXPECT_EQ(Run({"hrandfield", "k"}), "1");
for (size_t i = 0; i < 500; ++i) {
Run({"hincrbyfloat", "k", absl::StrCat("v", i), "1.1"});
}
Run({"hrandfield", "k"});
}
} // namespace dfly

View file

@ -1505,7 +1505,7 @@ bool SetFamily::ConvertToStrSet(const intset* is, size_t expected_len, robj* des
}
dest->ptr = ss;
dest->encoding = OBJ_ENCODING_HT;
dest->encoding = kEncodingStrMap2;
} else {
dict* ds = dictCreate(&setDictType);