diff --git a/src/facade/conn_context.cc b/src/facade/conn_context.cc index 7528c5e4b..4753def33 100644 --- a/src/facade/conn_context.cc +++ b/src/facade/conn_context.cc @@ -6,13 +6,10 @@ #include "absl/flags/internal/flag.h" #include "base/flags.h" +#include "base/logging.h" #include "facade/dragonfly_connection.h" #include "facade/reply_builder.h" -ABSL_FLAG(bool, experimental_new_io, true, - "Use new replying code - should " - "reduce latencies for pipelining"); - namespace facade { ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) { @@ -22,11 +19,11 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow if (stream) { switch (protocol_) { + case Protocol::NONE: + LOG(DFATAL) << "Invalid protocol"; + break; case Protocol::REDIS: { - RedisReplyBuilder* rb = absl::GetFlag(FLAGS_experimental_new_io) - ? new RedisReplyBuilder2(stream) - : new RedisReplyBuilder(stream); - rbuilder_.reset(rb); + rbuilder_.reset(new RedisReplyBuilder(stream)); break; } case Protocol::MEMCACHE: diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 995e4b6e0..e1508700e 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -479,8 +479,8 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) { } arr[i++] = pub_msg.channel; arr[i++] = pub_msg.message; - rbuilder->SendStringArr(absl::Span{arr.data(), i}, - RedisReplyBuilder::CollectionType::PUSH); + rbuilder->SendBulkStrArr(absl::Span{arr.data(), i}, + RedisReplyBuilder::CollectionType::PUSH); } void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg) { @@ -518,7 +518,7 @@ void Connection::DispatchOperations::operator()(const InvalidationMessage& msg) rbuilder->SendNull(); } else { std::string_view keys[] = {msg.key}; - rbuilder->SendStringArr(keys); + rbuilder->SendBulkStrArr(keys); } } @@ -550,6 +550,9 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, static_assert(kReqSz <= 256 && kReqSz >= 200); switch (protocol) { + case Protocol::NONE: + LOG(DFATAL) << "Invalid protocol"; + break; case Protocol::REDIS: redis_parser_.reset(new RedisParser(GetFlag(FLAGS_max_multi_bulk_len))); break; @@ -1358,7 +1361,7 @@ bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) { void Connection::SquashPipeline() { DCHECK_EQ(dispatch_q_.size(), pending_pipeline_cmd_cnt_); - DCHECK_EQ(reply_builder_->type(), SinkReplyBuilder::REDIS); // Only Redis is supported. + DCHECK_EQ(reply_builder_->GetProtocol(), Protocol::REDIS); // Only Redis is supported. vector squash_cmds; squash_cmds.reserve(dispatch_q_.size()); @@ -1377,7 +1380,7 @@ void Connection::SquashPipeline() { service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_, cc_.get()); if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared - reply_builder_->FlushBatch(); + reply_builder_->Flush(); reply_builder_->SetBatchMode(false); // in case the next dispatch is sync } @@ -1498,7 +1501,7 @@ void Connection::ExecutionFiber() { // last command to reply and flush. If it doesn't reply (i.e. is a control message like // migrate), we have to flush manually. if (dispatch_q_.empty() && !msg.IsReplying()) { - reply_builder_->FlushBatch(); + reply_builder_->Flush(); } if (ShouldEndDispatchFiber(msg)) { diff --git a/src/facade/facade.cc b/src/facade/facade.cc index 52a6d7cea..cce96aa75 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -165,6 +165,10 @@ ostream& operator<<(ostream& os, facade::CmdArgList ras) { return os; } +ostream& operator<<(ostream& os, facade::Protocol p) { + return os << int(p); +} + ostream& operator<<(ostream& os, const facade::RespExpr& e) { using facade::RespExpr; using facade::ToSV; diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 1dbcc9087..006fdf52e 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -33,7 +33,7 @@ constexpr size_t kSanitizerOverhead = 0u; #endif #endif -enum class Protocol : uint8_t { MEMCACHE = 1, REDIS = 2 }; +enum class Protocol : uint8_t { NONE = 0, MEMCACHE = 1, REDIS = 2 }; using MutableSlice = std::string_view; using CmdArgList = absl::Span; @@ -189,5 +189,6 @@ void ResetStats(); namespace std { ostream& operator<<(ostream& os, facade::CmdArgList args); +ostream& operator<<(ostream& os, facade::Protocol protocol); } // namespace std diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index 478e3d855..3ca92b1c9 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -49,10 +49,6 @@ constexpr unsigned kConvFlags = DoubleToStringConverter dfly_conv(kConvFlags, "inf", "nan", 'e', -6, 21, 6, 0); -const char* NullString(bool resp3) { - return resp3 ? "_\r\n" : "$-1\r\n"; -} - template size_t piece_size(const T& v) { if constexpr (is_array_v) return ABSL_ARRAYSIZE(v) - 1; // expect null terminated @@ -77,21 +73,28 @@ char* write_piece(string_view str, char* dest) { } // namespace -SinkReplyBuilder::MGetResponse::~MGetResponse() { - while (storage_list) { - auto* next = storage_list->next; - delete[] reinterpret_cast(storage_list); - storage_list = next; - } +SinkReplyBuilder::ReplyAggregator::~ReplyAggregator() { + rb->batched_ = prev; + if (!prev) + rb->Flush(); } -SinkReplyBuilder::SinkReplyBuilder(::io::Sink* sink, Type t) - : sink_(sink), - should_batch_(false), - should_aggregate_(false), - has_replied_(true), - send_active_(false), - type_(t) { +SinkReplyBuilder::ReplyScope::~ReplyScope() { + rb->scoped_ = prev; + if (!prev) + rb->FinishScope(); +} + +void SinkReplyBuilder::SendError(ErrorReply error) { + if (error.status) + return SendError(*error.status); + SendError(error.ToSv(), error.kind); +} + +void SinkReplyBuilder::SendError(OpStatus status) { + if (status == OpStatus::OK) + return SendSimpleString("OK"); + SendError(StatusToMsg(status)); } void SinkReplyBuilder::CloseConnection() { @@ -99,157 +102,7 @@ void SinkReplyBuilder::CloseConnection() { ec_ = std::make_error_code(std::errc::connection_aborted); } -void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { - has_replied_ = true; - DCHECK(sink_); - constexpr size_t kMaxBatchSize = 1024; - - size_t bsize = 0; - for (unsigned i = 0; i < len; ++i) { - bsize += v[i].iov_len; - } - - // Allow batching with up to kMaxBatchSize of data. - if ((should_batch_ || should_aggregate_) && (batch_.size() + bsize < kMaxBatchSize)) { - batch_.reserve(batch_.size() + bsize); - for (unsigned i = 0; i < len; ++i) { - std::string_view src((char*)v[i].iov_base, v[i].iov_len); - DVLOG(3) << "Appending to stream " << absl::CHexEscape(src); - batch_.append(src.data(), src.size()); - } - DVLOG(2) << "Batched " << bsize << " bytes"; - return; - } - - int64_t before_ns = util::fb2::ProactorBase::GetMonotonicTimeNs(); - error_code ec; - send_active_ = true; - tl_facade_stats->reply_stats.io_write_cnt++; - tl_facade_stats->reply_stats.io_write_bytes += bsize; - DVLOG(2) << "Writing " << bsize + batch_.size() << " bytes of len " << len; - - if (batch_.empty()) { - ec = sink_->Write(v, len); - } else { - DVLOG(3) << "Sending batch to stream :" << absl::CHexEscape(batch_); - - tl_facade_stats->reply_stats.io_write_bytes += batch_.size(); - if (len == UIO_MAXIOV) { - ec = sink_->Write(io::Buffer(batch_)); - if (!ec) { - ec = sink_->Write(v, len); - } - } else { - iovec tmp[len + 1]; - tmp[0].iov_base = batch_.data(); - tmp[0].iov_len = batch_.size(); - copy(v, v + len, tmp + 1); - ec = sink_->Write(tmp, len + 1); - } - batch_.clear(); - } - send_active_ = false; - int64_t after_ns = util::fb2::ProactorBase::GetMonotonicTimeNs(); - tl_facade_stats->reply_stats.send_stats.count++; - tl_facade_stats->reply_stats.send_stats.total_duration += (after_ns - before_ns) / 1'000; - - if (ec) { - DVLOG(1) << "Error writing to stream: " << ec.message(); - ec_ = ec; - } -} - -void SinkReplyBuilder::SendRaw(std::string_view raw) { - iovec v = {IoVec(raw)}; - - Send(&v, 1); -} - -void SinkReplyBuilder::ExpectReply() { - has_replied_ = false; -} - -void SinkReplyBuilder::SendError(ErrorReply error) { - if (error.status) - return SendError(*error.status); - - SendError(error.ToSv(), error.kind); -} - -void SinkReplyBuilder::SendError(OpStatus status) { - if (status == OpStatus::OK) { - SendOk(); - } else { - SendError(StatusToMsg(status)); - } -} - -void SinkReplyBuilder::StartAggregate() { - DVLOG(1) << "StartAggregate"; - should_aggregate_ = true; -} - -void SinkReplyBuilder::StopAggregate() { - DVLOG(1) << "StopAggregate"; - should_aggregate_ = false; - - if (should_batch_) - return; - - FlushBatch(); -} - -void SinkReplyBuilder::SetBatchMode(bool batch) { - DVLOG(1) << "SetBatchMode(" << (batch ? "true" : "false") << ")"; - should_batch_ = batch; -} - -void SinkReplyBuilder::FlushBatch() { - if (batch_.empty()) - return; - - error_code ec = sink_->Write(io::Buffer(batch_)); - batch_.clear(); - if (ec) { - DVLOG(1) << "Error flushing to stream: " << ec.message(); - ec_ = ec; - } -} - -size_t SinkReplyBuilder::UsedMemory() const { - return dfly::HeapSize(batch_); -} - -SinkReplyBuilder2::ReplyAggregator::~ReplyAggregator() { - rb->batched_ = prev; - if (!prev) - rb->Flush(); -} - -SinkReplyBuilder2::ReplyScope::~ReplyScope() { - rb->scoped_ = prev; - if (!prev) - rb->FinishScope(); -} - -void SinkReplyBuilder2::SendError(ErrorReply error) { - if (error.status) - return SendError(*error.status); - SendError(error.ToSv(), error.kind); -} - -void SinkReplyBuilder2::SendError(OpStatus status) { - if (status == OpStatus::OK) - return SendSimpleString("OK"); - SendError(StatusToMsg(status)); -} - -void SinkReplyBuilder2::CloseConnection() { - if (!ec_) - ec_ = std::make_error_code(std::errc::connection_aborted); -} - -template void SinkReplyBuilder2::WritePieces(Ts&&... pieces) { +template void SinkReplyBuilder::WritePieces(Ts&&... pieces) { if (size_t required = (piece_size(pieces) + ...); buffer_.AppendLen() <= required) Flush(required); @@ -268,12 +121,12 @@ template void SinkReplyBuilder2::WritePieces(Ts&&... pieces) { total_size_ += written; } -void SinkReplyBuilder2::WriteRef(std::string_view str) { +void SinkReplyBuilder::WriteRef(std::string_view str) { NextVec(str); total_size_ += str.size(); } -void SinkReplyBuilder2::Flush(size_t expected_buffer_cap) { +void SinkReplyBuilder::Flush(size_t expected_buffer_cap) { Send(); // Grow backing buffer if was at least half full and still below it's max size @@ -290,7 +143,7 @@ void SinkReplyBuilder2::Flush(size_t expected_buffer_cap) { buffer_.Reserve(expected_buffer_cap); } -void SinkReplyBuilder2::Send() { +void SinkReplyBuilder::Send() { auto& reply_stats = tl_facade_stats->reply_stats; send_active_ = true; @@ -308,7 +161,9 @@ void SinkReplyBuilder2::Send() { send_active_ = false; } -void SinkReplyBuilder2::FinishScope() { +void SinkReplyBuilder::FinishScope() { + replies_recorded_++; + if (!batched_ || total_size_ * 2 >= kMaxBufferSize) return Flush(); @@ -332,22 +187,37 @@ void SinkReplyBuilder2::FinishScope() { guaranteed_pieces_ = vecs_.size(); // all vecs are pieces } -void SinkReplyBuilder2::NextVec(std::string_view str) { +void SinkReplyBuilder::NextVec(std::string_view str) { if (vecs_.size() >= IOV_MAX - 2) Flush(); vecs_.push_back(iovec{const_cast(str.data()), str.size()}); } -MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink, MC), noreply_(false) { +MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) { +} + +void MCReplyBuilder::SendValue(std::string_view key, std::string_view value, uint64_t mc_ver, + uint32_t mc_flag) { + ReplyScope scope(this); + WritePieces("VALUE ", key, " ", mc_flag, " ", value.size()); + if (mc_ver) + WritePieces(" ", mc_ver); + + if (value.size() <= kMaxInlineSize) { + WritePieces(kCRLF, value, kCRLF); + } else { + WritePieces(kCRLF); + WriteRef(value); + WritePieces(kCRLF); + } } void MCReplyBuilder::SendSimpleString(std::string_view str) { if (noreply_) return; - iovec v[2] = {IoVec(str), IoVec(kCRLF)}; - - Send(v, ABSL_ARRAYSIZE(v)); + ReplyScope scope(this); + WritePieces(str, kCRLF); } void MCReplyBuilder::SendStored() { @@ -355,31 +225,11 @@ void MCReplyBuilder::SendStored() { } void MCReplyBuilder::SendLong(long val) { - char buf[32]; - char* next = absl::numbers_internal::FastIntToBuffer(val, buf); - SendSimpleString(string_view(buf, next - buf)); -} - -void MCReplyBuilder::SendMGetResponse(MGetResponse resp) { - string header; - for (unsigned i = 0; i < resp.resp_arr.size(); ++i) { - if (resp.resp_arr[i]) { - const auto& src = *resp.resp_arr[i]; - absl::StrAppend(&header, "VALUE ", src.key, " ", src.mc_flag, " ", src.value.size()); - if (src.mc_ver) { - absl::StrAppend(&header, " ", src.mc_ver); - } - - absl::StrAppend(&header, "\r\n"); - iovec v[] = {IoVec(header), IoVec(src.value), IoVec(kCRLF)}; - Send(v, ABSL_ARRAYSIZE(v)); - header.clear(); - } - } - SendSimpleString("END"); + SendSimpleString(absl::StrCat(val)); } void MCReplyBuilder::SendError(string_view str, std::string_view type) { + last_error_ = str; SendSimpleString(absl::StrCat("SERVER_ERROR ", str)); } @@ -387,13 +237,8 @@ void MCReplyBuilder::SendProtocolError(std::string_view str) { SendSimpleString(absl::StrCat("CLIENT_ERROR ", str)); } -bool MCReplyBuilder::NoReply() const { - return noreply_; -} - void MCReplyBuilder::SendClientError(string_view str) { - iovec v[] = {IoVec("CLIENT_ERROR "), IoVec(str), IoVec(kCRLF)}; - Send(v, ABSL_ARRAYSIZE(v)); + SendSimpleString(absl::StrCat("CLIENT_ERROR ", str)); } void MCReplyBuilder::SendSetSkipped() { @@ -404,455 +249,18 @@ void MCReplyBuilder::SendNotFound() { SendSimpleString("NOT_FOUND"); } -MCReplyBuilder2::MCReplyBuilder2(::io::Sink* sink) : SinkReplyBuilder2(sink), noreply_(false) { -} - -void MCReplyBuilder2::SendValue(std::string_view key, std::string_view value, uint64_t mc_ver, - uint32_t mc_flag) { +void MCReplyBuilder::SendRaw(std::string_view str) { ReplyScope scope(this); - WritePieces("VALUE ", key, " ", mc_flag, " ", value.size()); - if (mc_ver) - WritePieces(" ", mc_ver); - - if (value.size() <= kMaxInlineSize) { - WritePieces(value, kCRLF); - } else { - WriteRef(value); - WritePieces(kCRLF); - } + WriteRef(str); } -void MCReplyBuilder2::SendSimpleString(std::string_view str) { - if (noreply_) - return; - +void RedisReplyBuilderBase::SendNull() { ReplyScope scope(this); - WritePieces(str, kCRLF); -} - -void MCReplyBuilder2::SendStored() { - SendSimpleString("STORED"); -} - -void MCReplyBuilder2::SendLong(long val) { - SendSimpleString(absl::StrCat(val)); -} - -void MCReplyBuilder2::SendError(string_view str, std::string_view type) { - SendSimpleString(absl::StrCat("SERVER_ERROR ", str)); -} - -void MCReplyBuilder2::SendProtocolError(std::string_view str) { - SendSimpleString(absl::StrCat("CLIENT_ERROR ", str)); -} - -void MCReplyBuilder2::SendClientError(string_view str) { - SendSimpleString(absl::StrCat("CLIENT_ERROR", str)); -} - -void MCReplyBuilder2::SendSetSkipped() { - SendSimpleString("NOT_STORED"); -} - -void MCReplyBuilder2::SendNotFound() { - SendSimpleString("NOT_FOUND"); -} - -char* RedisReplyBuilder::FormatDouble(double val, char* dest, unsigned dest_len) { - StringBuilder sb(dest, dest_len); - CHECK(dfly_conv.ToShortest(val, &sb)); - return sb.Finalize(); -} - -RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink, REDIS) { -} - -void RedisReplyBuilder::SetResp3(bool is_resp3) { - is_resp3_ = is_resp3; -} - -void RedisReplyBuilder::SendError(string_view str, string_view err_type) { - VLOG(1) << "Error: " << str; - - if (err_type.empty()) { - err_type = str; - if (err_type == kSyntaxErr) - err_type = kSyntaxErrType; - else if (err_type == kWrongTypeErr) - err_type = kWrongTypeErrType; - else if (err_type == kScriptNotFound) - err_type = kScriptErrType; - } - - tl_facade_stats->reply_stats.err_count[err_type]++; - - if (str[0] == '-') { - iovec v[] = {IoVec(str), IoVec(kCRLF)}; - Send(v, ABSL_ARRAYSIZE(v)); - return; - } - - iovec v[] = {IoVec(kErrPref), IoVec(str), IoVec(kCRLF)}; - Send(v, ABSL_ARRAYSIZE(v)); -} - -void RedisReplyBuilder::SendProtocolError(std::string_view str) { - SendError(absl::StrCat("-ERR Protocol error: ", str), "protocol_error"); -} - -void RedisReplyBuilder::SendSimpleString(std::string_view str) { - iovec v[3] = {IoVec(kSimplePref), IoVec(str), IoVec(kCRLF)}; - - Send(v, ABSL_ARRAYSIZE(v)); -} - -void RedisReplyBuilder::SendStored() { - SendSimpleString("OK"); -} - -void RedisReplyBuilder::SendSetSkipped() { - SendNull(); -} - -void RedisReplyBuilder::SendNull() { - iovec v[] = {IoVec(NullString(is_resp3_))}; - - Send(v, ABSL_ARRAYSIZE(v)); -} - -void RedisReplyBuilder::SendBulkString(std::string_view str) { - char tmp[absl::numbers_internal::kFastToBufferSize + 3]; - tmp[0] = '$'; // Format length - char* next = absl::numbers_internal::FastIntToBuffer(uint32_t(str.size()), tmp + 1); - *next++ = '\r'; - *next++ = '\n'; - - std::string_view lenpref{tmp, size_t(next - tmp)}; - - // 3 parts: length, string and CRLF. - iovec v[3] = {IoVec(lenpref), IoVec(str), IoVec(kCRLF)}; - - return Send(v, ABSL_ARRAYSIZE(v)); -} - -void RedisReplyBuilder::SendVerbatimString(std::string_view str, VerbatimFormat format) { - if (!is_resp3_) - return SendBulkString(str); - - char tmp[absl::numbers_internal::kFastToBufferSize + 7]; - tmp[0] = '='; - // + 4 because format is three byte, and need to be followed by a ":" - char* next = absl::numbers_internal::FastIntToBuffer(uint32_t(str.size() + 4), tmp + 1); - *next++ = '\r'; - *next++ = '\n'; - - DCHECK(format <= VerbatimFormat::MARKDOWN); - if (format == VerbatimFormat::TXT) - strcpy(next, "txt:"); - else if (format == VerbatimFormat::MARKDOWN) - strcpy(next, "mkd:"); - next += 4; - std::string_view lenpref{tmp, size_t(next - tmp)}; - iovec v[3] = {IoVec(lenpref), IoVec(str), IoVec(kCRLF)}; - return Send(v, ABSL_ARRAYSIZE(v)); -} - -void RedisReplyBuilder::SendLong(long num) { - string str = absl::StrCat(":", num, kCRLF); - SendRaw(str); -} - -void RedisReplyBuilder::SendScoredArray(absl::Span> arr, - bool with_scores) { - ReplyAggregator agg(this); - if (!with_scores) { - auto cb = [&](size_t indx) -> string_view { return arr[indx].first; }; - - SendStringArrInternal(arr.size(), std::move(cb), CollectionType::ARRAY); - return; - } - - char buf[DoubleToStringConverter::kBase10MaximalLength * 3]; // to be on the safe side. - - if (!is_resp3_) { // RESP2 formats withscores as a flat array. - auto cb = [&](size_t indx) -> string_view { - if (indx % 2 == 0) - return arr[indx / 2].first; - - // NOTE: we reuse the same buffer, assuming that SendStringArrInternal does not reference - // previous string_views. The assumption holds for small strings like - // doubles because SendStringArrInternal employs small string optimization. - // It's a bit hacky but saves allocations. - return FormatDouble(arr[indx / 2].second, buf, sizeof(buf)); - }; - - SendStringArrInternal(arr.size() * 2, std::move(cb), CollectionType::ARRAY); - return; - } - - // Resp3 formats withscores as array of (key, score) pairs. - // TODO: to implement efficient serializing by extending SendStringArrInternal to support - // 2-level arrays. - StartArray(arr.size()); - for (const auto& p : arr) { - StartArray(2); - SendBulkString(p.first); - SendDouble(p.second); - } -} - -void RedisReplyBuilder::SendDouble(double val) { - char buf[64]; - - char* start = FormatDouble(val, buf, sizeof(buf)); - - if (!is_resp3_) { - SendBulkString(start); - } else { - // RESP3 - SendRaw(absl::StrCat(",", start, kCRLF)); - } -} - -void RedisReplyBuilder::SendMGetResponse(MGetResponse resp) { - DCHECK(!resp.resp_arr.empty()); - - size_t size = resp.resp_arr.size(); - - size_t vec_len = std::min(32, size); - - constexpr size_t kBatchLen = 32 * 2 + 2; // (blob_size, blob) * 32 + 2 spares - iovec vec_batch[kBatchLen]; - - // for all the meta data to fill the vec batch. 10 digits for the blob size and 6 for - // $, \r, \n, \r, \n - absl::FixedArray meta((vec_len + 2) * 16); // 2 for header and next item meta data. - - char* next = meta.data(); - char* cur_meta = next; - *next++ = '*'; - next = absl::numbers_internal::FastIntToBuffer(size, next); - *next++ = '\r'; - *next++ = '\n'; - - unsigned vec_indx = 0; - const char* nullstr = NullString(is_resp3_); - size_t nulllen = strlen(nullstr); - auto get_pending_metabuf = [&] { return string_view{cur_meta, size_t(next - cur_meta)}; }; - - for (unsigned i = 0; i < size; ++i) { - DCHECK_GE(meta.end() - next, 16); // We have at least 16 bytes for the meta data. - if (resp.resp_arr[i]) { - string_view blob = resp.resp_arr[i]->value; - - *next++ = '$'; - next = absl::numbers_internal::FastIntToBuffer(blob.size(), next); - *next++ = '\r'; - *next++ = '\n'; - DCHECK_GT(next - cur_meta, 0); - - vec_batch[vec_indx++] = IoVec(get_pending_metabuf()); - vec_batch[vec_indx++] = IoVec(blob); - cur_meta = next; // we combine the CRLF with the next item meta data. - *next++ = '\r'; - *next++ = '\n'; - } else { - memcpy(next, nullstr, nulllen); - next += nulllen; - } - - if (vec_indx >= (kBatchLen - 2) || (meta.end() - next < 16)) { - // we have space for at least one iovec because in the worst case we reached (kBatchLen - 3) - // and then filled 2 vectors in the previous iteration. - DCHECK_LE(vec_indx, kBatchLen - 1); - - // if we do not have enough space in the meta buffer, we add the meta data to the - // vector batch and reset it. - if (meta.end() - next < 16) { - vec_batch[vec_indx++] = IoVec(get_pending_metabuf()); - next = meta.data(); - cur_meta = next; - } - - Send(vec_batch, vec_indx); - if (ec_) - return; - - vec_indx = 0; - size_t meta_len = next - cur_meta; - memcpy(meta.data(), cur_meta, meta_len); - cur_meta = meta.data(); - next = cur_meta + meta_len; - } - } - - if (next - cur_meta > 0) { - vec_batch[vec_indx++] = IoVec(get_pending_metabuf()); - } - if (vec_indx > 0) - Send(vec_batch, vec_indx); -} - -void RedisReplyBuilder::SendSimpleStrArr(StrSpan arr) { - string res = absl::StrCat("*", arr.Size(), kCRLF); - for (string_view str : arr) - StrAppend(&res, "+", str, kCRLF); - - SendRaw(res); -} - -void RedisReplyBuilder::SendNullArray() { - SendRaw("*-1\r\n"); -} - -void RedisReplyBuilder::SendEmptyArray() { - StartArray(0); -} - -void RedisReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) { - if (type == ARRAY && arr.Size() == 0) { - SendRaw("*0\r\n"); - return; - } - - auto cb = [&](size_t i) { - return visit([i](auto& span) { return facade::ToSV(span[i]); }, arr.span); - }; - SendStringArrInternal(arr.Size(), std::move(cb), type); -} - -void RedisReplyBuilder::StartArray(unsigned len) { - StartCollection(len, ARRAY); -} - -constexpr static string_view START_SYMBOLS[] = {"*", "~", "%", ">"}; -static_assert(START_SYMBOLS[RedisReplyBuilder::MAP] == "%" && - START_SYMBOLS[RedisReplyBuilder::SET] == "~"); - -void RedisReplyBuilder::StartCollection(unsigned len, CollectionType type) { - if (!is_resp3_) { // Flatten for Resp2 - if (type == MAP) - len *= 2; - type = ARRAY; - } - - DVLOG(2) << "StartCollection(" << len << ", " << type << ")"; - - // We do not want to send multiple packets for small responses because these - // trigger TCP-related artifacts (e.g. Nagle's algorithm) that slow down the delivery of the whole - // response. - bool prev = should_aggregate_; - should_aggregate_ |= (len > 0); - SendRaw(absl::StrCat(START_SYMBOLS[type], len, kCRLF)); - should_aggregate_ = prev; -} - -// This implementation a bit complicated because it uses vectorized -// send to send an array. The problem with that is the OS limits vector length to UIO_MAXIOV. -// Therefore, to make it robust we send the array in batches. -// We limit the vector length, and when it fills up we flush it to the socket and continue -// iterating. -void RedisReplyBuilder::SendStringArrInternal( - size_t size, absl::FunctionRef producer, CollectionType type) { - size_t header_len = size; - string_view type_char = "*"; - if (is_resp3_) { - type_char = START_SYMBOLS[type]; - if (type == MAP) - header_len /= 2; // Each key value pair counts as one. - } - - if (header_len == 0) { - SendRaw(absl::StrCat(type_char, "0\r\n")); - return; - } - - // We limit iovec capacity, vectorized length is limited upto UIO_MAXIOV (Send returns EMSGSIZE). - size_t vec_cap = std::min(UIO_MAXIOV, size * 2); - absl::FixedArray vec(vec_cap); - absl::FixedArray meta(std::max(vec_cap * 64, 128u)); - - char* start = meta.data(); - char* next = start; - - // at most 35 chars. - auto serialize_len = [&](char prefix, size_t len) { - *next++ = prefix; - next = absl::numbers_internal::FastIntToBuffer(len, next); // at most 32 chars - *next++ = '\r'; - *next++ = '\n'; - }; - - serialize_len(type_char[0], header_len); - unsigned vec_indx = 0; - string_view src; - -#define FLUSH_IOVEC() \ - do { \ - Send(vec.data(), vec_indx); \ - if (ec_) \ - return; \ - vec_indx = 0; \ - next = meta.data(); \ - } while (false) - - for (unsigned i = 0; i < size; ++i) { - DCHECK_LT(vec_indx, vec_cap); - - src = producer(i); - serialize_len('$', src.size()); - - // copy data either by referencing via an iovec or copying inline into meta buf. - constexpr size_t kSSOLen = 32; - if (src.size() > kSSOLen) { - // reference metadata blob before referencing another vector. - DCHECK_GT(next - start, 0); - vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)}); - if (vec_indx >= vec_cap) { - FLUSH_IOVEC(); - } - - DCHECK_LT(vec_indx, vec.size()); - vec[vec_indx++] = IoVec(src); - if (vec_indx >= vec_cap) { - FLUSH_IOVEC(); - } - start = next; - } else if (src.size() > 0) { - // NOTE!: this is not just optimization. producer may returns a string_piece that will - // be overriden for the next call, so we must do this for correctness. - memcpy(next, src.data(), src.size()); - next += src.size(); - } - - // how much buffer we need to perform the next iteration. - constexpr ptrdiff_t kMargin = kSSOLen + 3 /* $\r\n */ + 2 /*length*/ + 2 /* \r\n */; - - // Keep at least kMargin bytes for a small string as well as its length. - if (kMargin >= meta.end() - next) { - // Flush the iovec array. - vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)}); - FLUSH_IOVEC(); - start = next; - } - *next++ = '\r'; - *next++ = '\n'; - } - - vec[vec_indx].iov_base = start; - vec[vec_indx].iov_len = next - start; - Send(vec.data(), vec_indx + 1); -} - -void RedisReplyBuilder2Base::SendNull() { - ReplyScope scope(this); - has_replied_ = true; resp3_ ? WritePieces(kNullStringR3) : WritePieces(kNullStringR2); } -void RedisReplyBuilder2Base::SendSimpleString(std::string_view str) { +void RedisReplyBuilderBase::SendSimpleString(std::string_view str) { ReplyScope scope(this); - has_replied_ = true; if (str.size() <= kMaxInlineSize * 2) return WritePieces(kSimplePref, str, kCRLF); @@ -861,9 +269,8 @@ void RedisReplyBuilder2Base::SendSimpleString(std::string_view str) { WritePieces(kCRLF); } -void RedisReplyBuilder2Base::SendBulkString(std::string_view str) { +void RedisReplyBuilderBase::SendBulkString(std::string_view str) { ReplyScope scope(this); - has_replied_ = true; if (str.size() <= kMaxInlineSize) return WritePieces(kLengthPrefix, uint32_t(str.size()), kCRLF, str, kCRLF); @@ -872,14 +279,12 @@ void RedisReplyBuilder2Base::SendBulkString(std::string_view str) { WritePieces(kCRLF); } -void RedisReplyBuilder2Base::SendLong(long val) { +void RedisReplyBuilderBase::SendLong(long val) { ReplyScope scope(this); - has_replied_ = true; WritePieces(kLongPref, val, kCRLF); } -void RedisReplyBuilder2Base::SendDouble(double val) { - has_replied_ = true; +void RedisReplyBuilderBase::SendDouble(double val) { char buf[DoubleToStringConverter::kBase10MaximalLength + 8]; // +8 to be on the safe side. static_assert(ABSL_ARRAYSIZE(buf) < kMaxInlineSize, "Write temporary string from buf inline"); string_view val_str = FormatDouble(val, buf, ABSL_ARRAYSIZE(buf)); @@ -891,18 +296,16 @@ void RedisReplyBuilder2Base::SendDouble(double val) { WritePieces(kDoublePref, val_str, kCRLF); } -void RedisReplyBuilder2Base::SendNullArray() { +void RedisReplyBuilderBase::SendNullArray() { ReplyScope scope(this); - has_replied_ = true; WritePieces("*-1", kCRLF); } constexpr static const char START_SYMBOLS2[4][2] = {"*", "~", "%", ">"}; -static_assert(START_SYMBOLS2[RedisReplyBuilder2Base::MAP][0] == '%' && - START_SYMBOLS2[RedisReplyBuilder2Base::SET][0] == '~'); +static_assert(START_SYMBOLS2[RedisReplyBuilderBase::MAP][0] == '%' && + START_SYMBOLS2[RedisReplyBuilderBase::SET][0] == '~'); -void RedisReplyBuilder2Base::StartCollection(unsigned len, CollectionType ct) { - has_replied_ = true; +void RedisReplyBuilderBase::StartCollection(unsigned len, CollectionType ct) { if (!IsResp3()) { // RESP2 supports only arrays if (ct == MAP) len *= 2; @@ -912,9 +315,8 @@ void RedisReplyBuilder2Base::StartCollection(unsigned len, CollectionType ct) { WritePieces(START_SYMBOLS2[ct], len, kCRLF); } -void RedisReplyBuilder2Base::SendError(std::string_view str, std::string_view type) { +void RedisReplyBuilderBase::SendError(std::string_view str, std::string_view type) { ReplyScope scope(this); - has_replied_ = true; if (type.empty()) { type = str; @@ -930,18 +332,17 @@ void RedisReplyBuilder2Base::SendError(std::string_view str, std::string_view ty WritePieces(str, kCRLF); } -void RedisReplyBuilder2Base::SendProtocolError(std::string_view str) { +void RedisReplyBuilderBase::SendProtocolError(std::string_view str) { SendError(absl::StrCat("-ERR Protocol error: ", str), "protocol_error"); } -char* RedisReplyBuilder2Base::FormatDouble(double d, char* dest, unsigned len) { +char* RedisReplyBuilderBase::FormatDouble(double d, char* dest, unsigned len) { StringBuilder sb(dest, len); CHECK(dfly_conv.ToShortest(d, &sb)); return sb.Finalize(); } -void RedisReplyBuilder2Base::SendVerbatimString(std::string_view str, VerbatimFormat format) { - has_replied_ = true; +void RedisReplyBuilderBase::SendVerbatimString(std::string_view str, VerbatimFormat format) { DCHECK(format <= VerbatimFormat::MARKDOWN); if (!IsResp3()) return SendBulkString(str); @@ -955,26 +356,26 @@ void RedisReplyBuilder2Base::SendVerbatimString(std::string_view str, VerbatimFo WritePieces(kCRLF); } -std::string RedisReplyBuilder2Base::SerializeCommand(std::string_view command) { +std::string RedisReplyBuilderBase::SerializeCommand(std::string_view command) { return string{command} + kCRLF; } -void RedisReplyBuilder2::SendSimpleStrArr2(const facade::ArgRange& strs) { +void RedisReplyBuilder::SendSimpleStrArr(const facade::ArgRange& strs) { ReplyScope scope(this); StartArray(strs.Size()); for (std::string_view str : strs) SendSimpleString(str); } -void RedisReplyBuilder2::SendBulkStrArr(const facade::ArgRange& strs, CollectionType ct) { +void RedisReplyBuilder::SendBulkStrArr(const facade::ArgRange& strs, CollectionType ct) { ReplyScope scope(this); StartCollection(ct == CollectionType::MAP ? strs.Size() / 2 : strs.Size(), ct); for (std::string_view str : strs) SendBulkString(str); } -void RedisReplyBuilder2::SendScoredArray(absl::Span> arr, - bool with_scores) { +void RedisReplyBuilder::SendScoredArray(absl::Span> arr, + bool with_scores) { ReplyScope scope(this); StartArray((with_scores && !IsResp3()) ? arr.size() * 2 : arr.size()); for (const auto& [str, score] : arr) { @@ -986,31 +387,20 @@ void RedisReplyBuilder2::SendScoredArray(absl::Spanvalue); - else - SendNull(); - } -} - } // namespace facade diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index 9cd120702..e966d7b9f 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -21,188 +21,28 @@ enum class ReplyMode { FULL // All replies are recorded }; -class SinkReplyBuilder { - public: - struct MGetStorage { - MGetStorage* next = nullptr; - char data[1]; - }; - - struct GetResp { - std::string key; // TODO: to use backing storage to optimize this as well. - std::string_view value; - - uint64_t mc_ver = 0; // 0 means we do not output it (i.e has not been requested). - uint32_t mc_flag = 0; - - GetResp() = default; - GetResp(std::string_view val) : value(val) { - } - }; - - struct MGetResponse { - MGetStorage* storage_list = nullptr; // backing storage of resp_arr values. - std::vector> resp_arr; - - MGetResponse() = default; - - MGetResponse(size_t size) : resp_arr(size) { - } - - ~MGetResponse(); - - MGetResponse(MGetResponse&& other) noexcept - : storage_list(other.storage_list), resp_arr(std::move(other.resp_arr)) { - other.storage_list = nullptr; - } - - MGetResponse& operator=(MGetResponse&& other) noexcept { - resp_arr = std::move(other.resp_arr); - storage_list = other.storage_list; - other.storage_list = nullptr; - return *this; - } - }; - - SinkReplyBuilder(const SinkReplyBuilder&) = delete; - void operator=(const SinkReplyBuilder&) = delete; - - enum Type { REDIS, MC }; - - explicit SinkReplyBuilder(::io::Sink* sink, Type t); - - virtual ~SinkReplyBuilder() { - } - - static MGetStorage* AllocMGetStorage(size_t size) { - static_assert(alignof(MGetStorage) == 8); // if this breaks we should fix the code below. - char* buf = new char[size + sizeof(MGetStorage)]; - return new (buf) MGetStorage(); - } - - virtual void SendError(std::string_view str, std::string_view type = {}) = 0; // MC and Redis - virtual void SendError(OpStatus status); - void SendError(ErrorReply error); - - virtual void SendStored() = 0; // Reply for set commands. - virtual void SendSetSkipped() = 0; - - virtual void SendMGetResponse(MGetResponse resp) = 0; - - virtual void SendLong(long val) = 0; - virtual void SendSimpleString(std::string_view str) = 0; - - void SendOk() { - SendSimpleString("OK"); - } - - virtual void SendProtocolError(std::string_view str) = 0; - - // In order to reduce interrupt rate we allow coalescing responses together using - // Batch mode. It is controlled by Connection state machine because it makes sense only - // when pipelined requests are arriving. - virtual void SetBatchMode(bool batch); - - virtual void FlushBatch(); - - // Used for QUIT - > should move to conn_context? - virtual void CloseConnection(); - - virtual std::error_code GetError() const { - return ec_; - } - - bool IsSendActive() const { - return send_active_; // BROKEN - } - - struct ReplyAggregator { - explicit ReplyAggregator(SinkReplyBuilder* builder) : builder_(builder) { - // If the builder is already aggregating then don't aggregate again as - // this will cause redundant sink writes (such as in a MULTI/EXEC). - if (builder->should_aggregate_) { - return; - } - builder_->StartAggregate(); - is_nested_ = false; - } - - ~ReplyAggregator() { - if (!is_nested_) { - builder_->StopAggregate(); - } - } - - private: - SinkReplyBuilder* builder_; - bool is_nested_ = true; - }; - - void ExpectReply(); - bool HasReplied() const { - return has_replied_; - } - - virtual size_t UsedMemory() const; - - static const ReplyStats& GetThreadLocalStats() { - return tl_facade_stats->reply_stats; - } - - virtual void StartAggregate(); - virtual void StopAggregate(); - - std::string ConsumeLastError() { - return std::exchange(last_error_, std::string{}); - } - - Type type() const { - return type_; - } - - protected: - void SendRaw(std::string_view str); // Sends raw without any formatting. - - void Send(const iovec* v, uint32_t len); - - std::string batch_; - ::io::Sink* sink_; - std::error_code ec_; - - // msg and kind/type - std::string last_error_; - - bool should_batch_ : 1; - - // Similarly to batch mode but is controlled by at operation level. - bool should_aggregate_ : 1; - bool has_replied_ : 1; - bool send_active_ : 1; - Type type_; -}; - // Base class for all reply builders. Offer a simple high level interface for controlling output // modes and sending basic response types. -class SinkReplyBuilder2 { +class SinkReplyBuilder { struct GuardBase { bool prev; - SinkReplyBuilder2* rb; + SinkReplyBuilder* rb; }; public: constexpr static size_t kMaxInlineSize = 32; constexpr static size_t kMaxBufferSize = 8192; - explicit SinkReplyBuilder2(io::Sink* sink) : sink_(sink) { + explicit SinkReplyBuilder(io::Sink* sink) : sink_(sink) { } - virtual ~SinkReplyBuilder2() = default; + virtual ~SinkReplyBuilder() = default; // USE WITH CARE! ReplyScope assumes that all string views in Send calls keep valid for the scopes // lifetime. This allows the builder to avoid copies by enqueueing long strings directly for // vectorized io. struct ReplyScope : GuardBase { - explicit ReplyScope(SinkReplyBuilder2* rb) : GuardBase{std::exchange(rb->scoped_, true), rb} { + explicit ReplyScope(SinkReplyBuilder* rb) : GuardBase{std::exchange(rb->scoped_, true), rb} { } ~ReplyScope(); @@ -211,7 +51,7 @@ class SinkReplyBuilder2 { // Aggregator reduces the number of raw send calls by copying data in an intermediate buffer. // Prefer ReplyScope if possible to additionally reduce the number of copies. struct ReplyAggregator : GuardBase { - explicit ReplyAggregator(SinkReplyBuilder2* rb) + explicit ReplyAggregator(SinkReplyBuilder* rb) : GuardBase{std::exchange(rb->batched_, true), rb} { } @@ -228,7 +68,11 @@ class SinkReplyBuilder2 { return buffer_.Capacity(); } - bool IsSendActive() { + size_t RepliesRecorded() const { + return replies_recorded_; + } + + bool IsSendActive() const { return send_active_; } @@ -243,6 +87,10 @@ class SinkReplyBuilder2 { } public: // High level interface + virtual Protocol GetProtocol() const { + return Protocol::NONE; + } + virtual void SendLong(long val) = 0; virtual void SendSimpleString(std::string_view str) = 0; @@ -257,6 +105,10 @@ class SinkReplyBuilder2 { void SendError(ErrorReply error); virtual void SendProtocolError(std::string_view str) = 0; + std::string ConsumeLastError() { + return std::exchange(last_error_, {}); + } + protected: template void WritePieces(Ts&&... pieces); // Copy pieces into buffer and reference buffer @@ -267,11 +119,15 @@ class SinkReplyBuilder2 { void Send(); + protected: + size_t replies_recorded_ = 0; + std::string last_error_; + private: io::Sink* sink_; std::error_code ec_; - bool send_active_ = false; + bool send_active_ = false; // set while Send() is suspended on socket write bool scoped_ = false, batched_ = false; size_t total_size_ = 0; // sum of vec_ lengths @@ -285,40 +141,15 @@ class SinkReplyBuilder2 { }; class MCReplyBuilder : public SinkReplyBuilder { - bool noreply_; - public: - MCReplyBuilder(::io::Sink* stream); + explicit MCReplyBuilder(::io::Sink* sink); - using SinkReplyBuilder::SendRaw; + ~MCReplyBuilder() override = default; - void SendError(std::string_view str, std::string_view type = std::string_view{}) final; - - // void SendGetReply(std::string_view key, uint32_t flags, std::string_view value) final; - void SendMGetResponse(MGetResponse resp) final; - - void SendStored() final; - void SendLong(long val) final; - void SendSetSkipped() final; - - void SendClientError(std::string_view str); - void SendNotFound(); - void SendSimpleString(std::string_view str) final; - void SendProtocolError(std::string_view str) final; - - void SetNoreply(bool noreply) { - noreply_ = noreply; + Protocol GetProtocol() const final { + return Protocol::MEMCACHE; } - bool NoReply() const; -}; - -class MCReplyBuilder2 : public SinkReplyBuilder2 { - public: - explicit MCReplyBuilder2(::io::Sink* sink); - - ~MCReplyBuilder2() override = default; - void SendError(std::string_view str, std::string_view type = std::string_view{}) final; void SendStored() final; @@ -332,6 +163,8 @@ class MCReplyBuilder2 : public SinkReplyBuilder2 { void SendSimpleString(std::string_view str) final; void SendProtocolError(std::string_view str) final; + void SendRaw(std::string_view str); + void SetNoreply(bool noreply) { noreply_ = noreply; } @@ -344,161 +177,72 @@ class MCReplyBuilder2 : public SinkReplyBuilder2 { bool noreply_ = false; }; -class RedisReplyBuilder : public SinkReplyBuilder { +// Redis reply builder interface for sending RESP data. +class RedisReplyBuilderBase : public SinkReplyBuilder { public: enum CollectionType { ARRAY, SET, MAP, PUSH }; - enum VerbatimFormat { TXT, MARKDOWN }; - using StrSpan = facade::ArgRange; - - RedisReplyBuilder(::io::Sink* stream); - - virtual void SetResp3(bool is_resp3); - virtual bool IsResp3() const { - return is_resp3_; + explicit RedisReplyBuilderBase(io::Sink* sink) : SinkReplyBuilder(sink) { } - void SendError(std::string_view str, std::string_view type = {}) override; - using SinkReplyBuilder::SendError; + ~RedisReplyBuilderBase() override = default; - void SendMGetResponse(MGetResponse resp) override; - - void SendStored() override; - void SendSetSkipped() override; - void SendProtocolError(std::string_view str) override; - - virtual void SendNullArray(); // Send *-1 - virtual void SendEmptyArray(); // Send *0 - virtual void SendSimpleStrArr(StrSpan arr); - virtual void SendStringArr(StrSpan arr, CollectionType type = ARRAY); + Protocol GetProtocol() const final { + return Protocol::REDIS; + } virtual void SendNull(); - void SendLong(long val) override; - virtual void SendDouble(double val); - void SendSimpleString(std::string_view str) override; - - virtual void SendBulkString(std::string_view str); - virtual void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT); - virtual void SendScoredArray(absl::Span> arr, - bool with_scores); - - void StartArray(unsigned len); // StartCollection(len, ARRAY) - - virtual void StartCollection(unsigned len, CollectionType type); - - static char* FormatDouble(double val, char* dest, unsigned dest_len); - - private: - void SendStringArrInternal(size_t size, absl::FunctionRef producer, - CollectionType type); - - bool is_resp3_ = false; -}; - -// Redis reply builder interface for sending RESP data. -class RedisReplyBuilder2Base : public SinkReplyBuilder2, public RedisReplyBuilder { - public: - using CollectionType = RedisReplyBuilder::CollectionType; - using VerbatimFormat = RedisReplyBuilder::VerbatimFormat; - - explicit RedisReplyBuilder2Base(io::Sink* sink) - : SinkReplyBuilder2(sink), RedisReplyBuilder(nullptr) { - } - - ~RedisReplyBuilder2Base() override = default; - - void SendNull() override; void SendSimpleString(std::string_view str) override; - void SendBulkString(std::string_view str) override; // RESP: Blob String + virtual void SendBulkString(std::string_view str); // RESP: Blob String void SendLong(long val) override; - void SendDouble(double val) override; // RESP: Number + virtual void SendDouble(double val); // RESP: Number - void SendNullArray() override; - void StartCollection(unsigned len, CollectionType ct) override; + virtual void SendNullArray(); + virtual void StartCollection(unsigned len, CollectionType ct); - using SinkReplyBuilder2::SendError; + using SinkReplyBuilder::SendError; void SendError(std::string_view str, std::string_view type = {}) override; void SendProtocolError(std::string_view str) override; - virtual void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT) override; + virtual void SendVerbatimString(std::string_view str, VerbatimFormat format = TXT); static char* FormatDouble(double d, char* dest, unsigned len); static std::string SerializeCommand(std::string_view command); - bool IsResp3() const override { + bool IsResp3() const { return resp3_; } - // REMOVE THIS override - void SetResp3(bool resp3) override { + void SetResp3(bool resp3) { resp3_ = resp3; } - // REMOVE THIS - void SetBatchMode(bool mode) override { - SinkReplyBuilder2::SetBatchMode(mode); - } - - void StartAggregate() override { - aggregators_.emplace_back(this); - } - - void StopAggregate() override { - aggregators_.pop_back(); - } - - void FlushBatch() override { - SinkReplyBuilder2::Flush(); - } - - // REMOVE THIS - - void CloseConnection() override { - SinkReplyBuilder2::CloseConnection(); - } - - std::error_code GetError() const override { - return SinkReplyBuilder2::GetError(); - } - private: - std::vector aggregators_; bool resp3_ = false; }; -// Non essential rediss reply builder functions implemented on top of the base resp protocol -class RedisReplyBuilder2 : public RedisReplyBuilder2Base { +// Non essential redis reply builder functions implemented on top of the base resp protocol +class RedisReplyBuilder : public RedisReplyBuilderBase { public: - RedisReplyBuilder2(io::Sink* sink) : RedisReplyBuilder2Base(sink) { + using RedisReplyBuilderBase::CollectionType; + + RedisReplyBuilder(io::Sink* sink) : RedisReplyBuilderBase(sink) { } - ~RedisReplyBuilder2() override = default; - - void SendSimpleStrArr2(const facade::ArgRange& strs); + ~RedisReplyBuilder() override = default; + void SendSimpleStrArr(const facade::ArgRange& strs); void SendBulkStrArr(const facade::ArgRange& strs, CollectionType ct = ARRAY); - void SendScoredArray(absl::Span> arr, - bool with_scores) override; - - void SendSimpleStrArr(RedisReplyBuilder::StrSpan arr) override { - SendSimpleStrArr2(arr); - } - - void SendStringArr(RedisReplyBuilder::StrSpan arr, CollectionType type = ARRAY) override { - SendBulkStrArr(arr, type); - } + void SendScoredArray(absl::Span> arr, bool with_scores); void SendStored() final; void SendSetSkipped() final; void StartArray(unsigned len); - void SendEmptyArray() override; - - // TODO: Remove - void SendMGetResponse(SinkReplyBuilder::MGetResponse resp) override; + void SendEmptyArray(); }; } // namespace facade diff --git a/src/facade/reply_builder_test.cc b/src/facade/reply_builder_test.cc index 14e446332..dda3f5992 100644 --- a/src/facade/reply_builder_test.cc +++ b/src/facade/reply_builder_test.cc @@ -83,7 +83,7 @@ class RedisReplyBuilderTest : public testing::Test { void SetUp() { sink_.Clear(); - builder_.reset(new RedisReplyBuilder2(&sink_)); + builder_.reset(new RedisReplyBuilder(&sink_)); ResetStats(); } @@ -112,7 +112,7 @@ class RedisReplyBuilderTest : public testing::Test { } unsigned GetError(string_view err) const { - const auto& map = SinkReplyBuilder2::GetThreadLocalStats().err_count; + const auto& map = SinkReplyBuilder::GetThreadLocalStats().err_count; auto it = map.find(err); return it == map.end() ? 0 : it->second; } @@ -135,7 +135,7 @@ class RedisReplyBuilderTest : public testing::Test { ParsingResults Parse(); io::StringSink sink_; - std::unique_ptr builder_; + std::unique_ptr builder_; std::unique_ptr parser_buffer_; }; @@ -205,7 +205,7 @@ RedisReplyBuilderTest::ParsingResults RedisReplyBuilderTest::Parse() { TEST_F(RedisReplyBuilderTest, MessageSend) { // Test each message that is "sent" to the sink - builder_->SinkReplyBuilder2::SendOk(); + builder_->SinkReplyBuilder::SendOk(); ASSERT_EQ(TakePayload(), kOKMessage); builder_->StartArray(10); @@ -783,7 +783,7 @@ TEST_F(RedisReplyBuilderTest, BasicCapture) { string_view kTestSws[] = {"a1"sv, "a2"sv, "a3"sv, "a4"sv}; CapturingReplyBuilder crb{}; - using RRB = RedisReplyBuilder2; + using RRB = RedisReplyBuilder; auto big_arr_cb = [](RRB* r) { r->StartArray(4); @@ -870,12 +870,12 @@ TEST_F(RedisReplyBuilderTest, VerbatimString) { std::string str = "A simple string!"; builder_->SetResp3(true); - builder_->SendVerbatimString(str, RedisReplyBuilder2::VerbatimFormat::TXT); + builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::TXT); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "=20\r\ntxt:A simple string!\r\n") << "Resp3 VerbatimString TXT failed."; builder_->SetResp3(true); - builder_->SendVerbatimString(str, RedisReplyBuilder2::VerbatimFormat::MARKDOWN); + builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::MARKDOWN); ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "=20\r\nmkd:A simple string!\r\n") << "Resp3 VerbatimString TXT failed."; diff --git a/src/facade/reply_capture.cc b/src/facade/reply_capture.cc index c799e6a81..02d00356e 100644 --- a/src/facade/reply_capture.cc +++ b/src/facade/reply_capture.cc @@ -8,7 +8,7 @@ #include "reply_capture.h" #define SKIP_LESS(needed) \ - has_replied_ = true; \ + replies_recorded_++; \ if (reply_mode_ < needed) { \ current_ = monostate{}; \ return; \ @@ -23,44 +23,11 @@ void CapturingReplyBuilder::SendError(std::string_view str, std::string_view typ Capture(Error{str, type}); } -void CapturingReplyBuilder::SendMGetResponse(MGetResponse resp) { - SKIP_LESS(ReplyMode::FULL); - Capture(std::move(resp)); -} - -void CapturingReplyBuilder::SendError(OpStatus status) { - if (status != OpStatus::OK) { - last_error_ = StatusToMsg(status); - } - SKIP_LESS(ReplyMode::ONLY_ERR); - Capture(status); -} - void CapturingReplyBuilder::SendNullArray() { SKIP_LESS(ReplyMode::FULL); Capture(unique_ptr{nullptr}); } -void CapturingReplyBuilder::SendEmptyArray() { - SKIP_LESS(ReplyMode::FULL); - Capture(make_unique(0, ARRAY)); -} - -void CapturingReplyBuilder::SendSimpleStrArr(StrSpan arr) { - SKIP_LESS(ReplyMode::FULL); - DCHECK_EQ(current_.index(), 0u); - - Capture(StrArrPayload{true, ARRAY, {arr.begin(), arr.end()}}); -} - -void CapturingReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) { - SKIP_LESS(ReplyMode::FULL); - DCHECK_EQ(current_.index(), 0u); - - // TODO: 1. Allocate all strings at once 2. Allow movable types - Capture(StrArrPayload{false, type, {arr.begin(), arr.end()}}); -} - void CapturingReplyBuilder::SendNull() { SKIP_LESS(ReplyMode::FULL); Capture(nullptr_t{}); @@ -86,13 +53,6 @@ void CapturingReplyBuilder::SendBulkString(std::string_view str) { Capture(BulkString{string{str}}); } -void CapturingReplyBuilder::SendScoredArray(absl::Span> arr, - bool with_scores) { - SKIP_LESS(ReplyMode::FULL); - std::vector> values(arr.begin(), arr.end()); - Capture(ScoredArray{std::move(values), with_scores}); -} - void CapturingReplyBuilder::StartCollection(unsigned len, CollectionType type) { SKIP_LESS(ReplyMode::FULL); stack_.emplace(make_unique(len, type), type == MAP ? len * 2 : len); @@ -109,8 +69,8 @@ CapturingReplyBuilder::Payload CapturingReplyBuilder::Take() { } void CapturingReplyBuilder::SendDirect(Payload&& val) { - has_replied_ = !holds_alternative(val); - bool is_err = holds_alternative(val) || holds_alternative(val); + replies_recorded_ += !holds_alternative(val); + bool is_err = holds_alternative(val); ReplyMode min_mode = is_err ? ReplyMode::ONLY_ERR : ReplyMode::FULL; if (reply_mode_ >= min_mode) { DCHECK_EQ(current_.index(), 0u); @@ -178,13 +138,6 @@ struct CaptureVisitor { rb->SendError(status); } - void operator()(const CapturingReplyBuilder::StrArrPayload& sa) { - if (sa.simple) - rb->SendSimpleStrArr(sa.arr); - else - rb->SendStringArr(sa.arr, sa.type); - } - void operator()(const unique_ptr& cp) { if (!cp) { rb->SendNullArray(); @@ -199,14 +152,6 @@ struct CaptureVisitor { visit(*this, std::move(pl)); } - void operator()(SinkReplyBuilder::MGetResponse resp) { - rb->SendMGetResponse(std::move(resp)); - } - - void operator()(const CapturingReplyBuilder::ScoredArray& sarr) { - rb->SendScoredArray(sarr.arr, sarr.with_scores); - } - RedisReplyBuilder* rb; }; @@ -219,7 +164,7 @@ void CapturingReplyBuilder::Apply(Payload&& pl, RedisReplyBuilder* rb) { CaptureVisitor cv{rb}; visit(cv, std::move(pl)); // Consumed and printed by InvokeCmd. We just send the actual error here - std::ignore = rb->ConsumeLastError(); + rb->ConsumeLastError(); } void CapturingReplyBuilder::SetReplyMode(ReplyMode mode) { diff --git a/src/facade/reply_capture.h b/src/facade/reply_capture.h index 44eb708ce..1ffa4fd7c 100644 --- a/src/facade/reply_capture.h +++ b/src/facade/reply_capture.h @@ -23,47 +23,23 @@ class CapturingReplyBuilder : public RedisReplyBuilder { friend struct CaptureVisitor; public: - void SendError(std::string_view str, std::string_view type = {}) override; - void SendMGetResponse(MGetResponse resp) override; - - // SendStored -> SendSimpleString("OK") - // SendSetSkipped -> SendNull() - void SendError(OpStatus status) override; using RedisReplyBuilder::SendError; + void SendError(std::string_view str, std::string_view type) override; - void SendNullArray() override; - void SendEmptyArray() override; - void SendSimpleStrArr(StrSpan arr) override; - void SendStringArr(StrSpan arr, CollectionType type = ARRAY) override; - - void SendNull() override; void SendLong(long val) override; void SendDouble(double val) override; void SendSimpleString(std::string_view str) override; - void SendBulkString(std::string_view str) override; - void SendScoredArray(absl::Span> arr, - bool with_scores) override; void StartCollection(unsigned len, CollectionType type) override; + void SendNullArray() override; + void SendNull() override; public: using Error = std::pair; // SendError (msg, type) using Null = std::nullptr_t; // SendNull or SendNullArray - struct StrArrPayload { - bool simple; - CollectionType type; - std::vector arr; - }; - struct CollectionPayload; - - struct ScoredArray { - std::vector> arr; - bool with_scores; - }; - struct SimpleString : public std::string {}; // SendSimpleString struct BulkString : public std::string {}; // SendBulkString @@ -71,9 +47,8 @@ class CapturingReplyBuilder : public RedisReplyBuilder { : RedisReplyBuilder{nullptr}, reply_mode_{mode}, stack_{}, current_{} { } - using Payload = - std::variant, MGetResponse, ScoredArray>; + using Payload = std::variant>; // Non owned Error based on SendError arguments (msg, type) using ErrorRef = std::pair; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 7f3d8579c..e8c264c74 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -842,9 +842,9 @@ void DebugCmd::Watched(facade::SinkReplyBuilder* builder) { shard_set->RunBlockingInParallel(cb); rb->StartArray(4); rb->SendBulkString("awaked"); - rb->SendStringArr(awaked_trans); + rb->SendBulkStrArr(awaked_trans); rb->SendBulkString("watched"); - rb->SendStringArr(watched_keys); + rb->SendBulkStrArr(watched_keys); } void DebugCmd::TxAnalysis(facade::SinkReplyBuilder* builder) { diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 390d70793..5cfe94c5f 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -978,7 +978,7 @@ void GenericFamily::Del(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil VLOG(1) << "Del " << ArgS(args, 0); atomic_uint32_t result{0}; - bool is_mc = (builder->type() == SinkReplyBuilder::MC); + bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE); auto cb = [&result](const Transaction* t, EngineShard* shard) { ShardArgs args = t->GetShardArgs(shard->shard_id()); @@ -1025,7 +1025,7 @@ void GenericFamily::Ping(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui } string_view resp[2] = {"pong", msg}; - return rb->SendStringArr(resp); + return rb->SendBulkStrArr(resp); } if (args.size() == 0) { diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 29f408904..82aca9f18 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -720,8 +720,8 @@ void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkRepl auto* rb = static_cast(builder); if (result) { bool is_map = (getall_mask == (VALUES | FIELDS)); - rb->SendStringArr(absl::Span{*result}, - is_map ? RedisReplyBuilder::MAP : RedisReplyBuilder::ARRAY); + rb->SendBulkStrArr(absl::Span{*result}, + is_map ? RedisReplyBuilder::MAP : RedisReplyBuilder::ARRAY); } else { builder->SendError(result.status()); } @@ -1220,7 +1220,7 @@ void HSetFamily::HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder* if ((result->size() == 1) && (args.size() == 1)) rb->SendBulkString(result->front()); else - rb->SendStringArr(*result, facade::RedisReplyBuilder::ARRAY); + rb->SendBulkStrArr(*result, facade::RedisReplyBuilder::ARRAY); } else if (result.status() == OpStatus::KEY_NOTFOUND) { if (args.size() == 1) rb->SendNull(); diff --git a/src/server/http_api.cc b/src/server/http_api.cc index 88df84a38..8aa4c7b64 100644 --- a/src/server/http_api.cc +++ b/src/server/http_api.cc @@ -131,16 +131,6 @@ struct CaptureVisitor { absl::StrAppend(&str, "\"", facade::StatusToMsg(status), "\""); } - void operator()(const CapturingReplyBuilder::StrArrPayload& sa) { - absl::StrAppend(&str, "["); - for (const auto& val : sa.arr) { - absl::StrAppend(&str, JsonEscape(val), ","); - } - if (sa.arr.size()) - str.pop_back(); - absl::StrAppend(&str, "]"); - } - void operator()(unique_ptr cp) { if (!cp) { absl::StrAppend(&str, "null"); @@ -157,32 +147,6 @@ struct CaptureVisitor { } } - void operator()(facade::SinkReplyBuilder::MGetResponse resp) { - absl::StrAppend(&str, "["); - for (const auto& val : resp.resp_arr) { - if (val) { - absl::StrAppend(&str, JsonEscape(val->value), ","); - } else { - absl::StrAppend(&str, "null,"); - } - } - - if (resp.resp_arr.size()) - str.pop_back(); - absl::StrAppend(&str, "]"); - } - - void operator()(const CapturingReplyBuilder::ScoredArray& sarr) { - absl::StrAppend(&str, "["); - for (const auto& [key, score] : sarr.arr) { - absl::StrAppend(&str, "{", JsonEscape(key), ":", score, "},"); - } - if (sarr.arr.size() > 0) { - str.pop_back(); - } - absl::StrAppend(&str, "]"); - } - string str; }; diff --git a/src/server/json_family.cc b/src/server/json_family.cc index b96064576..21d5c4c90 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -204,7 +204,7 @@ template void Send(I begin, I end, RedisReplyBuilder* rb) { rb->SendEmptyArray(); } else { if constexpr (is_same_v) { - rb->SendStringArr(facade::OwnedArgSlice{begin, end}); + rb->SendBulkStrArr(facade::OwnedArgSlice{begin, end}); } else { rb->StartArray(end - begin); for (auto i = begin; i != end; ++i) { diff --git a/src/server/list_family.cc b/src/server/list_family.cc index a1c78e31e..435f1afba 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -987,7 +987,7 @@ void BPopGeneric(ListDir dir, CmdArgList args, Transaction* tx, SinkReplyBuilder if (popped_key) { DVLOG(1) << "BPop " << tx->DebugId() << " popped from key " << popped_key; // key. std::string_view str_arr[2] = {*popped_key, popped_value}; - return rb->SendStringArr(str_arr); + return rb->SendBulkStrArr(str_arr); } DVLOG(1) << "result for " << tx->DebugId() << " is " << popped_key.status(); @@ -1202,7 +1202,7 @@ void ListFamily::LRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil } auto* rb = static_cast(builder); - rb->SendStringArr(*res); + rb->SendBulkStrArr(*res); } // lrem key 5 foo, will remove foo elements from the list if exists at most 5 times. diff --git a/src/server/main_service.cc b/src/server/main_service.cc index d87265a4e..cf765c627 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -286,24 +286,16 @@ class InterpreterReplier : public RedisReplyBuilder { } void SendError(std::string_view str, std::string_view type = std::string_view{}) final; - void SendStored() final; + void SendBulkString(std::string_view str) final; void SendSimpleString(std::string_view str) final; - void SendMGetResponse(MGetResponse resp) final; - void SendSimpleStrArr(StrSpan arr) final; + void SendNullArray() final; - - void SendStringArr(StrSpan arr, CollectionType type) final; void SendNull() final; - void SendLong(long val) final; void SendDouble(double val) final; - void SendBulkString(std::string_view str) final; - void StartCollection(unsigned len, CollectionType type) final; - void SendScoredArray(absl::Span> arr, - bool with_scores) final; private: void PostItem(); @@ -398,11 +390,6 @@ void InterpreterReplier::SendError(string_view str, std::string_view type) { explr_->OnError(str); } -void InterpreterReplier::SendStored() { - DCHECK(array_len_.empty()); - SendSimpleString("OK"); -} - void InterpreterReplier::SendSimpleString(string_view str) { if (array_len_.empty()) explr_->OnStatus(str); @@ -411,40 +398,11 @@ void InterpreterReplier::SendSimpleString(string_view str) { PostItem(); } -void InterpreterReplier::SendMGetResponse(MGetResponse resp) { - DCHECK(array_len_.empty()); - - explr_->OnArrayStart(resp.resp_arr.size()); - for (uint32_t i = 0; i < resp.resp_arr.size(); ++i) { - if (resp.resp_arr[i].has_value()) { - explr_->OnString(resp.resp_arr[i]->value); - } else { - explr_->OnNil(); - } - } - explr_->OnArrayEnd(); -} - -void InterpreterReplier::SendSimpleStrArr(StrSpan arr) { - explr_->OnArrayStart(arr.Size()); - for (string_view str : arr) - explr_->OnString(str); - explr_->OnArrayEnd(); -} - void InterpreterReplier::SendNullArray() { SendSimpleStrArr(ArgSlice{}); PostItem(); } -void InterpreterReplier::SendStringArr(StrSpan arr, CollectionType) { - explr_->OnArrayStart(arr.Size()); - for (string_view str : arr) - explr_->OnString(str); - explr_->OnArrayEnd(); - PostItem(); -} - void InterpreterReplier::SendNull() { explr_->OnNil(); PostItem(); @@ -465,7 +423,9 @@ void InterpreterReplier::SendBulkString(string_view str) { PostItem(); } -void InterpreterReplier::StartCollection(unsigned len, CollectionType) { +void InterpreterReplier::StartCollection(unsigned len, CollectionType type) { + if (type == MAP) + len *= 2; explr_->OnArrayStart(len); if (len == 0) { @@ -477,31 +437,6 @@ void InterpreterReplier::StartCollection(unsigned len, CollectionType) { } } -void InterpreterReplier::SendScoredArray(absl::Span> arr, - bool with_scores) { - if (with_scores) { - if (IsResp3()) { - StartCollection(arr.size(), CollectionType::ARRAY); - for (size_t i = 0; i < arr.size(); ++i) { - StartArray(2); - SendBulkString(arr[i].first); - SendDouble(arr[i].second); - } - } else { - StartCollection(arr.size() * 2, CollectionType::ARRAY); - for (size_t i = 0; i < arr.size(); ++i) { - SendBulkString(arr[i].first); - SendDouble(arr[i].second); - } - } - } else { - StartCollection(arr.size(), CollectionType::ARRAY); - for (size_t i = 0; i < arr.size(); ++i) { - SendBulkString(arr[i].first); - } - } -} - bool IsSHA(string_view str) { for (auto c : str) { if (!absl::ascii_isxdigit(c)) @@ -1189,10 +1124,10 @@ std::optional Service::VerifyCommandState(const CommandId* cid, CmdA void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder, facade::ConnectionContext* cntx) { - absl::Cleanup clear_last_error([builder]() { std::ignore = builder->ConsumeLastError(); }); DCHECK(!args.empty()); DCHECK_NE(0u, shard_set->size()) << "Init was not called"; + absl::Cleanup clear_last_error([builder]() { builder->ConsumeLastError(); }); ServerState& etl = *ServerState::tlocal(); string cmd = absl::AsciiStrToUpper(args[0]); @@ -1305,23 +1240,26 @@ class ReplyGuard { const bool is_script = bool(cntx->conn_state.script_info); const bool is_one_of = absl::flat_hash_set({"REPLCONF", "DFLY"}).contains(cid_name); - bool is_mcache = builder->type() == SinkReplyBuilder::MC; + bool is_mcache = builder->GetProtocol() == Protocol::MEMCACHE; const bool is_no_reply_memcache = is_mcache && (static_cast(builder)->NoReply() || cid_name == "QUIT"); const bool should_dcheck = !is_one_of && !is_script && !is_no_reply_memcache; if (should_dcheck) { builder_ = builder; - builder_->ExpectReply(); + replies_recorded_ = builder_->RepliesRecorded(); } } ~ReplyGuard() { if (builder_) { - DCHECK(builder_->HasReplied()); + DCHECK_GT(builder_->RepliesRecorded(), replies_recorded_) + << cid_name_ << " " << typeid(*builder_).name(); } } private: + size_t replies_recorded_ = 0; + std::string_view cid_name_; SinkReplyBuilder* builder_ = nullptr; }; @@ -1359,7 +1297,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBui return true; } builder->SendError(std::move(*err)); - std::ignore = builder->ConsumeLastError(); + builder->ConsumeLastError(); return true; // return false only for internal error aborts } @@ -1402,9 +1340,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBui return false; } - std::string reason = builder->ConsumeLastError(); - - if (!reason.empty()) { + if (std::string reason = builder->ConsumeLastError(); !reason.empty()) { VLOG(2) << FailedCommandToString(cid->name(), tail_args, reason); LOG_EVERY_T(WARNING, 1) << FailedCommandToString(cid->name(), tail_args, reason); } @@ -1450,7 +1386,7 @@ size_t Service::DispatchManyCommands(absl::Span args_list, SinkReply facade::ConnectionContext* cntx) { ConnectionContext* dfly_cntx = static_cast(cntx); DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning()); - DCHECK_EQ(builder->type(), SinkReplyBuilder::REDIS); + DCHECK_EQ(builder->GetProtocol(), Protocol::REDIS); vector stored_cmds; intrusive_ptr dist_trans; @@ -1701,7 +1637,7 @@ absl::flat_hash_map Service::UknownCmdMap() const { void Service::Quit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { - if (builder->type() == SinkReplyBuilder::REDIS) + if (builder->GetProtocol() == Protocol::REDIS) builder->SendOk(); using facade::SinkReplyBuilder; builder->CloseConnection(); @@ -2360,7 +2296,7 @@ void Service::Function(CmdArgList args, Transaction* tx, SinkReplyBuilder* build void Service::PubsubChannels(string_view pattern, SinkReplyBuilder* builder) { auto* rb = static_cast(builder); - rb->SendStringArr(ServerState::tlocal()->channel_store()->ListChannels(pattern)); + rb->SendBulkStrArr(ServerState::tlocal()->channel_store()->ListChannels(pattern)); } void Service::PubsubPatterns(SinkReplyBuilder* builder) { diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index e62f3092e..d36baf1dd 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -121,7 +121,7 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, Stor if (verify_commands_) { if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) { rb->SendError(std::move(*err)); - std::ignore = rb->ConsumeLastError(); + rb->ConsumeLastError(); return !error_abort_; } } diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index 969f97950..aeb46e69c 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -384,7 +384,7 @@ bool ProtocolClient::CheckRespFirstTypes(initializer_list types) } error_code ProtocolClient::SendCommand(string_view command) { - string formatted_command = RedisReplyBuilder2Base::SerializeCommand(command); + string formatted_command = RedisReplyBuilderBase::SerializeCommand(command); auto ec = sock_->Write(io::Buffer(formatted_command)); if (!ec) TouchIoTime(); diff --git a/src/server/search/search_family.cc b/src/server/search/search_family.cc index c710d03f5..8f41f33f9 100644 --- a/src/server/search/search_family.cc +++ b/src/server/search/search_family.cc @@ -712,7 +712,7 @@ void SearchFamily::FtList(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu return OpStatus::OK; }); auto* rb = static_cast(builder); - rb->SendStringArr(names); + rb->SendBulkStrArr(names); } void SearchFamily::FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { @@ -920,7 +920,7 @@ void SearchFamily::FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* vector vec(result_set.begin(), result_set.end()); auto* rb = static_cast(builder); - rb->SendStringArr(vec, RedisReplyBuilder::SET); + rb->SendBulkStrArr(vec, RedisReplyBuilder::SET); } void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 83f9ea39e..39f1f7c88 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1968,7 +1968,7 @@ void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu } } auto* rb = static_cast(builder); - return rb->SendStringArr(res, RedisReplyBuilder::MAP); + return rb->SendBulkStrArr(res, RedisReplyBuilder::MAP); } if (sub_cmd == "RESETSTAT") { diff --git a/src/server/set_family.cc b/src/server/set_family.cc index e96a990ee..27a66a455 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -969,7 +969,7 @@ struct SetReplies { if (script) // output is sorted under scripts sort(sv->begin(), sv->end()); - rb->SendStringArr(*sv, RedisReplyBuilder::SET); + rb->SendBulkStrArr(*sv, RedisReplyBuilder::SET); } void Send(const ResultSetView& rsv) { @@ -1122,7 +1122,7 @@ void SPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { rb->SendBulkString(result.value().front()); } } else { // SPOP key cnt - rb->SendStringArr(*result, RedisReplyBuilder::SET); + rb->SendBulkStrArr(*result, RedisReplyBuilder::SET); } return; } @@ -1240,7 +1240,7 @@ void SRandMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { auto* rb = static_cast(builder); if (result || result == OpStatus::KEY_NOTFOUND) { if (is_count) { - rb->SendStringArr(*result, RedisReplyBuilder::SET); + rb->SendBulkStrArr(*result, RedisReplyBuilder::SET); } else if (result->size()) { rb->SendBulkString(result->front()); } else { diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 9ea089ab0..2799762f7 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -518,17 +518,32 @@ OpResult> OpThrottle(const OpArgs& op_args, const string_view return array{limited ? 1 : 0, limit, remaining, retry_after_ms, reset_after_ms}; } +struct GetResp { + string key; // TODO: to use backing storage to optimize this as well. + string_view value; + uint64_t mc_ver = 0; // 0 means we do not output it (i.e has not been requested). + uint32_t mc_flag = 0; +}; + +struct MGetResponse { + explicit MGetResponse(size_t size = 0) : resp_arr(size) { + } + + std::unique_ptr storage; + absl::InlinedVector, 2> resp_arr; +}; + // fetch_mask values constexpr uint8_t FETCH_MCFLAG = 0x1; constexpr uint8_t FETCH_MCVER = 0x2; -SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask, - const Transaction* t, EngineShard* shard) { +MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Transaction* t, + EngineShard* shard) { ShardArgs keys = t->GetShardArgs(shard->shard_id()); DCHECK(!keys.Empty()); auto& db_slice = t->GetDbSlice(shard->shard_id()); - SinkReplyBuilder::MGetResponse response(keys.Size()); + MGetResponse response(keys.Size()); absl::InlinedVector iters(keys.Size()); // First, fetch all iterators and count total size ahead @@ -543,8 +558,8 @@ SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_ } // Allocate enough for all values - response.storage_list = SinkReplyBuilder::AllocMGetStorage(total_size); - char* next = response.storage_list->data; + response.storage = make_unique(total_size); + char* next = response.storage.get(); bool fetch_mcflag = fetch_mask & FETCH_MCFLAG; bool fetch_mcver = fetch_mask & FETCH_MCVER; for (size_t i = 0; i < iters.size(); ++i) { @@ -650,7 +665,7 @@ void ExtendGeneric(CmdArgList args, bool prepend, Transaction* tx, SinkReplyBuil string_view value = ArgS(args, 1); VLOG(2) << "ExtendGeneric(" << key << ", " << value << ")"; - if (builder->type() == SinkReplyBuilder::REDIS) { + if (builder->GetProtocol() == Protocol::REDIS) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpExtend(t->GetOpArgs(shard), key, value, prepend); }; @@ -662,7 +677,7 @@ void ExtendGeneric(CmdArgList args, bool prepend, Transaction* tx, SinkReplyBuil rb->SendLong(GetResult(std::move(res.value()))); } else { // Memcached skips if key is missing - DCHECK(builder->type() == SinkReplyBuilder::MC); + DCHECK(dynamic_cast(builder)); auto cb = [&](Transaction* t, EngineShard* shard) { return ExtendOrSkip(t->GetOpArgs(shard), key, value, prepend); @@ -723,7 +738,7 @@ void SetExGeneric(bool seconds, CmdArgList args, const CommandId* cid, Transacti } void IncrByGeneric(string_view key, int64_t val, Transaction* tx, SinkReplyBuilder* builder) { - bool skip_on_missing = builder->type() == SinkReplyBuilder::MC; + bool skip_on_missing = (builder->GetProtocol() == Protocol::MEMCACHE); auto cb = [&](Transaction* t, EngineShard* shard) { OpResult res = OpIncrBy(t->GetOpArgs(shard), key, val, skip_on_missing); @@ -974,7 +989,7 @@ void StringFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* build // Remove existed key if the key is expired already if (rel_ms < 0) { - tx->ScheduleSingleHop([key](const Transaction* tx, EngineShard* es) { + tx->ScheduleSingleHop([](const Transaction* tx, EngineShard* es) { ShardArgs args = tx->GetShardArgs(es->shard_id()); GenericFamily::OpDel(tx->GetOpArgs(es), args); return OpStatus::OK; @@ -1253,10 +1268,9 @@ void StringFamily::DecrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { DCHECK_GE(args.size(), 1U); - std::vector mget_resp(shard_set->size()); uint8_t fetch_mask = 0; - if (builder->type() == SinkReplyBuilder::MC) { + if (builder->GetProtocol() == Protocol::MEMCACHE) { fetch_mask |= FETCH_MCFLAG; if (cntx->conn_state.memcache_flag & ConnectionState::FETCH_CAS_VER) fetch_mask |= FETCH_MCVER; @@ -1264,6 +1278,7 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil // Count of pending tiered reads util::fb2::BlockingCounter tiering_bc{0}; + std::vector mget_resp(shard_set->size()); auto cb = [&](Transaction* t, EngineShard* shard) { mget_resp[shard->shard_id()] = OpMGet(tiering_bc, fetch_mask, t, shard); return OpStatus::OK; @@ -1275,18 +1290,14 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil // wait for all tiered reads to finish tiering_bc->Wait(); - // reorder the responses back according to the order of their corresponding - // keys. - SinkReplyBuilder::MGetResponse res(args.size()); + // reorder shard results back according to argument order + absl::FixedArray, 8> res(args.size()); for (ShardId sid = 0; sid < mget_resp.size(); ++sid) { if (!tx->IsActive(sid)) continue; - SinkReplyBuilder::MGetResponse& src = mget_resp[sid]; - src.storage_list->next = res.storage_list; - res.storage_list = src.storage_list; - src.storage_list = nullptr; + auto& src = mget_resp[sid]; ShardArgs shard_args = tx->GetShardArgs(sid); unsigned src_indx = 0; for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) { @@ -1295,14 +1306,32 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil uint32_t indx = it.index(); - res.resp_arr[indx] = std::move(src.resp_arr[src_indx]); - if (builder->type() == SinkReplyBuilder::MC) { - res.resp_arr[indx]->key = *it; + res[indx] = std::move(src.resp_arr[src_indx]); + if (builder->GetProtocol() == Protocol::MEMCACHE) { + res[indx]->key = *it; } } } - return builder->SendMGetResponse(std::move(res)); + SinkReplyBuilder::ReplyScope scope(builder); + if (builder->GetProtocol() == Protocol::MEMCACHE) { + auto* rb = static_cast(builder); + for (const auto& entry : res) { + if (!entry) + continue; + rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag); + } + rb->SendSimpleString("END"); + } else { + auto* rb = static_cast(builder); + rb->StartArray(res.size()); + for (const auto& entry : res) { + if (entry) + rb->SendBulkString(entry->value); + else + rb->SendNull(); + } + } } void StringFamily::MSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,