1
0
Fork 0
mirror of https://github.com/dragonflydb/dragonfly.git synced 2024-12-14 11:58:02 +00:00

chore: remove old io (#3953)

* chore: Remove old IO

* fix: fix last error accounting
* chore: use unique_ptr<char> in MGetResponse storage

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-11-10 12:56:41 +03:00 committed by GitHub
parent 2d49a28c15
commit eadce55b67
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 250 additions and 1262 deletions

View file

@ -6,13 +6,10 @@
#include "absl/flags/internal/flag.h"
#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"
ABSL_FLAG(bool, experimental_new_io, true,
"Use new replying code - should "
"reduce latencies for pipelining");
namespace facade {
ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
@ -22,11 +19,11 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
if (stream) {
switch (protocol_) {
case Protocol::NONE:
LOG(DFATAL) << "Invalid protocol";
break;
case Protocol::REDIS: {
RedisReplyBuilder* rb = absl::GetFlag(FLAGS_experimental_new_io)
? new RedisReplyBuilder2(stream)
: new RedisReplyBuilder(stream);
rbuilder_.reset(rb);
rbuilder_.reset(new RedisReplyBuilder(stream));
break;
}
case Protocol::MEMCACHE:

View file

@ -479,7 +479,7 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
}
arr[i++] = pub_msg.channel;
arr[i++] = pub_msg.message;
rbuilder->SendStringArr(absl::Span<string_view>{arr.data(), i},
rbuilder->SendBulkStrArr(absl::Span<string_view>{arr.data(), i},
RedisReplyBuilder::CollectionType::PUSH);
}
@ -518,7 +518,7 @@ void Connection::DispatchOperations::operator()(const InvalidationMessage& msg)
rbuilder->SendNull();
} else {
std::string_view keys[] = {msg.key};
rbuilder->SendStringArr(keys);
rbuilder->SendBulkStrArr(keys);
}
}
@ -550,6 +550,9 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
static_assert(kReqSz <= 256 && kReqSz >= 200);
switch (protocol) {
case Protocol::NONE:
LOG(DFATAL) << "Invalid protocol";
break;
case Protocol::REDIS:
redis_parser_.reset(new RedisParser(GetFlag(FLAGS_max_multi_bulk_len)));
break;
@ -1358,7 +1361,7 @@ bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) {
void Connection::SquashPipeline() {
DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_);
DCHECK_EQ(reply_builder_->type(), SinkReplyBuilder::REDIS); // Only Redis is supported.
DCHECK_EQ(reply_builder_->GetProtocol(), Protocol::REDIS); // Only Redis is supported.
vector<ArgSlice> squash_cmds;
squash_cmds.reserve(dispatch_q_.size());
@ -1377,7 +1380,7 @@ void Connection::SquashPipeline() {
service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_, cc_.get());
if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared
reply_builder_->FlushBatch();
reply_builder_->Flush();
reply_builder_->SetBatchMode(false); // in case the next dispatch is sync
}
@ -1498,7 +1501,7 @@ void Connection::ExecutionFiber() {
// last command to reply and flush. If it doesn't reply (i.e. is a control message like
// migrate), we have to flush manually.
if (dispatch_q_.empty() && !msg.IsReplying()) {
reply_builder_->FlushBatch();
reply_builder_->Flush();
}
if (ShouldEndDispatchFiber(msg)) {

View file

@ -165,6 +165,10 @@ ostream& operator<<(ostream& os, facade::CmdArgList ras) {
return os;
}
ostream& operator<<(ostream& os, facade::Protocol p) {
return os << int(p);
}
ostream& operator<<(ostream& os, const facade::RespExpr& e) {
using facade::RespExpr;
using facade::ToSV;

View file

@ -33,7 +33,7 @@ constexpr size_t kSanitizerOverhead = 0u;
#endif
#endif
enum class Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 };
enum class Protocol : uint8_t { NONE = 0, MEMCACHE = 1, REDIS = 2 };
using MutableSlice = std::string_view;
using CmdArgList = absl::Span<const std::string_view>;
@ -189,5 +189,6 @@ void ResetStats();
namespace std {
ostream& operator<<(ostream& os, facade::CmdArgList args);
ostream& operator<<(ostream& os, facade::Protocol protocol);
} // namespace std

View file

@ -49,10 +49,6 @@ constexpr unsigned kConvFlags =
DoubleToStringConverter dfly_conv(kConvFlags, "inf", "nan", 'e', -6, 21, 6, 0);
const char* NullString(bool resp3) {
return resp3 ? "_\r\n" : "$-1\r\n";
}
template <typename T> size_t piece_size(const T& v) {
if constexpr (is_array_v<T>)
return ABSL_ARRAYSIZE(v) - 1; // expect null terminated
@ -77,21 +73,28 @@ char* write_piece(string_view str, char* dest) {
} // namespace
SinkReplyBuilder::MGetResponse::~MGetResponse() {
while (storage_list) {
auto* next = storage_list->next;
delete[] reinterpret_cast<char*>(storage_list);
storage_list = next;
}
SinkReplyBuilder::ReplyAggregator::~ReplyAggregator() {
rb->batched_ = prev;
if (!prev)
rb->Flush();
}
SinkReplyBuilder::SinkReplyBuilder(::io::Sink* sink, Type t)
: sink_(sink),
should_batch_(false),
should_aggregate_(false),
has_replied_(true),
send_active_(false),
type_(t) {
SinkReplyBuilder::ReplyScope::~ReplyScope() {
rb->scoped_ = prev;
if (!prev)
rb->FinishScope();
}
void SinkReplyBuilder::SendError(ErrorReply error) {
if (error.status)
return SendError(*error.status);
SendError(error.ToSv(), error.kind);
}
void SinkReplyBuilder::SendError(OpStatus status) {
if (status == OpStatus::OK)
return SendSimpleString("OK");
SendError(StatusToMsg(status));
}
void SinkReplyBuilder::CloseConnection() {
@ -99,157 +102,7 @@ void SinkReplyBuilder::CloseConnection() {
ec_ = std::make_error_code(std::errc::connection_aborted);
}
void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
has_replied_ = true;
DCHECK(sink_);
constexpr size_t kMaxBatchSize = 1024;
size_t bsize = 0;
for (unsigned i = 0; i < len; ++i) {
bsize += v[i].iov_len;
}
// Allow batching with up to kMaxBatchSize of data.
if ((should_batch_ || should_aggregate_) && (batch_.size() + bsize < kMaxBatchSize)) {
batch_.reserve(batch_.size() + bsize);
for (unsigned i = 0; i < len; ++i) {
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
DVLOG(3) << "Appending to stream " << absl::CHexEscape(src);
batch_.append(src.data(), src.size());
}
DVLOG(2) << "Batched " << bsize << " bytes";
return;
}
int64_t before_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
error_code ec;
send_active_ = true;
tl_facade_stats->reply_stats.io_write_cnt++;
tl_facade_stats->reply_stats.io_write_bytes += bsize;
DVLOG(2) << "Writing " << bsize + batch_.size() << " bytes of len " << len;
if (batch_.empty()) {
ec = sink_->Write(v, len);
} else {
DVLOG(3) << "Sending batch to stream :" << absl::CHexEscape(batch_);
tl_facade_stats->reply_stats.io_write_bytes += batch_.size();
if (len == UIO_MAXIOV) {
ec = sink_->Write(io::Buffer(batch_));
if (!ec) {
ec = sink_->Write(v, len);
}
} else {
iovec tmp[len + 1];
tmp[0].iov_base = batch_.data();
tmp[0].iov_len = batch_.size();
copy(v, v + len, tmp + 1);
ec = sink_->Write(tmp, len + 1);
}
batch_.clear();
}
send_active_ = false;
int64_t after_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
tl_facade_stats->reply_stats.send_stats.count++;
tl_facade_stats->reply_stats.send_stats.total_duration += (after_ns - before_ns) / 1'000;
if (ec) {
DVLOG(1) << "Error writing to stream: " << ec.message();
ec_ = ec;
}
}
void SinkReplyBuilder::SendRaw(std::string_view raw) {
iovec v = {IoVec(raw)};
Send(&v, 1);
}
void SinkReplyBuilder::ExpectReply() {
has_replied_ = false;
}
void SinkReplyBuilder::SendError(ErrorReply error) {
if (error.status)
return SendError(*error.status);
SendError(error.ToSv(), error.kind);
}
void SinkReplyBuilder::SendError(OpStatus status) {
if (status == OpStatus::OK) {
SendOk();
} else {
SendError(StatusToMsg(status));
}
}
void SinkReplyBuilder::StartAggregate() {
DVLOG(1) << "StartAggregate";
should_aggregate_ = true;
}
void SinkReplyBuilder::StopAggregate() {
DVLOG(1) << "StopAggregate";
should_aggregate_ = false;
if (should_batch_)
return;
FlushBatch();
}
void SinkReplyBuilder::SetBatchMode(bool batch) {
DVLOG(1) << "SetBatchMode(" << (batch ? "true" : "false") << ")";
should_batch_ = batch;
}
void SinkReplyBuilder::FlushBatch() {
if (batch_.empty())
return;
error_code ec = sink_->Write(io::Buffer(batch_));
batch_.clear();
if (ec) {
DVLOG(1) << "Error flushing to stream: " << ec.message();
ec_ = ec;
}
}
size_t SinkReplyBuilder::UsedMemory() const {
return dfly::HeapSize(batch_);
}
SinkReplyBuilder2::ReplyAggregator::~ReplyAggregator() {
rb->batched_ = prev;
if (!prev)
rb->Flush();
}
SinkReplyBuilder2::ReplyScope::~ReplyScope() {
rb->scoped_ = prev;
if (!prev)
rb->FinishScope();
}
void SinkReplyBuilder2::SendError(ErrorReply error) {
if (error.status)
return SendError(*error.status);
SendError(error.ToSv(), error.kind);
}
void SinkReplyBuilder2::SendError(OpStatus status) {
if (status == OpStatus::OK)
return SendSimpleString("OK");
SendError(StatusToMsg(status));
}
void SinkReplyBuilder2::CloseConnection() {
if (!ec_)
ec_ = std::make_error_code(std::errc::connection_aborted);
}
template <typename... Ts> void SinkReplyBuilder2::WritePieces(Ts&&... pieces) {
template <typename... Ts> void SinkReplyBuilder::WritePieces(Ts&&... pieces) {
if (size_t required = (piece_size(pieces) + ...); buffer_.AppendLen() <= required)
Flush(required);
@ -268,12 +121,12 @@ template <typename... Ts> void SinkReplyBuilder2::WritePieces(Ts&&... pieces) {
total_size_ += written;
}
void SinkReplyBuilder2::WriteRef(std::string_view str) {
void SinkReplyBuilder::WriteRef(std::string_view str) {
NextVec(str);
total_size_ += str.size();
}
void SinkReplyBuilder2::Flush(size_t expected_buffer_cap) {
void SinkReplyBuilder::Flush(size_t expected_buffer_cap) {
Send();
// Grow backing buffer if was at least half full and still below it's max size
@ -290,7 +143,7 @@ void SinkReplyBuilder2::Flush(size_t expected_buffer_cap) {
buffer_.Reserve(expected_buffer_cap);
}
void SinkReplyBuilder2::Send() {
void SinkReplyBuilder::Send() {
auto& reply_stats = tl_facade_stats->reply_stats;
send_active_ = true;
@ -308,7 +161,9 @@ void SinkReplyBuilder2::Send() {
send_active_ = false;
}
void SinkReplyBuilder2::FinishScope() {
void SinkReplyBuilder::FinishScope() {
replies_recorded_++;
if (!batched_ || total_size_ * 2 >= kMaxBufferSize)
return Flush();
@ -332,22 +187,37 @@ void SinkReplyBuilder2::FinishScope() {
guaranteed_pieces_ = vecs_.size(); // all vecs are pieces
}
void SinkReplyBuilder2::NextVec(std::string_view str) {
void SinkReplyBuilder::NextVec(std::string_view str) {
if (vecs_.size() >= IOV_MAX - 2)
Flush();
vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()});
}
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink, MC), noreply_(false) {
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) {
}
void MCReplyBuilder::SendValue(std::string_view key, std::string_view value, uint64_t mc_ver,
uint32_t mc_flag) {
ReplyScope scope(this);
WritePieces("VALUE ", key, " ", mc_flag, " ", value.size());
if (mc_ver)
WritePieces(" ", mc_ver);
if (value.size() <= kMaxInlineSize) {
WritePieces(kCRLF, value, kCRLF);
} else {
WritePieces(kCRLF);
WriteRef(value);
WritePieces(kCRLF);
}
}
void MCReplyBuilder::SendSimpleString(std::string_view str) {
if (noreply_)
return;
iovec v[2] = {IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
ReplyScope scope(this);
WritePieces(str, kCRLF);
}
void MCReplyBuilder::SendStored() {
@ -355,31 +225,11 @@ void MCReplyBuilder::SendStored() {
}
void MCReplyBuilder::SendLong(long val) {
char buf[32];
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
SendSimpleString(string_view(buf, next - buf));
}
void MCReplyBuilder::SendMGetResponse(MGetResponse resp) {
string header;
for (unsigned i = 0; i < resp.resp_arr.size(); ++i) {
if (resp.resp_arr[i]) {
const auto& src = *resp.resp_arr[i];
absl::StrAppend(&header, "VALUE ", src.key, " ", src.mc_flag, " ", src.value.size());
if (src.mc_ver) {
absl::StrAppend(&header, " ", src.mc_ver);
}
absl::StrAppend(&header, "\r\n");
iovec v[] = {IoVec(header), IoVec(src.value), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
header.clear();
}
}
SendSimpleString("END");
SendSimpleString(absl::StrCat(val));
}
void MCReplyBuilder::SendError(string_view str, std::string_view type) {
last_error_ = str;
SendSimpleString(absl::StrCat("SERVER_ERROR ", str));
}
@ -387,13 +237,8 @@ void MCReplyBuilder::SendProtocolError(std::string_view str) {
SendSimpleString(absl::StrCat("CLIENT_ERROR ", str));
}
bool MCReplyBuilder::NoReply() const {
return noreply_;
}
void MCReplyBuilder::SendClientError(string_view str) {
iovec v[] = {IoVec("CLIENT_ERROR "), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
SendSimpleString(absl::StrCat("CLIENT_ERROR ", str));
}
void MCReplyBuilder::SendSetSkipped() {
@ -404,455 +249,18 @@ void MCReplyBuilder::SendNotFound() {
SendSimpleString("NOT_FOUND");
}
MCReplyBuilder2::MCReplyBuilder2(::io::Sink* sink) : SinkReplyBuilder2(sink), noreply_(false) {
}
void MCReplyBuilder2::SendValue(std::string_view key, std::string_view value, uint64_t mc_ver,
uint32_t mc_flag) {
void MCReplyBuilder::SendRaw(std::string_view str) {
ReplyScope scope(this);
WritePieces("VALUE ", key, " ", mc_flag, " ", value.size());
if (mc_ver)
WritePieces(" ", mc_ver);
if (value.size() <= kMaxInlineSize) {
WritePieces(value, kCRLF);
} else {
WriteRef(value);
WritePieces(kCRLF);
}
WriteRef(str);
}
void MCReplyBuilder2::SendSimpleString(std::string_view str) {
if (noreply_)
return;
void RedisReplyBuilderBase::SendNull() {
ReplyScope scope(this);
WritePieces(str, kCRLF);
}
void MCReplyBuilder2::SendStored() {
SendSimpleString("STORED");
}
void MCReplyBuilder2::SendLong(long val) {
SendSimpleString(absl::StrCat(val));
}
void MCReplyBuilder2::SendError(string_view str, std::string_view type) {
SendSimpleString(absl::StrCat("SERVER_ERROR ", str));
}
void MCReplyBuilder2::SendProtocolError(std::string_view str) {
SendSimpleString(absl::StrCat("CLIENT_ERROR ", str));
}
void MCReplyBuilder2::SendClientError(string_view str) {
SendSimpleString(absl::StrCat("CLIENT_ERROR", str));
}
void MCReplyBuilder2::SendSetSkipped() {
SendSimpleString("NOT_STORED");
}
void MCReplyBuilder2::SendNotFound() {
SendSimpleString("NOT_FOUND");
}
char* RedisReplyBuilder::FormatDouble(double val, char* dest, unsigned dest_len) {
StringBuilder sb(dest, dest_len);
CHECK(dfly_conv.ToShortest(val, &sb));
return sb.Finalize();
}
RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink, REDIS) {
}
void RedisReplyBuilder::SetResp3(bool is_resp3) {
is_resp3_ = is_resp3;
}
void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
VLOG(1) << "Error: " << str;
if (err_type.empty()) {
err_type = str;
if (err_type == kSyntaxErr)
err_type = kSyntaxErrType;
else if (err_type == kWrongTypeErr)
err_type = kWrongTypeErrType;
else if (err_type == kScriptNotFound)
err_type = kScriptErrType;
}
tl_facade_stats->reply_stats.err_count[err_type]++;
if (str[0] == '-') {
iovec v[] = {IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
return;
}
iovec v[] = {IoVec(kErrPref), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
void RedisReplyBuilder::SendProtocolError(std::string_view str) {
SendError(absl::StrCat("-ERR Protocol error: ", str), "protocol_error");
}
void RedisReplyBuilder::SendSimpleString(std::string_view str) {
iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)};
Send(v, ABSL_ARRAYSIZE(v));
}
void RedisReplyBuilder::SendStored() {
SendSimpleString("OK");
}
void RedisReplyBuilder::SendSetSkipped() {
SendNull();
}
void RedisReplyBuilder::SendNull() {
iovec v[] = {IoVec(NullString(is_resp3_))};
Send(v, ABSL_ARRAYSIZE(v));
}
void RedisReplyBuilder::SendBulkString(std::string_view str) {
char tmp[absl::numbers_internal::kFastToBufferSize + 3];
tmp[0] = '$'; // Format length
char* next = absl::numbers_internal::FastIntToBuffer(uint32_t(str.size()), tmp + 1);
*next++ = '\r';
*next++ = '\n';
std::string_view lenpref{tmp, size_t(next - tmp)};
// 3 parts: length, string and CRLF.
iovec v[3] = {IoVec(lenpref), IoVec(str), IoVec(kCRLF)};
return Send(v, ABSL_ARRAYSIZE(v));
}
void RedisReplyBuilder::SendVerbatimString(std::string_view str, VerbatimFormat format) {
if (!is_resp3_)
return SendBulkString(str);
char tmp[absl::numbers_internal::kFastToBufferSize + 7];
tmp[0] = '=';
// + 4 because format is three byte, and need to be followed by a ":"
char* next = absl::numbers_internal::FastIntToBuffer(uint32_t(str.size() + 4), tmp + 1);
*next++ = '\r';
*next++ = '\n';
DCHECK(format <= VerbatimFormat::MARKDOWN);
if (format == VerbatimFormat::TXT)
strcpy(next, "txt:");
else if (format == VerbatimFormat::MARKDOWN)
strcpy(next, "mkd:");
next += 4;
std::string_view lenpref{tmp, size_t(next - tmp)};
iovec v[3] = {IoVec(lenpref), IoVec(str), IoVec(kCRLF)};
return Send(v, ABSL_ARRAYSIZE(v));
}
void RedisReplyBuilder::SendLong(long num) {
string str = absl::StrCat(":", num, kCRLF);
SendRaw(str);
}
void RedisReplyBuilder::SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
bool with_scores) {
ReplyAggregator agg(this);
if (!with_scores) {
auto cb = [&](size_t indx) -> string_view { return arr[indx].first; };
SendStringArrInternal(arr.size(), std::move(cb), CollectionType::ARRAY);
return;
}
char buf[DoubleToStringConverter::kBase10MaximalLength * 3]; // to be on the safe side.
if (!is_resp3_) { // RESP2 formats withscores as a flat array.
auto cb = [&](size_t indx) -> string_view {
if (indx % 2 == 0)
return arr[indx / 2].first;
// NOTE: we reuse the same buffer, assuming that SendStringArrInternal does not reference
// previous string_views. The assumption holds for small strings like
// doubles because SendStringArrInternal employs small string optimization.
// It's a bit hacky but saves allocations.
return FormatDouble(arr[indx / 2].second, buf, sizeof(buf));
};
SendStringArrInternal(arr.size() * 2, std::move(cb), CollectionType::ARRAY);
return;
}
// Resp3 formats withscores as array of (key, score) pairs.
// TODO: to implement efficient serializing by extending SendStringArrInternal to support
// 2-level arrays.
StartArray(arr.size());
for (const auto& p : arr) {
StartArray(2);
SendBulkString(p.first);
SendDouble(p.second);
}
}
void RedisReplyBuilder::SendDouble(double val) {
char buf[64];
char* start = FormatDouble(val, buf, sizeof(buf));
if (!is_resp3_) {
SendBulkString(start);
} else {
// RESP3
SendRaw(absl::StrCat(",", start, kCRLF));
}
}
void RedisReplyBuilder::SendMGetResponse(MGetResponse resp) {
DCHECK(!resp.resp_arr.empty());
size_t size = resp.resp_arr.size();
size_t vec_len = std::min<size_t>(32, size);
constexpr size_t kBatchLen = 32 * 2 + 2; // (blob_size, blob) * 32 + 2 spares
iovec vec_batch[kBatchLen];
// for all the meta data to fill the vec batch. 10 digits for the blob size and 6 for
// $, \r, \n, \r, \n
absl::FixedArray<char, 64> meta((vec_len + 2) * 16); // 2 for header and next item meta data.
char* next = meta.data();
char* cur_meta = next;
*next++ = '*';
next = absl::numbers_internal::FastIntToBuffer(size, next);
*next++ = '\r';
*next++ = '\n';
unsigned vec_indx = 0;
const char* nullstr = NullString(is_resp3_);
size_t nulllen = strlen(nullstr);
auto get_pending_metabuf = [&] { return string_view{cur_meta, size_t(next - cur_meta)}; };
for (unsigned i = 0; i < size; ++i) {
DCHECK_GE(meta.end() - next, 16); // We have at least 16 bytes for the meta data.
if (resp.resp_arr[i]) {
string_view blob = resp.resp_arr[i]->value;
*next++ = '$';
next = absl::numbers_internal::FastIntToBuffer(blob.size(), next);
*next++ = '\r';
*next++ = '\n';
DCHECK_GT(next - cur_meta, 0);
vec_batch[vec_indx++] = IoVec(get_pending_metabuf());
vec_batch[vec_indx++] = IoVec(blob);
cur_meta = next; // we combine the CRLF with the next item meta data.
*next++ = '\r';
*next++ = '\n';
} else {
memcpy(next, nullstr, nulllen);
next += nulllen;
}
if (vec_indx >= (kBatchLen - 2) || (meta.end() - next < 16)) {
// we have space for at least one iovec because in the worst case we reached (kBatchLen - 3)
// and then filled 2 vectors in the previous iteration.
DCHECK_LE(vec_indx, kBatchLen - 1);
// if we do not have enough space in the meta buffer, we add the meta data to the
// vector batch and reset it.
if (meta.end() - next < 16) {
vec_batch[vec_indx++] = IoVec(get_pending_metabuf());
next = meta.data();
cur_meta = next;
}
Send(vec_batch, vec_indx);
if (ec_)
return;
vec_indx = 0;
size_t meta_len = next - cur_meta;
memcpy(meta.data(), cur_meta, meta_len);
cur_meta = meta.data();
next = cur_meta + meta_len;
}
}
if (next - cur_meta > 0) {
vec_batch[vec_indx++] = IoVec(get_pending_metabuf());
}
if (vec_indx > 0)
Send(vec_batch, vec_indx);
}
void RedisReplyBuilder::SendSimpleStrArr(StrSpan arr) {
string res = absl::StrCat("*", arr.Size(), kCRLF);
for (string_view str : arr)
StrAppend(&res, "+", str, kCRLF);
SendRaw(res);
}
void RedisReplyBuilder::SendNullArray() {
SendRaw("*-1\r\n");
}
void RedisReplyBuilder::SendEmptyArray() {
StartArray(0);
}
void RedisReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) {
if (type == ARRAY && arr.Size() == 0) {
SendRaw("*0\r\n");
return;
}
auto cb = [&](size_t i) {
return visit([i](auto& span) { return facade::ToSV(span[i]); }, arr.span);
};
SendStringArrInternal(arr.Size(), std::move(cb), type);
}
void RedisReplyBuilder::StartArray(unsigned len) {
StartCollection(len, ARRAY);
}
constexpr static string_view START_SYMBOLS[] = {"*", "~", "%", ">"};
static_assert(START_SYMBOLS[RedisReplyBuilder::MAP] == "%" &&
START_SYMBOLS[RedisReplyBuilder::SET] == "~");
void RedisReplyBuilder::StartCollection(unsigned len, CollectionType type) {
if (!is_resp3_) { // Flatten for Resp2
if (type == MAP)
len *= 2;
type = ARRAY;
}
DVLOG(2) << "StartCollection(" << len << ", " << type << ")";
// We do not want to send multiple packets for small responses because these
// trigger TCP-related artifacts (e.g. Nagle's algorithm) that slow down the delivery of the whole
// response.
bool prev = should_aggregate_;
should_aggregate_ |= (len > 0);
SendRaw(absl::StrCat(START_SYMBOLS[type], len, kCRLF));
should_aggregate_ = prev;
}
// This implementation a bit complicated because it uses vectorized
// send to send an array. The problem with that is the OS limits vector length to UIO_MAXIOV.
// Therefore, to make it robust we send the array in batches.
// We limit the vector length, and when it fills up we flush it to the socket and continue
// iterating.
void RedisReplyBuilder::SendStringArrInternal(
size_t size, absl::FunctionRef<std::string_view(unsigned)> producer, CollectionType type) {
size_t header_len = size;
string_view type_char = "*";
if (is_resp3_) {
type_char = START_SYMBOLS[type];
if (type == MAP)
header_len /= 2; // Each key value pair counts as one.
}
if (header_len == 0) {
SendRaw(absl::StrCat(type_char, "0\r\n"));
return;
}
// We limit iovec capacity, vectorized length is limited upto UIO_MAXIOV (Send returns EMSGSIZE).
size_t vec_cap = std::min<size_t>(UIO_MAXIOV, size * 2);
absl::FixedArray<iovec, 16> vec(vec_cap);
absl::FixedArray<char, 128> meta(std::max<size_t>(vec_cap * 64, 128u));
char* start = meta.data();
char* next = start;
// at most 35 chars.
auto serialize_len = [&](char prefix, size_t len) {
*next++ = prefix;
next = absl::numbers_internal::FastIntToBuffer(len, next); // at most 32 chars
*next++ = '\r';
*next++ = '\n';
};
serialize_len(type_char[0], header_len);
unsigned vec_indx = 0;
string_view src;
#define FLUSH_IOVEC() \
do { \
Send(vec.data(), vec_indx); \
if (ec_) \
return; \
vec_indx = 0; \
next = meta.data(); \
} while (false)
for (unsigned i = 0; i < size; ++i) {
DCHECK_LT(vec_indx, vec_cap);
src = producer(i);
serialize_len('$', src.size());
// copy data either by referencing via an iovec or copying inline into meta buf.
constexpr size_t kSSOLen = 32;
if (src.size() > kSSOLen) {
// reference metadata blob before referencing another vector.
DCHECK_GT(next - start, 0);
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
if (vec_indx >= vec_cap) {
FLUSH_IOVEC();
}
DCHECK_LT(vec_indx, vec.size());
vec[vec_indx++] = IoVec(src);
if (vec_indx >= vec_cap) {
FLUSH_IOVEC();
}
start = next;
} else if (src.size() > 0) {
// NOTE!: this is not just optimization. producer may returns a string_piece that will
// be overriden for the next call, so we must do this for correctness.
memcpy(next, src.data(), src.size());
next += src.size();
}
// how much buffer we need to perform the next iteration.
constexpr ptrdiff_t kMargin = kSSOLen + 3 /* $\r\n */ + 2 /*length*/ + 2 /* \r\n */;
// Keep at least kMargin bytes for a small string as well as its length.
if (kMargin >= meta.end() - next) {
// Flush the iovec array.
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
FLUSH_IOVEC();
start = next;
}
*next++ = '\r';
*next++ = '\n';
}
vec[vec_indx].iov_base = start;
vec[vec_indx].iov_len = next - start;
Send(vec.data(), vec_indx + 1);
}
void RedisReplyBuilder2Base::SendNull() {
ReplyScope scope(this);
has_replied_ = true;
resp3_ ? WritePieces(kNullStringR3) : WritePieces(kNullStringR2);
}
void RedisReplyBuilder2Base::SendSimpleString(std::string_view str) {
void RedisReplyBuilderBase::SendSimpleString(std::string_view str) {
ReplyScope scope(this);
has_replied_ = true;
if (str.size() <= kMaxInlineSize * 2)
return WritePieces(kSimplePref, str, kCRLF);
@ -861,9 +269,8 @@ void RedisReplyBuilder2Base::SendSimpleString(std::string_view str) {
WritePieces(kCRLF);
}
void RedisReplyBuilder2Base::SendBulkString(std::string_view str) {
void RedisReplyBuilderBase::SendBulkString(std::string_view str) {
ReplyScope scope(this);
has_replied_ = true;
if (str.size() <= kMaxInlineSize)
return WritePieces(kLengthPrefix, uint32_t(str.size()), kCRLF, str, kCRLF);
@ -872,14 +279,12 @@ void RedisReplyBuilder2Base::SendBulkString(std::string_view str) {
WritePieces(kCRLF);
}
void RedisReplyBuilder2Base::SendLong(long val) {
void RedisReplyBuilderBase::SendLong(long val) {
ReplyScope scope(this);
has_replied_ = true;
WritePieces(kLongPref, val, kCRLF);
}
void RedisReplyBuilder2Base::SendDouble(double val) {
has_replied_ = true;
void RedisReplyBuilderBase::SendDouble(double val) {
char buf[DoubleToStringConverter::kBase10MaximalLength + 8]; // +8 to be on the safe side.
static_assert(ABSL_ARRAYSIZE(buf) < kMaxInlineSize, "Write temporary string from buf inline");
string_view val_str = FormatDouble(val, buf, ABSL_ARRAYSIZE(buf));
@ -891,18 +296,16 @@ void RedisReplyBuilder2Base::SendDouble(double val) {
WritePieces(kDoublePref, val_str, kCRLF);
}
void RedisReplyBuilder2Base::SendNullArray() {
void RedisReplyBuilderBase::SendNullArray() {
ReplyScope scope(this);
has_replied_ = true;
WritePieces("*-1", kCRLF);
}
constexpr static const char START_SYMBOLS2[4][2] = {"*", "~", "%", ">"};
static_assert(START_SYMBOLS2[RedisReplyBuilder2Base::MAP][0] == '%' &&
START_SYMBOLS2[RedisReplyBuilder2Base::SET][0] == '~');
static_assert(START_SYMBOLS2[RedisReplyBuilderBase::MAP][0] == '%' &&
START_SYMBOLS2[RedisReplyBuilderBase::SET][0] == '~');
void RedisReplyBuilder2Base::StartCollection(unsigned len, CollectionType ct) {
has_replied_ = true;
void RedisReplyBuilderBase::StartCollection(unsigned len, CollectionType ct) {
if (!IsResp3()) { // RESP2 supports only arrays
if (ct == MAP)
len *= 2;
@ -912,9 +315,8 @@ void RedisReplyBuilder2Base::StartCollection(unsigned len, CollectionType ct) {
WritePieces(START_SYMBOLS2[ct], len, kCRLF);
}
void RedisReplyBuilder2Base::SendError(std::string_view str, std::string_view type) {
void RedisReplyBuilderBase::SendError(std::string_view str, std::string_view type) {
ReplyScope scope(this);
has_replied_ = true;
if (type.empty()) {
type = str;
@ -930,18 +332,17 @@ void RedisReplyBuilder2Base::SendError(std::string_view str, std::string_view ty
WritePieces(str, kCRLF);
}
void RedisReplyBuilder2Base::SendProtocolError(std::string_view str) {
void RedisReplyBuilderBase::SendProtocolError(std::string_view str) {
SendError(absl::StrCat("-ERR Protocol error: ", str), "protocol_error");
}
char* RedisReplyBuilder2Base::FormatDouble(double d, char* dest, unsigned len) {
char* RedisReplyBuilderBase::FormatDouble(double d, char* dest, unsigned len) {
StringBuilder sb(dest, len);
CHECK(dfly_conv.ToShortest(d, &sb));
return sb.Finalize();
}
void RedisReplyBuilder2Base::SendVerbatimString(std::string_view str, VerbatimFormat format) {
has_replied_ = true;
void RedisReplyBuilderBase::SendVerbatimString(std::string_view str, VerbatimFormat format) {
DCHECK(format <= VerbatimFormat::MARKDOWN);
if (!IsResp3())
return SendBulkString(str);
@ -955,25 +356,25 @@ void RedisReplyBuilder2Base::SendVerbatimString(std::string_view str, VerbatimFo
WritePieces(kCRLF);
}
std::string RedisReplyBuilder2Base::SerializeCommand(std::string_view command) {
std::string RedisReplyBuilderBase::SerializeCommand(std::string_view command) {
return string{command} + kCRLF;
}
void RedisReplyBuilder2::SendSimpleStrArr2(const facade::ArgRange& strs) {
void RedisReplyBuilder::SendSimpleStrArr(const facade::ArgRange& strs) {
ReplyScope scope(this);
StartArray(strs.Size());
for (std::string_view str : strs)
SendSimpleString(str);
}
void RedisReplyBuilder2::SendBulkStrArr(const facade::ArgRange& strs, CollectionType ct) {
void RedisReplyBuilder::SendBulkStrArr(const facade::ArgRange& strs, CollectionType ct) {
ReplyScope scope(this);
StartCollection(ct == CollectionType::MAP ? strs.Size() / 2 : strs.Size(), ct);
for (std::string_view str : strs)
SendBulkString(str);
}
void RedisReplyBuilder2::SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
void RedisReplyBuilder::SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
bool with_scores) {
ReplyScope scope(this);
StartArray((with_scores && !IsResp3()) ? arr.size() * 2 : arr.size());
@ -986,31 +387,20 @@ void RedisReplyBuilder2::SendScoredArray(absl::Span<const std::pair<std::string,
}
}
void RedisReplyBuilder2::SendStored() {
void RedisReplyBuilder::SendStored() {
SendSimpleString("OK");
}
void RedisReplyBuilder2::SendSetSkipped() {
void RedisReplyBuilder::SendSetSkipped() {
SendNull();
}
void RedisReplyBuilder2::StartArray(unsigned len) {
void RedisReplyBuilder::StartArray(unsigned len) {
StartCollection(len, CollectionType::ARRAY);
}
void RedisReplyBuilder2::SendEmptyArray() {
void RedisReplyBuilder::SendEmptyArray() {
StartArray(0);
}
void RedisReplyBuilder2::SendMGetResponse(SinkReplyBuilder::MGetResponse resp) {
ReplyScope scope(this);
StartArray(resp.resp_arr.size());
for (const auto& entry : resp.resp_arr) {
if (entry)
SendBulkString(entry->value);
else
SendNull();
}
}
} // namespace facade

View file

@ -21,188 +21,28 @@ enum class ReplyMode {
FULL // All replies are recorded
};
class SinkReplyBuilder {
public:
struct MGetStorage {
MGetStorage* next = nullptr;
char data[1];
};
struct GetResp {
std::string key; // TODO: to use backing storage to optimize this as well.
std::string_view value;
uint64_t mc_ver = 0; // 0 means we do not output it (i.e has not been requested).
uint32_t mc_flag = 0;
GetResp() = default;
GetResp(std::string_view val) : value(val) {
}
};
struct MGetResponse {
MGetStorage* storage_list = nullptr; // backing storage of resp_arr values.
std::vector<std::optional<GetResp>> resp_arr;
MGetResponse() = default;
MGetResponse(size_t size) : resp_arr(size) {
}
~MGetResponse();
MGetResponse(MGetResponse&& other) noexcept
: storage_list(other.storage_list), resp_arr(std::move(other.resp_arr)) {
other.storage_list = nullptr;
}
MGetResponse& operator=(MGetResponse&& other) noexcept {
resp_arr = std::move(other.resp_arr);
storage_list = other.storage_list;
other.storage_list = nullptr;
return *this;
}
};
SinkReplyBuilder(const SinkReplyBuilder&) = delete;
void operator=(const SinkReplyBuilder&) = delete;
enum Type { REDIS, MC };
explicit SinkReplyBuilder(::io::Sink* sink, Type t);
virtual ~SinkReplyBuilder() {
}
static MGetStorage* AllocMGetStorage(size_t size) {
static_assert(alignof(MGetStorage) == 8); // if this breaks we should fix the code below.
char* buf = new char[size + sizeof(MGetStorage)];
return new (buf) MGetStorage();
}
virtual void SendError(std::string_view str, std::string_view type = {}) = 0; // MC and Redis
virtual void SendError(OpStatus status);
void SendError(ErrorReply error);
virtual void SendStored() = 0; // Reply for set commands.
virtual void SendSetSkipped() = 0;
virtual void SendMGetResponse(MGetResponse resp) = 0;
virtual void SendLong(long val) = 0;
virtual void SendSimpleString(std::string_view str) = 0;
void SendOk() {
SendSimpleString("OK");
}
virtual void SendProtocolError(std::string_view str) = 0;
// In order to reduce interrupt rate we allow coalescing responses together using
// Batch mode. It is controlled by Connection state machine because it makes sense only
// when pipelined requests are arriving.
virtual void SetBatchMode(bool batch);
virtual void FlushBatch();
// Used for QUIT - > should move to conn_context?
virtual void CloseConnection();
virtual std::error_code GetError() const {
return ec_;
}
bool IsSendActive() const {
return send_active_; // BROKEN
}
struct ReplyAggregator {
explicit ReplyAggregator(SinkReplyBuilder* builder) : builder_(builder) {
// If the builder is already aggregating then don't aggregate again as
// this will cause redundant sink writes (such as in a MULTI/EXEC).
if (builder->should_aggregate_) {
return;
}
builder_->StartAggregate();
is_nested_ = false;
}
~ReplyAggregator() {
if (!is_nested_) {
builder_->StopAggregate();
}
}
private:
SinkReplyBuilder* builder_;
bool is_nested_ = true;
};
void ExpectReply();
bool HasReplied() const {
return has_replied_;
}
virtual size_t UsedMemory() const;
static const ReplyStats& GetThreadLocalStats() {
return tl_facade_stats->reply_stats;
}
virtual void StartAggregate();
virtual void StopAggregate();
std::string ConsumeLastError() {
return std::exchange(last_error_, std::string{});
}
Type type() const {
return type_;
}
protected:
void SendRaw(std::string_view str); // Sends raw without any formatting.
void Send(const iovec* v, uint32_t len);
std::string batch_;
::io::Sink* sink_;
std::error_code ec_;
// msg and kind/type
std::string last_error_;
bool should_batch_ : 1;
// Similarly to batch mode but is controlled by at operation level.
bool should_aggregate_ : 1;
bool has_replied_ : 1;
bool send_active_ : 1;
Type type_;
};
// Base class for all reply builders. Offer a simple high level interface for controlling output
// modes and sending basic response types.
class SinkReplyBuilder2 {
class SinkReplyBuilder {
struct GuardBase {
bool prev;
SinkReplyBuilder2* rb;
SinkReplyBuilder* rb;
};
public:
constexpr static size_t kMaxInlineSize = 32;
constexpr static size_t kMaxBufferSize = 8192;
explicit SinkReplyBuilder2(io::Sink* sink) : sink_(sink) {
explicit SinkReplyBuilder(io::Sink* sink) : sink_(sink) {
}
virtual ~SinkReplyBuilder2() = default;
virtual ~SinkReplyBuilder() = default;
// USE WITH CARE! ReplyScope assumes that all string views in Send calls keep valid for the scopes
// lifetime. This allows the builder to avoid copies by enqueueing long strings directly for
// vectorized io.
struct ReplyScope : GuardBase {
explicit ReplyScope(SinkReplyBuilder2* rb) : GuardBase{std::exchange(rb->scoped_, true), rb} {
explicit ReplyScope(SinkReplyBuilder* rb) : GuardBase{std::exchange(rb->scoped_, true), rb} {
}
~ReplyScope();
@ -211,7 +51,7 @@ class SinkReplyBuilder2 {
// Aggregator reduces the number of raw send calls by copying data in an intermediate buffer.
// Prefer ReplyScope if possible to additionally reduce the number of copies.
struct ReplyAggregator : GuardBase {
explicit ReplyAggregator(SinkReplyBuilder2* rb)
explicit ReplyAggregator(SinkReplyBuilder* rb)
: GuardBase{std::exchange(rb->batched_, true), rb} {
}
@ -228,7 +68,11 @@ class SinkReplyBuilder2 {
return buffer_.Capacity();
}
bool IsSendActive() {
size_t RepliesRecorded() const {
return replies_recorded_;
}
bool IsSendActive() const {
return send_active_;
}
@ -243,6 +87,10 @@ class SinkReplyBuilder2 {
}
public: // High level interface
virtual Protocol GetProtocol() const {
return Protocol::NONE;
}
virtual void SendLong(long val) = 0;
virtual void SendSimpleString(std::string_view str) = 0;
@ -257,6 +105,10 @@ class SinkReplyBuilder2 {
void SendError(ErrorReply error);
virtual void SendProtocolError(std::string_view str) = 0;
std::string ConsumeLastError() {
return std::exchange(last_error_, {});
}
protected:
template <typename... Ts>
void WritePieces(Ts&&... pieces); // Copy pieces into buffer and reference buffer
@ -267,11 +119,15 @@ class SinkReplyBuilder2 {
void Send();
protected:
size_t replies_recorded_ = 0;
std::string last_error_;
private:
io::Sink* sink_;
std::error_code ec_;
bool send_active_ = false;
bool send_active_ = false; // set while Send() is suspended on socket write
bool scoped_ = false, batched_ = false;
size_t total_size_ = 0; // sum of vec_ lengths
@ -285,40 +141,15 @@ class SinkReplyBuilder2 {
};
class MCReplyBuilder : public SinkReplyBuilder {
bool noreply_;
public:
MCReplyBuilder(::io::Sink* stream);
explicit MCReplyBuilder(::io::Sink* sink);
using SinkReplyBuilder::SendRaw;
~MCReplyBuilder() override = default;
void SendError(std::string_view str, std::string_view type = std::string_view{}) final;
// void SendGetReply(std::string_view key, uint32_t flags, std::string_view value) final;
void SendMGetResponse(MGetResponse resp) final;
void SendStored() final;
void SendLong(long val) final;
void SendSetSkipped() final;
void SendClientError(std::string_view str);
void SendNotFound();
void SendSimpleString(std::string_view str) final;
void SendProtocolError(std::string_view str) final;
void SetNoreply(bool noreply) {
noreply_ = noreply;
Protocol GetProtocol() const final {
return Protocol::MEMCACHE;
}
bool NoReply() const;
};
class MCReplyBuilder2 : public SinkReplyBuilder2 {
public:
explicit MCReplyBuilder2(::io::Sink* sink);
~MCReplyBuilder2() override = default;
void SendError(std::string_view str, std::string_view type = std::string_view{}) final;
void SendStored() final;
@ -332,6 +163,8 @@ class MCReplyBuilder2 : public SinkReplyBuilder2 {
void SendSimpleString(std::string_view str) final;
void SendProtocolError(std::string_view str) final;
void SendRaw(std::string_view str);
void SetNoreply(bool noreply) {
noreply_ = noreply;
}
@ -344,161 +177,72 @@ class MCReplyBuilder2 : public SinkReplyBuilder2 {
bool noreply_ = false;
};
class RedisReplyBuilder : public SinkReplyBuilder {
// Redis reply builder interface for sending RESP data.
class RedisReplyBuilderBase : public SinkReplyBuilder {
public:
enum CollectionType { ARRAY, SET, MAP, PUSH };
enum VerbatimFormat { TXT, MARKDOWN };
using StrSpan = facade::ArgRange;
RedisReplyBuilder(::io::Sink* stream);
virtual void SetResp3(bool is_resp3);
virtual bool IsResp3() const {
return is_resp3_;
explicit RedisReplyBuilderBase(io::Sink* sink) : SinkReplyBuilder(sink) {
}
void SendError(std::string_view str, std::string_view type = {}) override;
using SinkReplyBuilder::SendError;
~RedisReplyBuilderBase() override = default;
void SendMGetResponse(MGetResponse resp) override;
void SendStored() override;
void SendSetSkipped() override;
void SendProtocolError(std::string_view str) override;
virtual void SendNullArray(); // Send *-1
virtual void SendEmptyArray(); // Send *0
virtual void SendSimpleStrArr(StrSpan arr);
virtual void SendStringArr(StrSpan arr, CollectionType type = ARRAY);
Protocol GetProtocol() const final {
return Protocol::REDIS;
}
virtual void SendNull();
void SendLong(long val) override;
virtual void SendDouble(double val);
void SendSimpleString(std::string_view str) override;
virtual void SendBulkString(std::string_view str);
virtual void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT);
virtual void SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
bool with_scores);
void StartArray(unsigned len); // StartCollection(len, ARRAY)
virtual void StartCollection(unsigned len, CollectionType type);
static char* FormatDouble(double val, char* dest, unsigned dest_len);
private:
void SendStringArrInternal(size_t size, absl::FunctionRef<std::string_view(unsigned)> producer,
CollectionType type);
bool is_resp3_ = false;
};
// Redis reply builder interface for sending RESP data.
class RedisReplyBuilder2Base : public SinkReplyBuilder2, public RedisReplyBuilder {
public:
using CollectionType = RedisReplyBuilder::CollectionType;
using VerbatimFormat = RedisReplyBuilder::VerbatimFormat;
explicit RedisReplyBuilder2Base(io::Sink* sink)
: SinkReplyBuilder2(sink), RedisReplyBuilder(nullptr) {
}
~RedisReplyBuilder2Base() override = default;
void SendNull() override;
void SendSimpleString(std::string_view str) override;
void SendBulkString(std::string_view str) override; // RESP: Blob String
virtual void SendBulkString(std::string_view str); // RESP: Blob String
void SendLong(long val) override;
void SendDouble(double val) override; // RESP: Number
virtual void SendDouble(double val); // RESP: Number
void SendNullArray() override;
void StartCollection(unsigned len, CollectionType ct) override;
virtual void SendNullArray();
virtual void StartCollection(unsigned len, CollectionType ct);
using SinkReplyBuilder2::SendError;
using SinkReplyBuilder::SendError;
void SendError(std::string_view str, std::string_view type = {}) override;
void SendProtocolError(std::string_view str) override;
virtual void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT) override;
virtual void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT);
static char* FormatDouble(double d, char* dest, unsigned len);
static std::string SerializeCommand(std::string_view command);
bool IsResp3() const override {
bool IsResp3() const {
return resp3_;
}
// REMOVE THIS override
void SetResp3(bool resp3) override {
void SetResp3(bool resp3) {
resp3_ = resp3;
}
// REMOVE THIS
void SetBatchMode(bool mode) override {
SinkReplyBuilder2::SetBatchMode(mode);
}
void StartAggregate() override {
aggregators_.emplace_back(this);
}
void StopAggregate() override {
aggregators_.pop_back();
}
void FlushBatch() override {
SinkReplyBuilder2::Flush();
}
// REMOVE THIS
void CloseConnection() override {
SinkReplyBuilder2::CloseConnection();
}
std::error_code GetError() const override {
return SinkReplyBuilder2::GetError();
}
private:
std::vector<SinkReplyBuilder2::ReplyAggregator> aggregators_;
bool resp3_ = false;
};
// Non essential rediss reply builder functions implemented on top of the base resp protocol
class RedisReplyBuilder2 : public RedisReplyBuilder2Base {
// Non essential redis reply builder functions implemented on top of the base resp protocol
class RedisReplyBuilder : public RedisReplyBuilderBase {
public:
RedisReplyBuilder2(io::Sink* sink) : RedisReplyBuilder2Base(sink) {
using RedisReplyBuilderBase::CollectionType;
RedisReplyBuilder(io::Sink* sink) : RedisReplyBuilderBase(sink) {
}
~RedisReplyBuilder2() override = default;
void SendSimpleStrArr2(const facade::ArgRange& strs);
~RedisReplyBuilder() override = default;
void SendSimpleStrArr(const facade::ArgRange& strs);
void SendBulkStrArr(const facade::ArgRange& strs, CollectionType ct = ARRAY);
void SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
bool with_scores) override;
void SendSimpleStrArr(RedisReplyBuilder::StrSpan arr) override {
SendSimpleStrArr2(arr);
}
void SendStringArr(RedisReplyBuilder::StrSpan arr, CollectionType type = ARRAY) override {
SendBulkStrArr(arr, type);
}
void SendScoredArray(absl::Span<const std::pair<std::string, double>> arr, bool with_scores);
void SendStored() final;
void SendSetSkipped() final;
void StartArray(unsigned len);
void SendEmptyArray() override;
// TODO: Remove
void SendMGetResponse(SinkReplyBuilder::MGetResponse resp) override;
void SendEmptyArray();
};
} // namespace facade

View file

@ -83,7 +83,7 @@ class RedisReplyBuilderTest : public testing::Test {
void SetUp() {
sink_.Clear();
builder_.reset(new RedisReplyBuilder2(&sink_));
builder_.reset(new RedisReplyBuilder(&sink_));
ResetStats();
}
@ -112,7 +112,7 @@ class RedisReplyBuilderTest : public testing::Test {
}
unsigned GetError(string_view err) const {
const auto& map = SinkReplyBuilder2::GetThreadLocalStats().err_count;
const auto& map = SinkReplyBuilder::GetThreadLocalStats().err_count;
auto it = map.find(err);
return it == map.end() ? 0 : it->second;
}
@ -135,7 +135,7 @@ class RedisReplyBuilderTest : public testing::Test {
ParsingResults Parse();
io::StringSink sink_;
std::unique_ptr<RedisReplyBuilder2> builder_;
std::unique_ptr<RedisReplyBuilder> builder_;
std::unique_ptr<std::uint8_t[]> parser_buffer_;
};
@ -205,7 +205,7 @@ RedisReplyBuilderTest::ParsingResults RedisReplyBuilderTest::Parse() {
TEST_F(RedisReplyBuilderTest, MessageSend) {
// Test each message that is "sent" to the sink
builder_->SinkReplyBuilder2::SendOk();
builder_->SinkReplyBuilder::SendOk();
ASSERT_EQ(TakePayload(), kOKMessage);
builder_->StartArray(10);
@ -783,7 +783,7 @@ TEST_F(RedisReplyBuilderTest, BasicCapture) {
string_view kTestSws[] = {"a1"sv, "a2"sv, "a3"sv, "a4"sv};
CapturingReplyBuilder crb{};
using RRB = RedisReplyBuilder2;
using RRB = RedisReplyBuilder;
auto big_arr_cb = [](RRB* r) {
r->StartArray(4);
@ -870,12 +870,12 @@ TEST_F(RedisReplyBuilderTest, VerbatimString) {
std::string str = "A simple string!";
builder_->SetResp3(true);
builder_->SendVerbatimString(str, RedisReplyBuilder2::VerbatimFormat::TXT);
builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::TXT);
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "=20\r\ntxt:A simple string!\r\n") << "Resp3 VerbatimString TXT failed.";
builder_->SetResp3(true);
builder_->SendVerbatimString(str, RedisReplyBuilder2::VerbatimFormat::MARKDOWN);
builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::MARKDOWN);
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "=20\r\nmkd:A simple string!\r\n") << "Resp3 VerbatimString TXT failed.";

View file

@ -8,7 +8,7 @@
#include "reply_capture.h"
#define SKIP_LESS(needed) \
has_replied_ = true; \
replies_recorded_++; \
if (reply_mode_ < needed) { \
current_ = monostate{}; \
return; \
@ -23,44 +23,11 @@ void CapturingReplyBuilder::SendError(std::string_view str, std::string_view typ
Capture(Error{str, type});
}
void CapturingReplyBuilder::SendMGetResponse(MGetResponse resp) {
SKIP_LESS(ReplyMode::FULL);
Capture(std::move(resp));
}
void CapturingReplyBuilder::SendError(OpStatus status) {
if (status != OpStatus::OK) {
last_error_ = StatusToMsg(status);
}
SKIP_LESS(ReplyMode::ONLY_ERR);
Capture(status);
}
void CapturingReplyBuilder::SendNullArray() {
SKIP_LESS(ReplyMode::FULL);
Capture(unique_ptr<CollectionPayload>{nullptr});
}
void CapturingReplyBuilder::SendEmptyArray() {
SKIP_LESS(ReplyMode::FULL);
Capture(make_unique<CollectionPayload>(0, ARRAY));
}
void CapturingReplyBuilder::SendSimpleStrArr(StrSpan arr) {
SKIP_LESS(ReplyMode::FULL);
DCHECK_EQ(current_.index(), 0u);
Capture(StrArrPayload{true, ARRAY, {arr.begin(), arr.end()}});
}
void CapturingReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) {
SKIP_LESS(ReplyMode::FULL);
DCHECK_EQ(current_.index(), 0u);
// TODO: 1. Allocate all strings at once 2. Allow movable types
Capture(StrArrPayload{false, type, {arr.begin(), arr.end()}});
}
void CapturingReplyBuilder::SendNull() {
SKIP_LESS(ReplyMode::FULL);
Capture(nullptr_t{});
@ -86,13 +53,6 @@ void CapturingReplyBuilder::SendBulkString(std::string_view str) {
Capture(BulkString{string{str}});
}
void CapturingReplyBuilder::SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
bool with_scores) {
SKIP_LESS(ReplyMode::FULL);
std::vector<std::pair<std::string, double>> values(arr.begin(), arr.end());
Capture(ScoredArray{std::move(values), with_scores});
}
void CapturingReplyBuilder::StartCollection(unsigned len, CollectionType type) {
SKIP_LESS(ReplyMode::FULL);
stack_.emplace(make_unique<CollectionPayload>(len, type), type == MAP ? len * 2 : len);
@ -109,8 +69,8 @@ CapturingReplyBuilder::Payload CapturingReplyBuilder::Take() {
}
void CapturingReplyBuilder::SendDirect(Payload&& val) {
has_replied_ = !holds_alternative<monostate>(val);
bool is_err = holds_alternative<Error>(val) || holds_alternative<OpStatus>(val);
replies_recorded_ += !holds_alternative<monostate>(val);
bool is_err = holds_alternative<Error>(val);
ReplyMode min_mode = is_err ? ReplyMode::ONLY_ERR : ReplyMode::FULL;
if (reply_mode_ >= min_mode) {
DCHECK_EQ(current_.index(), 0u);
@ -178,13 +138,6 @@ struct CaptureVisitor {
rb->SendError(status);
}
void operator()(const CapturingReplyBuilder::StrArrPayload& sa) {
if (sa.simple)
rb->SendSimpleStrArr(sa.arr);
else
rb->SendStringArr(sa.arr, sa.type);
}
void operator()(const unique_ptr<CapturingReplyBuilder::CollectionPayload>& cp) {
if (!cp) {
rb->SendNullArray();
@ -199,14 +152,6 @@ struct CaptureVisitor {
visit(*this, std::move(pl));
}
void operator()(SinkReplyBuilder::MGetResponse resp) {
rb->SendMGetResponse(std::move(resp));
}
void operator()(const CapturingReplyBuilder::ScoredArray& sarr) {
rb->SendScoredArray(sarr.arr, sarr.with_scores);
}
RedisReplyBuilder* rb;
};
@ -219,7 +164,7 @@ void CapturingReplyBuilder::Apply(Payload&& pl, RedisReplyBuilder* rb) {
CaptureVisitor cv{rb};
visit(cv, std::move(pl));
// Consumed and printed by InvokeCmd. We just send the actual error here
std::ignore = rb->ConsumeLastError();
rb->ConsumeLastError();
}
void CapturingReplyBuilder::SetReplyMode(ReplyMode mode) {

View file

@ -23,47 +23,23 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
friend struct CaptureVisitor;
public:
void SendError(std::string_view str, std::string_view type = {}) override;
void SendMGetResponse(MGetResponse resp) override;
// SendStored -> SendSimpleString("OK")
// SendSetSkipped -> SendNull()
void SendError(OpStatus status) override;
using RedisReplyBuilder::SendError;
void SendError(std::string_view str, std::string_view type) override;
void SendNullArray() override;
void SendEmptyArray() override;
void SendSimpleStrArr(StrSpan arr) override;
void SendStringArr(StrSpan arr, CollectionType type = ARRAY) override;
void SendNull() override;
void SendLong(long val) override;
void SendDouble(double val) override;
void SendSimpleString(std::string_view str) override;
void SendBulkString(std::string_view str) override;
void SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
bool with_scores) override;
void StartCollection(unsigned len, CollectionType type) override;
void SendNullArray() override;
void SendNull() override;
public:
using Error = std::pair<std::string, std::string>; // SendError (msg, type)
using Null = std::nullptr_t; // SendNull or SendNullArray
struct StrArrPayload {
bool simple;
CollectionType type;
std::vector<std::string> arr;
};
struct CollectionPayload;
struct ScoredArray {
std::vector<std::pair<std::string, double>> arr;
bool with_scores;
};
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString
@ -71,9 +47,8 @@ class CapturingReplyBuilder : public RedisReplyBuilder {
: RedisReplyBuilder{nullptr}, reply_mode_{mode}, stack_{}, current_{} {
}
using Payload =
std::variant<std::monostate, Null, Error, OpStatus, long, double, SimpleString, BulkString,
StrArrPayload, std::unique_ptr<CollectionPayload>, MGetResponse, ScoredArray>;
using Payload = std::variant<std::monostate, Null, Error, long, double, SimpleString, BulkString,
std::unique_ptr<CollectionPayload>>;
// Non owned Error based on SendError arguments (msg, type)
using ErrorRef = std::pair<std::string_view, std::string_view>;

View file

@ -842,9 +842,9 @@ void DebugCmd::Watched(facade::SinkReplyBuilder* builder) {
shard_set->RunBlockingInParallel(cb);
rb->StartArray(4);
rb->SendBulkString("awaked");
rb->SendStringArr(awaked_trans);
rb->SendBulkStrArr(awaked_trans);
rb->SendBulkString("watched");
rb->SendStringArr(watched_keys);
rb->SendBulkStrArr(watched_keys);
}
void DebugCmd::TxAnalysis(facade::SinkReplyBuilder* builder) {

View file

@ -978,7 +978,7 @@ void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
VLOG(1) << "Del " << ArgS(args, 0);
atomic_uint32_t result{0};
bool is_mc = (builder->type() == SinkReplyBuilder::MC);
bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE);
auto cb = [&result](const Transaction* t, EngineShard* shard) {
ShardArgs args = t->GetShardArgs(shard->shard_id());
@ -1025,7 +1025,7 @@ void GenericFamily::Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
}
string_view resp[2] = {"pong", msg};
return rb->SendStringArr(resp);
return rb->SendBulkStrArr(resp);
}
if (args.size() == 0) {

View file

@ -720,7 +720,7 @@ void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkRepl
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result) {
bool is_map = (getall_mask == (VALUES | FIELDS));
rb->SendStringArr(absl::Span<const string>{*result},
rb->SendBulkStrArr(absl::Span<const string>{*result},
is_map ? RedisReplyBuilder::MAP : RedisReplyBuilder::ARRAY);
} else {
builder->SendError(result.status());
@ -1220,7 +1220,7 @@ void HSetFamily::HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder*
if ((result->size() == 1) && (args.size() == 1))
rb->SendBulkString(result->front());
else
rb->SendStringArr(*result, facade::RedisReplyBuilder::ARRAY);
rb->SendBulkStrArr(*result, facade::RedisReplyBuilder::ARRAY);
} else if (result.status() == OpStatus::KEY_NOTFOUND) {
if (args.size() == 1)
rb->SendNull();

View file

@ -131,16 +131,6 @@ struct CaptureVisitor {
absl::StrAppend(&str, "\"", facade::StatusToMsg(status), "\"");
}
void operator()(const CapturingReplyBuilder::StrArrPayload& sa) {
absl::StrAppend(&str, "[");
for (const auto& val : sa.arr) {
absl::StrAppend(&str, JsonEscape(val), ",");
}
if (sa.arr.size())
str.pop_back();
absl::StrAppend(&str, "]");
}
void operator()(unique_ptr<CapturingReplyBuilder::CollectionPayload> cp) {
if (!cp) {
absl::StrAppend(&str, "null");
@ -157,32 +147,6 @@ struct CaptureVisitor {
}
}
void operator()(facade::SinkReplyBuilder::MGetResponse resp) {
absl::StrAppend(&str, "[");
for (const auto& val : resp.resp_arr) {
if (val) {
absl::StrAppend(&str, JsonEscape(val->value), ",");
} else {
absl::StrAppend(&str, "null,");
}
}
if (resp.resp_arr.size())
str.pop_back();
absl::StrAppend(&str, "]");
}
void operator()(const CapturingReplyBuilder::ScoredArray& sarr) {
absl::StrAppend(&str, "[");
for (const auto& [key, score] : sarr.arr) {
absl::StrAppend(&str, "{", JsonEscape(key), ":", score, "},");
}
if (sarr.arr.size() > 0) {
str.pop_back();
}
absl::StrAppend(&str, "]");
}
string str;
};

View file

@ -204,7 +204,7 @@ template <typename I> void Send(I begin, I end, RedisReplyBuilder* rb) {
rb->SendEmptyArray();
} else {
if constexpr (is_same_v<decltype(*begin), const string>) {
rb->SendStringArr(facade::OwnedArgSlice{begin, end});
rb->SendBulkStrArr(facade::OwnedArgSlice{begin, end});
} else {
rb->StartArray(end - begin);
for (auto i = begin; i != end; ++i) {

View file

@ -987,7 +987,7 @@ void BPopGeneric(ListDir dir, CmdArgList args, Transaction* tx, SinkReplyBuilder
if (popped_key) {
DVLOG(1) << "BPop " << tx->DebugId() << " popped from key " << popped_key; // key.
std::string_view str_arr[2] = {*popped_key, popped_value};
return rb->SendStringArr(str_arr);
return rb->SendBulkStrArr(str_arr);
}
DVLOG(1) << "result for " << tx->DebugId() << " is " << popped_key.status();
@ -1202,7 +1202,7 @@ void ListFamily::LRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
}
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendStringArr(*res);
rb->SendBulkStrArr(*res);
}
// lrem key 5 foo, will remove foo elements from the list if exists at most 5 times.

View file

@ -286,24 +286,16 @@ class InterpreterReplier : public RedisReplyBuilder {
}
void SendError(std::string_view str, std::string_view type = std::string_view{}) final;
void SendStored() final;
void SendBulkString(std::string_view str) final;
void SendSimpleString(std::string_view str) final;
void SendMGetResponse(MGetResponse resp) final;
void SendSimpleStrArr(StrSpan arr) final;
void SendNullArray() final;
void SendStringArr(StrSpan arr, CollectionType type) final;
void SendNull() final;
void SendLong(long val) final;
void SendDouble(double val) final;
void SendBulkString(std::string_view str) final;
void StartCollection(unsigned len, CollectionType type) final;
void SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
bool with_scores) final;
private:
void PostItem();
@ -398,11 +390,6 @@ void InterpreterReplier::SendError(string_view str, std::string_view type) {
explr_->OnError(str);
}
void InterpreterReplier::SendStored() {
DCHECK(array_len_.empty());
SendSimpleString("OK");
}
void InterpreterReplier::SendSimpleString(string_view str) {
if (array_len_.empty())
explr_->OnStatus(str);
@ -411,40 +398,11 @@ void InterpreterReplier::SendSimpleString(string_view str) {
PostItem();
}
void InterpreterReplier::SendMGetResponse(MGetResponse resp) {
DCHECK(array_len_.empty());
explr_->OnArrayStart(resp.resp_arr.size());
for (uint32_t i = 0; i < resp.resp_arr.size(); ++i) {
if (resp.resp_arr[i].has_value()) {
explr_->OnString(resp.resp_arr[i]->value);
} else {
explr_->OnNil();
}
}
explr_->OnArrayEnd();
}
void InterpreterReplier::SendSimpleStrArr(StrSpan arr) {
explr_->OnArrayStart(arr.Size());
for (string_view str : arr)
explr_->OnString(str);
explr_->OnArrayEnd();
}
void InterpreterReplier::SendNullArray() {
SendSimpleStrArr(ArgSlice{});
PostItem();
}
void InterpreterReplier::SendStringArr(StrSpan arr, CollectionType) {
explr_->OnArrayStart(arr.Size());
for (string_view str : arr)
explr_->OnString(str);
explr_->OnArrayEnd();
PostItem();
}
void InterpreterReplier::SendNull() {
explr_->OnNil();
PostItem();
@ -465,7 +423,9 @@ void InterpreterReplier::SendBulkString(string_view str) {
PostItem();
}
void InterpreterReplier::StartCollection(unsigned len, CollectionType) {
void InterpreterReplier::StartCollection(unsigned len, CollectionType type) {
if (type == MAP)
len *= 2;
explr_->OnArrayStart(len);
if (len == 0) {
@ -477,31 +437,6 @@ void InterpreterReplier::StartCollection(unsigned len, CollectionType) {
}
}
void InterpreterReplier::SendScoredArray(absl::Span<const std::pair<std::string, double>> arr,
bool with_scores) {
if (with_scores) {
if (IsResp3()) {
StartCollection(arr.size(), CollectionType::ARRAY);
for (size_t i = 0; i < arr.size(); ++i) {
StartArray(2);
SendBulkString(arr[i].first);
SendDouble(arr[i].second);
}
} else {
StartCollection(arr.size() * 2, CollectionType::ARRAY);
for (size_t i = 0; i < arr.size(); ++i) {
SendBulkString(arr[i].first);
SendDouble(arr[i].second);
}
}
} else {
StartCollection(arr.size(), CollectionType::ARRAY);
for (size_t i = 0; i < arr.size(); ++i) {
SendBulkString(arr[i].first);
}
}
}
bool IsSHA(string_view str) {
for (auto c : str) {
if (!absl::ascii_isxdigit(c))
@ -1189,10 +1124,10 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder,
facade::ConnectionContext* cntx) {
absl::Cleanup clear_last_error([builder]() { std::ignore = builder->ConsumeLastError(); });
DCHECK(!args.empty());
DCHECK_NE(0u, shard_set->size()) << "Init was not called";
absl::Cleanup clear_last_error([builder]() { builder->ConsumeLastError(); });
ServerState& etl = *ServerState::tlocal();
string cmd = absl::AsciiStrToUpper(args[0]);
@ -1305,23 +1240,26 @@ class ReplyGuard {
const bool is_script = bool(cntx->conn_state.script_info);
const bool is_one_of =
absl::flat_hash_set<std::string_view>({"REPLCONF", "DFLY"}).contains(cid_name);
bool is_mcache = builder->type() == SinkReplyBuilder::MC;
bool is_mcache = builder->GetProtocol() == Protocol::MEMCACHE;
const bool is_no_reply_memcache =
is_mcache && (static_cast<MCReplyBuilder*>(builder)->NoReply() || cid_name == "QUIT");
const bool should_dcheck = !is_one_of && !is_script && !is_no_reply_memcache;
if (should_dcheck) {
builder_ = builder;
builder_->ExpectReply();
replies_recorded_ = builder_->RepliesRecorded();
}
}
~ReplyGuard() {
if (builder_) {
DCHECK(builder_->HasReplied());
DCHECK_GT(builder_->RepliesRecorded(), replies_recorded_)
<< cid_name_ << " " << typeid(*builder_).name();
}
}
private:
size_t replies_recorded_ = 0;
std::string_view cid_name_;
SinkReplyBuilder* builder_ = nullptr;
};
@ -1359,7 +1297,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBui
return true;
}
builder->SendError(std::move(*err));
std::ignore = builder->ConsumeLastError();
builder->ConsumeLastError();
return true; // return false only for internal error aborts
}
@ -1402,9 +1340,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBui
return false;
}
std::string reason = builder->ConsumeLastError();
if (!reason.empty()) {
if (std::string reason = builder->ConsumeLastError(); !reason.empty()) {
VLOG(2) << FailedCommandToString(cid->name(), tail_args, reason);
LOG_EVERY_T(WARNING, 1) << FailedCommandToString(cid->name(), tail_args, reason);
}
@ -1450,7 +1386,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
facade::ConnectionContext* cntx) {
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning());
DCHECK_EQ(builder->type(), SinkReplyBuilder::REDIS);
DCHECK_EQ(builder->GetProtocol(), Protocol::REDIS);
vector<StoredCmd> stored_cmds;
intrusive_ptr<Transaction> dist_trans;
@ -1701,7 +1637,7 @@ absl::flat_hash_map<std::string, unsigned> Service::UknownCmdMap() const {
void Service::Quit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (builder->type() == SinkReplyBuilder::REDIS)
if (builder->GetProtocol() == Protocol::REDIS)
builder->SendOk();
using facade::SinkReplyBuilder;
builder->CloseConnection();
@ -2360,7 +2296,7 @@ void Service::Function(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
void Service::PubsubChannels(string_view pattern, SinkReplyBuilder* builder) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendStringArr(ServerState::tlocal()->channel_store()->ListChannels(pattern));
rb->SendBulkStrArr(ServerState::tlocal()->channel_store()->ListChannels(pattern));
}
void Service::PubsubPatterns(SinkReplyBuilder* builder) {

View file

@ -121,7 +121,7 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, Stor
if (verify_commands_) {
if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
rb->SendError(std::move(*err));
std::ignore = rb->ConsumeLastError();
rb->ConsumeLastError();
return !error_abort_;
}
}

View file

@ -384,7 +384,7 @@ bool ProtocolClient::CheckRespFirstTypes(initializer_list<RespExpr::Type> types)
}
error_code ProtocolClient::SendCommand(string_view command) {
string formatted_command = RedisReplyBuilder2Base::SerializeCommand(command);
string formatted_command = RedisReplyBuilderBase::SerializeCommand(command);
auto ec = sock_->Write(io::Buffer(formatted_command));
if (!ec)
TouchIoTime();

View file

@ -712,7 +712,7 @@ void SearchFamily::FtList(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
return OpStatus::OK;
});
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendStringArr(names);
rb->SendBulkStrArr(names);
}
void SearchFamily::FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
@ -920,7 +920,7 @@ void SearchFamily::FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder*
vector<string> vec(result_set.begin(), result_set.end());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendStringArr(vec, RedisReplyBuilder::SET);
rb->SendBulkStrArr(vec, RedisReplyBuilder::SET);
}
void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {

View file

@ -1968,7 +1968,7 @@ void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
}
}
auto* rb = static_cast<RedisReplyBuilder*>(builder);
return rb->SendStringArr(res, RedisReplyBuilder::MAP);
return rb->SendBulkStrArr(res, RedisReplyBuilder::MAP);
}
if (sub_cmd == "RESETSTAT") {

View file

@ -969,7 +969,7 @@ struct SetReplies {
if (script) // output is sorted under scripts
sort(sv->begin(), sv->end());
rb->SendStringArr(*sv, RedisReplyBuilder::SET);
rb->SendBulkStrArr(*sv, RedisReplyBuilder::SET);
}
void Send(const ResultSetView& rsv) {
@ -1122,7 +1122,7 @@ void SPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
rb->SendBulkString(result.value().front());
}
} else { // SPOP key cnt
rb->SendStringArr(*result, RedisReplyBuilder::SET);
rb->SendBulkStrArr(*result, RedisReplyBuilder::SET);
}
return;
}
@ -1240,7 +1240,7 @@ void SRandMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result || result == OpStatus::KEY_NOTFOUND) {
if (is_count) {
rb->SendStringArr(*result, RedisReplyBuilder::SET);
rb->SendBulkStrArr(*result, RedisReplyBuilder::SET);
} else if (result->size()) {
rb->SendBulkString(result->front());
} else {

View file

@ -518,17 +518,32 @@ OpResult<array<int64_t, 5>> OpThrottle(const OpArgs& op_args, const string_view
return array<int64_t, 5>{limited ? 1 : 0, limit, remaining, retry_after_ms, reset_after_ms};
}
struct GetResp {
string key; // TODO: to use backing storage to optimize this as well.
string_view value;
uint64_t mc_ver = 0; // 0 means we do not output it (i.e has not been requested).
uint32_t mc_flag = 0;
};
struct MGetResponse {
explicit MGetResponse(size_t size = 0) : resp_arr(size) {
}
std::unique_ptr<char[]> storage;
absl::InlinedVector<std::optional<GetResp>, 2> resp_arr;
};
// fetch_mask values
constexpr uint8_t FETCH_MCFLAG = 0x1;
constexpr uint8_t FETCH_MCVER = 0x2;
SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask,
const Transaction* t, EngineShard* shard) {
MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Transaction* t,
EngineShard* shard) {
ShardArgs keys = t->GetShardArgs(shard->shard_id());
DCHECK(!keys.Empty());
auto& db_slice = t->GetDbSlice(shard->shard_id());
SinkReplyBuilder::MGetResponse response(keys.Size());
MGetResponse response(keys.Size());
absl::InlinedVector<DbSlice::ConstIterator, 32> iters(keys.Size());
// First, fetch all iterators and count total size ahead
@ -543,8 +558,8 @@ SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_
}
// Allocate enough for all values
response.storage_list = SinkReplyBuilder::AllocMGetStorage(total_size);
char* next = response.storage_list->data;
response.storage = make_unique<char[]>(total_size);
char* next = response.storage.get();
bool fetch_mcflag = fetch_mask & FETCH_MCFLAG;
bool fetch_mcver = fetch_mask & FETCH_MCVER;
for (size_t i = 0; i < iters.size(); ++i) {
@ -650,7 +665,7 @@ void ExtendGeneric(CmdArgList args, bool prepend, Transaction* tx, SinkReplyBuil
string_view value = ArgS(args, 1);
VLOG(2) << "ExtendGeneric(" << key << ", " << value << ")";
if (builder->type() == SinkReplyBuilder::REDIS) {
if (builder->GetProtocol() == Protocol::REDIS) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExtend(t->GetOpArgs(shard), key, value, prepend);
};
@ -662,7 +677,7 @@ void ExtendGeneric(CmdArgList args, bool prepend, Transaction* tx, SinkReplyBuil
rb->SendLong(GetResult(std::move(res.value())));
} else {
// Memcached skips if key is missing
DCHECK(builder->type() == SinkReplyBuilder::MC);
DCHECK(dynamic_cast<MCReplyBuilder*>(builder));
auto cb = [&](Transaction* t, EngineShard* shard) {
return ExtendOrSkip(t->GetOpArgs(shard), key, value, prepend);
@ -723,7 +738,7 @@ void SetExGeneric(bool seconds, CmdArgList args, const CommandId* cid, Transacti
}
void IncrByGeneric(string_view key, int64_t val, Transaction* tx, SinkReplyBuilder* builder) {
bool skip_on_missing = builder->type() == SinkReplyBuilder::MC;
bool skip_on_missing = (builder->GetProtocol() == Protocol::MEMCACHE);
auto cb = [&](Transaction* t, EngineShard* shard) {
OpResult<int64_t> res = OpIncrBy(t->GetOpArgs(shard), key, val, skip_on_missing);
@ -974,7 +989,7 @@ void StringFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
// Remove existed key if the key is expired already
if (rel_ms < 0) {
tx->ScheduleSingleHop([key](const Transaction* tx, EngineShard* es) {
tx->ScheduleSingleHop([](const Transaction* tx, EngineShard* es) {
ShardArgs args = tx->GetShardArgs(es->shard_id());
GenericFamily::OpDel(tx->GetOpArgs(es), args);
return OpStatus::OK;
@ -1253,10 +1268,9 @@ void StringFamily::DecrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
DCHECK_GE(args.size(), 1U);
std::vector<SinkReplyBuilder::MGetResponse> mget_resp(shard_set->size());
uint8_t fetch_mask = 0;
if (builder->type() == SinkReplyBuilder::MC) {
if (builder->GetProtocol() == Protocol::MEMCACHE) {
fetch_mask |= FETCH_MCFLAG;
if (cntx->conn_state.memcache_flag & ConnectionState::FETCH_CAS_VER)
fetch_mask |= FETCH_MCVER;
@ -1264,6 +1278,7 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
// Count of pending tiered reads
util::fb2::BlockingCounter tiering_bc{0};
std::vector<MGetResponse> mget_resp(shard_set->size());
auto cb = [&](Transaction* t, EngineShard* shard) {
mget_resp[shard->shard_id()] = OpMGet(tiering_bc, fetch_mask, t, shard);
return OpStatus::OK;
@ -1275,18 +1290,14 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
// wait for all tiered reads to finish
tiering_bc->Wait();
// reorder the responses back according to the order of their corresponding
// keys.
SinkReplyBuilder::MGetResponse res(args.size());
// reorder shard results back according to argument order
absl::FixedArray<optional<GetResp>, 8> res(args.size());
for (ShardId sid = 0; sid < mget_resp.size(); ++sid) {
if (!tx->IsActive(sid))
continue;
SinkReplyBuilder::MGetResponse& src = mget_resp[sid];
src.storage_list->next = res.storage_list;
res.storage_list = src.storage_list;
src.storage_list = nullptr;
auto& src = mget_resp[sid];
ShardArgs shard_args = tx->GetShardArgs(sid);
unsigned src_indx = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) {
@ -1295,14 +1306,32 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
uint32_t indx = it.index();
res.resp_arr[indx] = std::move(src.resp_arr[src_indx]);
if (builder->type() == SinkReplyBuilder::MC) {
res.resp_arr[indx]->key = *it;
res[indx] = std::move(src.resp_arr[src_indx]);
if (builder->GetProtocol() == Protocol::MEMCACHE) {
res[indx]->key = *it;
}
}
}
return builder->SendMGetResponse(std::move(res));
SinkReplyBuilder::ReplyScope scope(builder);
if (builder->GetProtocol() == Protocol::MEMCACHE) {
auto* rb = static_cast<MCReplyBuilder*>(builder);
for (const auto& entry : res) {
if (!entry)
continue;
rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag);
}
rb->SendSimpleString("END");
} else {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(res.size());
for (const auto& entry : res) {
if (entry)
rb->SendBulkString(entry->value);
else
rb->SendNull();
}
}
}
void StringFamily::MSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,