diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 863443f91..ef22194f6 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -528,7 +528,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { } if (ec && !FiberSocketBase::IsConnClosed(ec)) { - LOG(WARNING) << "Socket error " << ec << " " << ec.message(); + string conn_info = service_->GetContextInfo(cc_.get()); + LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() << ": " << ec + << " " << ec.message(); } --stats_->num_conns; diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 83b8c431e..5ddb8db9e 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -130,9 +130,10 @@ struct ConnectionState { // For set op - it's the flag value we are storing along with the value. // For get op - we use it as a mask of MCGetMask values. uint32_t memcache_flag = 0; + bool is_blocking = false; // whether this connection is blocking on a command ExecInfo exec_info; - ReplicationInfo replicaiton_info; + ReplicationInfo replication_info; std::unique_ptr script_info; std::unique_ptr subscribe_info; @@ -173,12 +174,16 @@ class ConnectionContext : public facade::ConnectionContext { void ChangeMonitor(bool start); // either start or stop monitor on a given connection // Whether this connection is a connection from a replica to its master. + // This flag is true only on replica side, where we need to setup a special ConnectionContext + // instance that helps applying commands coming from master. bool is_replicating = false; - // Reference to a FlowInfo for this connection if from a master to a replica. - FlowInfo* replication_flow; + bool monitor = false; // when a monitor command is sent over a given connection, we need to aware // of it as a state for the connection + // Reference to a FlowInfo for this connection if from a master to a replica. + FlowInfo* replication_flow; + private: void EnableMonitoring(bool enable) { subscriptions++; // required to support the monitoring diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index ce2b334d4..9577db33a 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -256,9 +256,8 @@ OpResult FindFirstNonEmptyKey(Transaction* trans, int req_obj_typ } // If OK is returned then cb was called on the first non empty key and `out_key` is set to the key. -facade::OpStatus RunCbOnFirstNonEmptyBlocking(BlockingResultCb&& func, std::string* out_key, - Transaction* trans, int req_obj_type, - unsigned limit_ms) { +OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, + BlockingResultCb func, unsigned limit_ms) { auto limit_tp = limit_ms ? std::chrono::steady_clock::now() + std::chrono::milliseconds(limit_ms) : Transaction::time_point::max(); bool is_multi = trans->IsMulti(); @@ -296,19 +295,20 @@ facade::OpStatus RunCbOnFirstNonEmptyBlocking(BlockingResultCb&& func, std::stri return result.status(); } - auto cb = [&func, &ff_result, out_key](Transaction* t, EngineShard* shard) { + string result_key; + auto cb = [&](Transaction* t, EngineShard* shard) { if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) { - *out_key = *wake_key; - func(t, shard, *out_key); + result_key = *wake_key; + func(t, shard, result_key); } else if (shard->shard_id() == ff_result.sid) { - ff_result.key.GetString(out_key); - func(t, shard, *out_key); + ff_result.key.GetString(&result_key); + func(t, shard, result_key); } return OpStatus::OK; }; trans->Execute(std::move(cb), true); - return OpStatus::OK; + return result_key; } } // namespace dfly::container_utils diff --git a/src/server/container_utils.h b/src/server/container_utils.h index 917d5f13a..87e19226f 100644 --- a/src/server/container_utils.h +++ b/src/server/container_utils.h @@ -89,9 +89,8 @@ struct ShardFFResult { OpResult FindFirstNonEmptyKey(Transaction* trans, int req_obj_type); using BlockingResultCb = std::function; -facade::OpStatus RunCbOnFirstNonEmptyBlocking(BlockingResultCb&& cb, std::string* out_key, - Transaction* trans, int req_obj_type, - unsigned limit_ms); +OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, + BlockingResultCb cb, unsigned limit_ms); }; // namespace container_utils diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 0b02dee23..30cc0bc17 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -244,8 +244,8 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) { // Set meta info on connection. cntx->owner()->SetName(absl::StrCat("repl_flow_", sync_id)); - cntx->conn_state.replicaiton_info.repl_session_id = sync_id; - cntx->conn_state.replicaiton_info.repl_flow_id = flow_id; + cntx->conn_state.replication_info.repl_session_id = sync_id; + cntx->conn_state.replication_info.repl_flow_id = flow_id; absl::InsecureBitGen gen; string eof_token = GetRandomHex(gen, 40); @@ -471,7 +471,7 @@ auto DflyCmd::CreateSyncSession(ConnectionContext* cntx) }; string address = cntx->owner()->RemoteEndpointAddress(); - uint32_t port = cntx->conn_state.replicaiton_info.repl_listening_port; + uint32_t port = cntx->conn_state.replication_info.repl_listening_port; LOG(INFO) << "Registered replica " << address << ":" << port; @@ -484,7 +484,7 @@ auto DflyCmd::CreateSyncSession(ConnectionContext* cntx) } void DflyCmd::OnClose(ConnectionContext* cntx) { - unsigned session_id = cntx->conn_state.replicaiton_info.repl_session_id; + unsigned session_id = cntx->conn_state.replication_info.repl_session_id; if (!session_id) return; diff --git a/src/server/list_family.cc b/src/server/list_family.cc index d3dd8eb4e..738e7875e 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -1200,29 +1200,30 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn absl::StrAppend(debugMessages.Next(), "BPopGeneric by ", transaction->DebugId()); - std::string popped_key, popped_value; + std::string popped_value; auto cb = [dir, &popped_value](Transaction* t, EngineShard* shard, std::string_view key) { popped_value = OpBPop(t, shard, key, dir); }; - OpStatus result = container_utils::RunCbOnFirstNonEmptyBlocking( - cb, &popped_key, transaction, OBJ_LIST, unsigned(timeout * 1000)); - - if (result == OpStatus::OK) { + cntx->conn_state.is_blocking = true; + OpResult popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( + transaction, OBJ_LIST, move(cb), unsigned(timeout * 1000)); + cntx->conn_state.is_blocking = false; + if (popped_key) { DVLOG(1) << "BPop " << transaction->DebugId() << " popped from key " << popped_key; // key. - std::string_view str_arr[2] = {popped_key, popped_value}; + std::string_view str_arr[2] = {*popped_key, popped_value}; return (*cntx)->SendStringArr(str_arr); } - DVLOG(1) << "result for " << transaction->DebugId() << " is " << result; + DVLOG(1) << "result for " << transaction->DebugId() << " is " << popped_key.status(); - switch (result) { + switch (popped_key.status()) { case OpStatus::WRONG_TYPE: return (*cntx)->SendError(kWrongTypeErr); case OpStatus::TIMED_OUT: return (*cntx)->SendNullArray(); default: - LOG(ERROR) << "Unexpected error " << result; + LOG(ERROR) << "Unexpected error " << popped_key.status(); } return (*cntx)->SendNullArray(); } diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 41aa13d7f..5f12a2be8 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1844,12 +1844,20 @@ void Service::OnClose(facade::ConnectionContext* cntx) { string Service::GetContextInfo(facade::ConnectionContext* cntx) { char buf[16] = {0}; unsigned index = 0; - if (cntx->async_dispatch) + ConnectionContext* server_cntx = static_cast(cntx); + + if (server_cntx->async_dispatch) buf[index++] = 'a'; - if (cntx->conn_closing) + if (server_cntx->conn_closing) buf[index++] = 't'; + if (server_cntx->conn_state.subscribe_info) + buf[index++] = 'P'; + + if (server_cntx->conn_state.is_blocking) + buf[index++] = 'b'; + return index ? absl::StrCat("flags:", buf) : string(); } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 008c7e4f2..d3aa164dd 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1916,7 +1916,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { cntx->owner()->SetName(absl::StrCat("repl_ctrl_", sid)); string sync_id = absl::StrCat("SYNC", sid); - cntx->conn_state.replicaiton_info.repl_session_id = sid; + cntx->conn_state.replication_info.repl_session_id = sid; if (!cntx->replica_conn) { ServerState::tl_connection_stats()->num_replicas += 1; @@ -1937,7 +1937,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendError(kInvalidIntErr); return; } - cntx->conn_state.replicaiton_info.repl_listening_port = replica_listening_port; + cntx->conn_state.replication_info.repl_listening_port = replica_listening_port; } else if (cmd == "CLIENT-ID" && args.size() == 2) { std::string client_id{arg}; auto& pool = service_.proactor_pool(); @@ -1949,8 +1949,8 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendError(kInvalidIntErr); } VLOG(1) << "Client version for session_id=" - << cntx->conn_state.replicaiton_info.repl_session_id << " is " << version; - cntx->conn_state.replicaiton_info.repl_version = DflyVersion(version); + << cntx->conn_state.replication_info.repl_session_id << " is " << version; + cntx->conn_state.replication_info.repl_version = DflyVersion(version); } else if (cmd == "ACK" && args.size() == 2) { // Don't send error/Ok back through the socket, because we don't want to interleave with // the journal writes that we write into the same socket. diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 8ac369452..51197cf5d 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -1171,32 +1171,34 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) { VLOG(1) << "BZPop timeout(" << timeout << ")"; Transaction* transaction = cntx->transaction; - std::string popped_key; OpResult popped_array; - OpStatus result = container_utils::RunCbOnFirstNonEmptyBlocking( + cntx->conn_state.is_blocking = true; + OpResult popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( + transaction, OBJ_ZSET, [is_max, &popped_array](Transaction* t, EngineShard* shard, std::string_view key) { popped_array = OpBZPop(t, shard, key, is_max); }, - &popped_key, transaction, OBJ_ZSET, unsigned(timeout * 1000)); + unsigned(timeout * 1000)); + cntx->conn_state.is_blocking = false; - if (result == OpStatus::OK) { + if (popped_key) { DVLOG(1) << "BZPop " << transaction->DebugId() << " popped from key " << popped_key; // key. CHECK(popped_array->size() == 1); (*cntx)->StartArray(3); - (*cntx)->SendBulkString(popped_key); + (*cntx)->SendBulkString(*popped_key); (*cntx)->SendBulkString(popped_array->front().first); return (*cntx)->SendDouble(popped_array->front().second); } - DVLOG(1) << "result for " << transaction->DebugId() << " is " << result; + DVLOG(1) << "result for " << transaction->DebugId() << " is " << popped_key.status(); - switch (result) { + switch (popped_key.status()) { case OpStatus::WRONG_TYPE: return (*cntx)->SendError(kWrongTypeErr); case OpStatus::TIMED_OUT: return (*cntx)->SendNullArray(); default: - LOG(ERROR) << "Unexpected error " << result; + LOG(ERROR) << "Unexpected error " << popped_key.status(); } return (*cntx)->SendNullArray(); }