mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
Add HINCRBY, HVALS, HSETNX commands
This commit is contained in:
parent
d542c38475
commit
ff88e3117b
8 changed files with 158 additions and 28 deletions
|
@ -179,12 +179,12 @@ API 2.0
|
|||
- [X] HGET
|
||||
- [X] HMGET
|
||||
- [X] HLEN
|
||||
- [ ] HINCRBY
|
||||
- [X] HINCRBY
|
||||
- [ ] HINCRBYFLOAT
|
||||
- [X] HGETALL
|
||||
- [X] HKEYS
|
||||
- [ ] HSETNX
|
||||
- [ ] HVALS
|
||||
- [X] HSETNX
|
||||
- [X] HVALS
|
||||
- [ ] HSCAN
|
||||
- [ ] PubSub family
|
||||
- [X] PUBLISH
|
||||
|
|
|
@ -17,6 +17,7 @@ extern const char kKeyNotFoundErr[];
|
|||
extern const char kInvalidIntErr[];
|
||||
extern const char kInvalidFloatErr[];
|
||||
extern const char kUintErr[];
|
||||
extern const char kIncrOverflow[];
|
||||
extern const char kDbIndOutOfRangeErr[];
|
||||
extern const char kInvalidDbIndErr[];
|
||||
extern const char kScriptNotFound[];
|
||||
|
|
|
@ -57,6 +57,7 @@ const char kKeyNotFoundErr[] = "no such key";
|
|||
const char kInvalidIntErr[] = "value is not an integer or out of range";
|
||||
const char kInvalidFloatErr[] = "value is not a valid float";
|
||||
const char kUintErr[] = "value is out of range, must be positive";
|
||||
const char kIncrOverflow[] = "increment or decrement would overflow";
|
||||
const char kDbIndOutOfRangeErr[] = "DB index is out of range";
|
||||
const char kInvalidDbIndErr[] = "invalid DB index";
|
||||
const char kScriptNotFound[] = "-NOSCRIPT No matching script. Please use EVAL.";
|
||||
|
|
|
@ -184,6 +184,7 @@ void hashTypeCurrentFromListpack(hashTypeIterator *hi, int what,
|
|||
sds hashTypeCurrentFromHashTable(hashTypeIterator *hi, int what);
|
||||
void hashTypeCurrentObject(hashTypeIterator *hi, int what, unsigned char **vstr, unsigned int *vlen, long long *vll);
|
||||
sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what);
|
||||
int hashTypeGetValue(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll);
|
||||
robj *hashTypeGetValueObject(robj *o, sds field);
|
||||
int hashTypeSet(robj *o, sds field, sds value, int flags);
|
||||
robj *hashTypeDup(robj *o);
|
||||
|
|
|
@ -11,6 +11,7 @@ extern "C" {
|
|||
}
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "facade/error.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
|
@ -20,6 +21,7 @@ using namespace std;
|
|||
|
||||
namespace dfly {
|
||||
|
||||
using namespace facade;
|
||||
namespace {
|
||||
|
||||
constexpr size_t kMaxListPackLen = 1024;
|
||||
|
@ -182,7 +184,38 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) {
|
||||
LOG(DFATAL) << "TBD";
|
||||
string_view key = ArgS(args, 1);
|
||||
string_view field = ArgS(args, 2);
|
||||
string_view incrs = ArgS(args, 3);
|
||||
int64_t ival = 0;
|
||||
|
||||
if (!absl::SimpleAtoi(incrs, &ival)) {
|
||||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
||||
IncrByParam param{ival};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpIncrBy(OpArgs{shard, t->db_index()}, key, field, ¶m);
|
||||
};
|
||||
|
||||
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
|
||||
if (status == OpStatus::OK) {
|
||||
(*cntx)->SendLong(get<int64_t>(param));
|
||||
} else {
|
||||
switch (status) {
|
||||
case OpStatus::INVALID_VALUE:
|
||||
(*cntx)->SendError("hash value is not an integer");
|
||||
break;
|
||||
case OpStatus::OUT_OF_RANGE:
|
||||
(*cntx)->SendError(kIncrOverflow);
|
||||
break;
|
||||
default:
|
||||
(*cntx)->SendError(status);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void HSetFamily::HKeys(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
@ -263,31 +296,36 @@ OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd
|
|||
|
||||
DbSlice::InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
|
||||
robj* hset = nullptr;
|
||||
uint8_t* lp = nullptr;
|
||||
|
||||
if (inserted) {
|
||||
robj* ro = createHashObject();
|
||||
it->second.ImportRObj(ro);
|
||||
hset = createHashObject();
|
||||
lp = (uint8_t*)hset->ptr;
|
||||
|
||||
it->second.ImportRObj(hset);
|
||||
stats->listpack_blob_cnt++;
|
||||
stats->listpack_bytes += lpBytes((uint8_t*)ro->ptr);
|
||||
stats->listpack_bytes += lpBytes(lp);
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_HASH)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
}
|
||||
|
||||
robj* hset = it->second.AsRObj();
|
||||
uint8_t* lp = (uint8_t*)hset->ptr;
|
||||
hset = it->second.AsRObj();
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
lp = (uint8_t*)hset->ptr;
|
||||
stats->listpack_bytes -= lpBytes(lp);
|
||||
|
||||
if (!IsGoodForListpack(values, lp)) {
|
||||
stats->listpack_blob_cnt--;
|
||||
hashTypeConvert(hset, OBJ_ENCODING_HT);
|
||||
lp = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
unsigned created = 0;
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
if (lp) {
|
||||
bool inserted;
|
||||
for (size_t i = 0; i < values.size(); i += 2) {
|
||||
tie(lp, inserted) = lpInsertElem(lp, ArgS(values, i), ArgS(values, i + 1));
|
||||
|
@ -378,7 +416,7 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList
|
|||
absl::flat_hash_map<string_view, unsigned> reverse;
|
||||
reverse.reserve(fields.size() + 1);
|
||||
for (size_t i = 0; i < fields.size(); ++i) {
|
||||
reverse.emplace(ArgS(fields, i), i); // map fields to their index.
|
||||
reverse.emplace(ArgS(fields, i), i); // map fields to their index.
|
||||
}
|
||||
|
||||
char ibuf[32];
|
||||
|
@ -406,7 +444,7 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList
|
|||
}
|
||||
|
||||
lp_elem = lpNext(lp, lp_elem); // switch to the next key
|
||||
} while(lp_elem);
|
||||
} while (lp_elem);
|
||||
} else {
|
||||
DCHECK_EQ(OBJ_ENCODING_HT, hset->encoding);
|
||||
dict* d = (dict*)hset->ptr;
|
||||
|
@ -517,6 +555,87 @@ OpResult<vector<string>> HSetFamily::OpGetAll(const OpArgs& op_args, string_view
|
|||
return res;
|
||||
}
|
||||
|
||||
OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_view field,
|
||||
IncrByParam* param) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
|
||||
|
||||
DbSlice::InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind);
|
||||
|
||||
robj* hset = nullptr;
|
||||
size_t lpb = 0;
|
||||
|
||||
if (inserted) {
|
||||
hset = createHashObject();
|
||||
it->second.ImportRObj(hset);
|
||||
stats->listpack_blob_cnt++;
|
||||
hset = it->second.AsRObj();
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_HASH)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
hset = it->second.AsRObj();
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
lpb = lpBytes((uint8_t*)hset->ptr);
|
||||
stats->listpack_bytes -= lpb;
|
||||
|
||||
if (lpb >= kMaxListPackLen) {
|
||||
stats->listpack_blob_cnt--;
|
||||
hashTypeConvert(hset, OBJ_ENCODING_HT);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsigned char* vstr = NULL;
|
||||
unsigned int vlen = UINT_MAX;
|
||||
long long old_val = 0;
|
||||
|
||||
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, field.data(), field.size());
|
||||
|
||||
int exist_res = hashTypeGetValue(hset, op_args.shard->tmp_str1, &vstr, &vlen, &old_val);
|
||||
|
||||
if (holds_alternative<double>(*param)) {
|
||||
LOG(FATAL) << "TBD";
|
||||
} else {
|
||||
if (exist_res == C_OK && vstr) {
|
||||
if (!absl::SimpleAtoi(string_view{reinterpret_cast<char*>(vstr), vlen}, &old_val)) {
|
||||
stats->listpack_bytes += lpb;
|
||||
|
||||
return OpStatus::INVALID_VALUE;
|
||||
}
|
||||
}
|
||||
int64_t incr = get<int64_t>(*param);
|
||||
if ((incr < 0 && old_val < 0 && incr < (LLONG_MIN - old_val)) ||
|
||||
(incr > 0 && old_val > 0 && incr > (LLONG_MAX - old_val))) {
|
||||
stats->listpack_bytes += lpb;
|
||||
|
||||
return OpStatus::OUT_OF_RANGE;
|
||||
}
|
||||
|
||||
int64_t new_val = old_val + incr;
|
||||
|
||||
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
char buf[32];
|
||||
char* next = absl::numbers_internal::FastIntToBuffer(new_val, buf);
|
||||
string_view sval{buf, size_t(next - buf)};
|
||||
uint8_t* lp = (uint8_t*)hset->ptr;
|
||||
|
||||
lp = lpInsertElem(lp, field, sval).first;
|
||||
hset->ptr = lp;
|
||||
stats->listpack_bytes += lpBytes(lp);
|
||||
} else {
|
||||
sds news = sdsfromlonglong(new_val);
|
||||
hashTypeSet(hset, op_args.shard->tmp_str1, news, HASH_SET_TAKE_VALUE);
|
||||
}
|
||||
param->emplace<int64_t>(new_val);
|
||||
}
|
||||
|
||||
it->second.SyncRObj();
|
||||
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
using CI = CommandId;
|
||||
|
||||
#define HFUNC(x) SetHandler(&HSetFamily::x)
|
||||
|
|
|
@ -14,6 +14,7 @@ namespace dfly {
|
|||
class ConnectionContext;
|
||||
class CommandRegistry;
|
||||
using facade::OpResult;
|
||||
using facade::OpStatus;
|
||||
|
||||
class HSetFamily {
|
||||
public:
|
||||
|
@ -55,6 +56,11 @@ class HSetFamily {
|
|||
|
||||
static OpResult<std::vector<std::string>> OpGetAll(const OpArgs& op_args, std::string_view key,
|
||||
uint8_t getall_mask);
|
||||
|
||||
using IncrByParam = std::variant<double, int64_t>;
|
||||
static OpStatus OpIncrBy(const OpArgs& op_args, std::string_view key,
|
||||
std::string_view field, IncrByParam* param);
|
||||
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -23,8 +23,7 @@ namespace dfly {
|
|||
namespace {
|
||||
|
||||
using namespace std;
|
||||
using facade::Protocol;
|
||||
using facade::SinkReplyBuilder;
|
||||
using namespace facade;
|
||||
|
||||
using CI = CommandId;
|
||||
DEFINE_VARZ(VarzQps, set_qps);
|
||||
|
@ -129,14 +128,14 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
if (int_arg <= 0 || (!is_ms && int_arg >= int64_t(kMaxExpireDeadlineSec))) {
|
||||
return builder->SendError(facade::kExpiryOutOfRange);
|
||||
return builder->SendError(kExpiryOutOfRange);
|
||||
}
|
||||
|
||||
if (!is_ms) {
|
||||
int_arg *= 1000;
|
||||
}
|
||||
if (int_arg >= int64_t(kMaxExpireDeadlineSec * 1000)) {
|
||||
return builder->SendError(facade::kExpiryOutOfRange);
|
||||
return builder->SendError(kExpiryOutOfRange);
|
||||
}
|
||||
sparams.expire_after_ms = int_arg;
|
||||
} else if (cur_arg == "NX") {
|
||||
|
@ -290,18 +289,21 @@ void StringFamily::IncrByGeneric(std::string_view key, int64_t val, ConnectionCo
|
|||
DVLOG(2) << "IncrByGeneric " << key << "/" << result.value();
|
||||
switch (result.status()) {
|
||||
case OpStatus::OK:
|
||||
return builder->SendLong(result.value());
|
||||
builder->SendLong(result.value());
|
||||
break;
|
||||
case OpStatus::INVALID_VALUE:
|
||||
return builder->SendError(kInvalidIntErr);
|
||||
builder->SendError(kInvalidIntErr);
|
||||
break;
|
||||
case OpStatus::OUT_OF_RANGE:
|
||||
return builder->SendError("increment or decrement would overflow");
|
||||
case OpStatus::WRONG_TYPE:
|
||||
return builder->SendError(kWrongTypeErr);
|
||||
builder->SendError(kIncrOverflow);
|
||||
break;
|
||||
case OpStatus::KEY_NOTFOUND: // Relevant only for MC
|
||||
return reinterpret_cast<facade::MCReplyBuilder*>(builder)->SendNotFound();
|
||||
default:;
|
||||
reinterpret_cast<MCReplyBuilder*>(builder)->SendNotFound();
|
||||
break;
|
||||
default:
|
||||
reinterpret_cast<RedisReplyBuilder*>(builder)->SendError(result.status());
|
||||
break;
|
||||
}
|
||||
__builtin_unreachable();
|
||||
}
|
||||
|
||||
void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) {
|
||||
|
@ -346,7 +348,7 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext
|
|||
|
||||
if (unit_vals < 1) {
|
||||
ToLower(&args[0]);
|
||||
return (*cntx)->SendError(absl::StrCat(facade::kInvalidExpireTime, " in ", ArgS(args, 0)));
|
||||
return (*cntx)->SendError(absl::StrCat(kInvalidExpireTime, " in ", ArgS(args, 0)));
|
||||
}
|
||||
|
||||
SetCmd::SetParams sparams{cntx->db_index()};
|
||||
|
|
|
@ -256,8 +256,8 @@ void IntervalVisitor::ExtractListPack(const zrangespec& range) {
|
|||
uint8_t* zl = (uint8_t*)zobj_->ptr;
|
||||
uint8_t *eptr, *sptr;
|
||||
uint8_t* vstr;
|
||||
unsigned int vlen;
|
||||
long long vlong;
|
||||
unsigned int vlen = 0;
|
||||
long long vlong = 0;
|
||||
unsigned rangelen = 0;
|
||||
unsigned offset = params_.offset;
|
||||
unsigned limit = params_.limit;
|
||||
|
|
Loading…
Reference in a new issue