mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2024-12-14 11:58:02 +00:00
chore: bpop prints (#3076)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
8a0007d761
commit
82d8b02348
3 changed files with 32 additions and 10 deletions
|
@ -280,7 +280,8 @@ string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) {
|
|||
|
||||
OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
|
||||
BlockingResultCb func, unsigned limit_ms,
|
||||
bool* block_flag, bool* pause_flag) {
|
||||
bool* block_flag, bool* pause_flag,
|
||||
std::string* info) {
|
||||
string result_key;
|
||||
|
||||
// Fast path. If we have only a single shard, we can run opportunistically with a single hop.
|
||||
|
@ -289,10 +290,13 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
|
|||
OpResult<ShardFFResult> result;
|
||||
if (trans->GetUniqueShardCnt() == 1 && absl::GetFlag(FLAGS_singlehop_blocking)) {
|
||||
auto res = FindFirstNonEmptySingleShard(trans, req_obj_type, func);
|
||||
if (res.ok())
|
||||
if (res.ok()) {
|
||||
if (info)
|
||||
*info = "FF1S/";
|
||||
return res;
|
||||
else
|
||||
} else {
|
||||
result = res.status();
|
||||
}
|
||||
} else {
|
||||
result = FindFirstNonEmpty(trans, req_obj_type);
|
||||
}
|
||||
|
@ -307,6 +311,8 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
|
|||
return OpStatus::OK;
|
||||
};
|
||||
trans->Execute(std::move(cb), true);
|
||||
if (info)
|
||||
*info = "FFMS/";
|
||||
return result_key;
|
||||
}
|
||||
|
||||
|
@ -351,7 +357,8 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
|
|||
return OpStatus::OK;
|
||||
};
|
||||
trans->Execute(std::move(cb), true);
|
||||
|
||||
if (info)
|
||||
*info = "BLOCK/";
|
||||
return result_key;
|
||||
}
|
||||
|
||||
|
|
|
@ -89,7 +89,8 @@ using BlockingResultCb =
|
|||
// immediately with the first key listed in the tx arguments.
|
||||
OpResult<std::string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
|
||||
BlockingResultCb cb, unsigned limit_ms,
|
||||
bool* block_flag, bool* pause_flag);
|
||||
bool* block_flag, bool* pause_flag,
|
||||
std::string* info = nullptr);
|
||||
|
||||
}; // namespace container_utils
|
||||
|
||||
|
|
|
@ -1281,11 +1281,19 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo
|
|||
DVLOG(2) << "popping from " << key << " " << t->DebugId();
|
||||
|
||||
PrimeValue& pv = it->second;
|
||||
CHECK_GT(pv.Size(), 0u) << key << " " << pv.GetRobjWrapper()->encoding();
|
||||
|
||||
IntervalVisitor iv{Action::POP, range_spec.params, &pv};
|
||||
std::visit(iv, range_spec.interval);
|
||||
|
||||
it_res->post_updater.Run();
|
||||
|
||||
auto res = iv.PopResult();
|
||||
|
||||
// We don't store empty keys
|
||||
CHECK(!res.empty()) << key << " failed to pop from type " << pv.GetRobjWrapper()->encoding()
|
||||
<< " now size is " << pv.Size();
|
||||
|
||||
auto zlen = pv.Size();
|
||||
if (zlen == 0) {
|
||||
DVLOG(1) << "deleting key " << key << " " << t->DebugId();
|
||||
|
@ -1298,7 +1306,7 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo
|
|||
RecordJournal(op_args, command, ArgSlice{key}, 1);
|
||||
}
|
||||
|
||||
return iv.PopResult();
|
||||
return res;
|
||||
}
|
||||
|
||||
void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
|
||||
|
@ -1316,19 +1324,25 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
|
|||
|
||||
Transaction* transaction = cntx->transaction;
|
||||
|
||||
std::string dinfo;
|
||||
std::string callback_ran_key = "/NONE/";
|
||||
OpResult<ScoredArray> popped_array;
|
||||
auto cb = [is_max, &popped_array](Transaction* t, EngineShard* shard, std::string_view key) {
|
||||
auto cb = [is_max, &popped_array, &callback_ran_key](Transaction* t, EngineShard* shard,
|
||||
std::string_view key) {
|
||||
callback_ran_key = key;
|
||||
popped_array = OpBZPop(t, shard, key, is_max);
|
||||
};
|
||||
|
||||
OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking(
|
||||
transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked,
|
||||
&cntx->paused);
|
||||
transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked, &cntx->paused,
|
||||
&dinfo);
|
||||
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
if (popped_key) {
|
||||
DVLOG(1) << "BZPop " << transaction->DebugId() << " popped from key " << popped_key; // key.
|
||||
CHECK(popped_array->size() == 1);
|
||||
CHECK(popped_array.ok()) << dinfo;
|
||||
CHECK_EQ(popped_array->size(), 1u)
|
||||
<< popped_key << " ran " << callback_ran_key << " info " << dinfo;
|
||||
rb->StartArray(3);
|
||||
rb->SendBulkString(*popped_key);
|
||||
rb->SendBulkString(popped_array->front().first);
|
||||
|
|
Loading…
Reference in a new issue