diff --git a/src/brpc/controller.cpp b/src/brpc/controller.cpp index 15c8c91887..2e155d5e28 100644 --- a/src/brpc/controller.cpp +++ b/src/brpc/controller.cpp @@ -297,6 +297,9 @@ void Controller::ResetPods() { _request_streams.clear(); _response_streams.clear(); _remote_stream_settings = NULL; + set_bind_sock_action(BIND_SOCK_NONE); + _bind_sock.reset(); + _session_data = NULL; _auth_flags = 0; _rpc_received_us = 0; } @@ -308,6 +311,12 @@ Controller::Call::Call(Controller::Call* rhs) , peer_id(rhs->peer_id) , begin_time_us(rhs->begin_time_us) , sending_sock(rhs->sending_sock.release()) + // A backup/retry call must behave normally w.r.t. socket disposal; it never + // inherits the originating call's reserve/use affinity. Leaving this + // uninitialized lets OnComplete read indeterminate bits and (when they + // happen to match RESERVE/USE) hijack the socket away from the pool-return + // path, hanging the RPC. Initialize explicitly, matching Reset(). + , bind_sock_action(BIND_SOCK_NONE) , stream_user_data(rhs->stream_user_data) { // NOTE: fields in rhs should be reset because RPC could fail before // setting all the fields to next call and _current_call.OnComplete @@ -328,6 +337,7 @@ void Controller::Call::Reset() { peer_id = INVALID_SOCKET_ID; begin_time_us = 0; sending_sock.reset(NULL); + bind_sock_action = BIND_SOCK_NONE; stream_user_data = NULL; } @@ -824,7 +834,13 @@ void Controller::Call::OnComplete( // assumption that one pooled connection cannot have more than one // message at the same time. if (sending_sock != NULL && (error_code == 0 || responded)) { - if (!sending_sock->is_read_progressive()) { + if (bind_sock_action == BIND_SOCK_RESERVE) { + // Reserve this socket on the controller for a following RPC + // (used by mysql transactions for connection affinity). + c->_bind_sock.reset(sending_sock.release()); + } else if (bind_sock_action == BIND_SOCK_USE) { + // Socket is owned by the binder; do not return it to the pool. + } else if (!sending_sock->is_read_progressive()) { // Normally-read socket which will not be used after RPC ends, // safe to return. Notice that Socket::is_read_progressive may // differ from Controller::is_response_read_progressively() @@ -841,7 +857,11 @@ void Controller::Call::OnComplete( case CONNECTION_TYPE_SHORT: if (sending_sock != NULL) { // Check the comment in CONNECTION_TYPE_POOLED branch. - if (!sending_sock->is_read_progressive()) { + if (bind_sock_action == BIND_SOCK_RESERVE) { + c->_bind_sock.reset(sending_sock.release()); + } else if (bind_sock_action == BIND_SOCK_USE) { + // Socket is owned by the binder; do not fail it. + } else if (!sending_sock->is_read_progressive()) { if (c->_stream_creator == NULL) { sending_sock->SetFailed(); } @@ -908,6 +928,9 @@ void Controller::EndRPC(const CompletionInfo& info) { } // TODO: Replace this with stream_creator. HandleStreamConnection(_current_call.sending_sock.get()); + // Propagate the reserve action; OnComplete only actually reserves the + // socket when the RPC succeeded (its error_code==0 || responded guard). + _current_call.bind_sock_action = bind_sock_action(); _current_call.OnComplete(this, _error_code, info.responded, true); } else { // Even if _unfinished_call succeeded, we don't use EBACKUPREQUEST @@ -1092,7 +1115,19 @@ void Controller::IssueRPC(int64_t start_realtime_us) { _current_call.need_feedback = false; _current_call.enable_circuit_breaker = has_enabled_circuit_breaker(); SocketUniquePtr tmp_sock; - if (SingleServer()) { + if ((_connection_type & CONNECTION_TYPE_POOLED_AND_SHORT) && + bind_sock_action() == BIND_SOCK_USE) { + // Reuse the socket reserved by a previous RPC (mysql transaction affinity). + tmp_sock.reset(_bind_sock.release()); + if (!tmp_sock || (!is_health_check_call() && !tmp_sock->IsAvailable())) { + // NOTE: tmp_sock may be NULL here, so guard the id() deref. + SetFailed(EHOSTDOWN, "Not connected to bind socket yet, server_id=%" PRIu64, + tmp_sock ? tmp_sock->id() : (SocketId)0); + tmp_sock.reset(); // Release ref ASAP + return HandleSendFailed(); + } + _current_call.peer_id = tmp_sock->id(); + } else if (SingleServer()) { // Don't use _current_call.peer_id which is set to -1 after construction // of the backup call. const int rc = Socket::Address(_single_server_id, &tmp_sock); @@ -1157,7 +1192,10 @@ void Controller::IssueRPC(int64_t start_realtime_us) { _current_call.sending_sock->set_preferred_index(_preferred_index); } else { int rc = 0; - if (_connection_type == CONNECTION_TYPE_POOLED) { + if (bind_sock_action() == BIND_SOCK_USE) { + // Already holding the reserved socket; use it directly. + _current_call.sending_sock.reset(tmp_sock.release()); + } else if (_connection_type == CONNECTION_TYPE_POOLED) { rc = tmp_sock->GetPooledSocket(&_current_call.sending_sock); } else if (_connection_type == CONNECTION_TYPE_SHORT) { rc = tmp_sock->GetShortSocket(&_current_call.sending_sock); @@ -1179,7 +1217,8 @@ void Controller::IssueRPC(int64_t start_realtime_us) { _current_call.sending_sock->set_preferred_index(_preferred_index); // Set preferred_index of main_socket as well to make it easier to // debug and observe from /connections. - if (tmp_sock->preferred_index() < 0) { + // NOTE: tmp_sock is NULL on the BIND_SOCK_USE path (released above). + if (tmp_sock && tmp_sock->preferred_index() < 0) { tmp_sock->set_preferred_index(_preferred_index); } tmp_sock.reset(); diff --git a/src/brpc/controller.h b/src/brpc/controller.h index 45f71b72f6..6f00acab80 100644 --- a/src/brpc/controller.h +++ b/src/brpc/controller.h @@ -107,6 +107,15 @@ enum StopStyle { const int32_t UNSET_MAGIC_NUM = -123456789; +// If a controller wants to reserve the sending socket after the RPC (used by +// mysql transactions for connection affinity), set BIND_SOCK_RESERVE; later RPCs +// reuse it via BIND_SOCK_USE. +enum BindSockAction { + BIND_SOCK_RESERVE, + BIND_SOCK_USE, + BIND_SOCK_NONE, +}; + typedef butil::FlatMap UserFieldsMap; // A Controller mediates a single method call. The primary purpose of @@ -145,6 +154,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11); static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12); static const uint32_t FLAGS_USED_BY_RPC = (1 << 13); + // Reserve/reuse the sending socket after the RPC (mysql transactions). + // The two bits encode BindSockAction: neither=BIND_SOCK_NONE, + // RESERVE bit=BIND_SOCK_RESERVE, USE bit=BIND_SOCK_USE. + static const uint32_t FLAGS_BIND_SOCK_RESERVE = (1 << 14); + static const uint32_t FLAGS_BIND_SOCK_USE = (1 << 15); static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16); static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17); static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18); @@ -762,6 +776,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary // socket fetched from socket pool SocketUniquePtr sending_sock; + BindSockAction bind_sock_action; StreamUserData* stream_user_data; }; @@ -789,6 +804,26 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); { return t ? add_flag(f) : clear_flag(f); } inline bool has_flag(uint32_t f) const { return _flags & f; } + // BindSockAction stored in the FLAGS_BIND_SOCK_* bits of _flags instead + // of a dedicated member. + void set_bind_sock_action(BindSockAction action) { + clear_flag(FLAGS_BIND_SOCK_RESERVE | FLAGS_BIND_SOCK_USE); + if (action == BIND_SOCK_RESERVE) { + add_flag(FLAGS_BIND_SOCK_RESERVE); + } else if (action == BIND_SOCK_USE) { + add_flag(FLAGS_BIND_SOCK_USE); + } + } + BindSockAction bind_sock_action() const { + if (has_flag(FLAGS_BIND_SOCK_RESERVE)) { + return BIND_SOCK_RESERVE; + } + if (has_flag(FLAGS_BIND_SOCK_USE)) { + return BIND_SOCK_USE; + } + return BIND_SOCK_NONE; + } + void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); } bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); } @@ -915,6 +950,16 @@ friend void policy::ProcessThriftRequest(InputMessageBase*); // Defined at both sides StreamSettings *_remote_stream_settings; + // Whether/how to reserve the sending socket after the RPC (mysql + // transactions) is stored in the FLAGS_BIND_SOCK_* bits of _flags; see + // set_bind_sock_action()/bind_sock_action(). The socket reserved by a + // previous RPC and reused when the action is BIND_SOCK_USE: + SocketUniquePtr _bind_sock; + // Opaque per-RPC slot a protocol codec may use to carry typed state from + // serialize_request to pack_request/parse (e.g. the mysql prepared-statement + // stub). Not owned by Controller. + void* _session_data; + // Thrift method name, only used when thrift protocol enabled std::string _thrift_method_name; diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp index 7228a0edf0..b1dc92e37b 100644 --- a/src/brpc/socket.cpp +++ b/src/brpc/socket.cpp @@ -461,6 +461,7 @@ Socket::Socket(Forbidden f) , _fd(-1) , _tos(0) , _reset_fd_real_us(-1) + , _fd_version(0) , _on_edge_triggered_events(NULL) , _need_on_edge_trigger(false) , _user(NULL) @@ -578,6 +579,9 @@ int Socket::ResetFileDescriptor(int fd) { _avg_msg_size = 0; // MUST store `_fd' before adding itself into epoll device to avoid // race conditions with the callback function inside epoll + static butil::atomic BAIDU_CACHELINE_ALIGNMENT fd_version(0); + _fd_version.store(fd_version.fetch_add(1, butil::memory_order_relaxed), + butil::memory_order_relaxed); _fd.store(fd, butil::memory_order_release); _reset_fd_real_us = butil::cpuwide_time_us(); if (!ValidFileDescriptor(fd)) { @@ -1618,7 +1622,10 @@ int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) { if (options_in) { opt = *options_in; } - if (data->empty()) { + // An auth write (opt.auth_flags != 0) may carry an empty data buffer: some + // protocols (e.g. mysql) read the server greeting first and send their real + // bytes from the connection-phase handler, not from `data` here. + if (data->empty() && !opt.auth_flags) { return SetError(opt.id_wait, EINVAL); } if (opt.pipelined_count > MAX_PIPELINED_COUNT) { diff --git a/src/brpc/socket.h b/src/brpc/socket.h index 7311d73895..680361059e 100644 --- a/src/brpc/socket.h +++ b/src/brpc/socket.h @@ -430,6 +430,9 @@ friend class TransportFactory; // The file descriptor int fd() const { return _fd.load(butil::memory_order_relaxed); } + // The file descriptor version, used to avoid ABA problem. + uint64_t fd_version() const { return _fd_version.load(butil::memory_order_relaxed); } + // ip/port of the local end of the connection butil::EndPoint local_side() const { return _local_side; } @@ -840,6 +843,9 @@ friend class TransportFactory; butil::atomic _fd; // -1 when not connected. int _tos; // Type of service which is actually only 8bits. int64_t _reset_fd_real_us; // When _fd was reset, in microseconds. + // ABA/version counter; written on fd reset and read via fd_version() from + // other threads, so use relaxed atomics to avoid a data race. + butil::atomic _fd_version; // _fd_version, used only for mysql now. // Address of peer. Initialized by SocketOptions.remote_side. butil::EndPoint _remote_side;