feat: add io_uring transport layer support#3328
Conversation
0e65397 to
89b2765
Compare
|
|
||
| if (!IsWritable()) { errno = EAGAIN; return -1; } | ||
|
|
||
| const int fd = _socket->fd(); |
There was a problem hiding this comment.
- 这里 fd 还是 tcp 的 socket fd,不是 ring_fd 是吗?
- 这里 iouring 就仅仅使用了数据的读写的 API ?
There was a problem hiding this comment.
_inflight_writes (per-endpoint) ← 保护单条连接不无限堆积未完成写
SQ 容量(256 个槽) ← 保护 ring 本身不溢出
89b2765 to
d737faf
Compare
d737faf to
07c041d
Compare
There was a problem hiding this comment.
Pull request overview
This PR adds an io_uring-backed transport (IouringTransport) to bRPC as an alternative to the existing TCP/RDMA transports, including a per-bthread-tag poller architecture, optional fixed-buffer (READ/WRITE_FIXED) support, and build-system integration for both CMake and Bazel. It also includes an io_uring example and Chinese documentation.
Changes:
- Introduce io_uring core implementation (endpoint, helper, transport, buffer pools) and register it in the transport factory / socket mode.
- Add build toggles and dependency wiring for liburing (CMake + Bazel), plus an io_uring echo example and docs.
- Minor compatibility adjustments (e.g.,
BLOCK_SIZErename) to avoid macro collisions when including liburing headers.
Reviewed changes
Copilot reviewed 25 out of 25 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| WORKSPACE | Adds Bazel liburing repository definition (currently via hard-coded local path). |
| src/butil/single_threaded_pool.h | Renames a static constant to avoid BLOCK_SIZE macro conflicts. |
| src/bthread/types.h | Adjusts minimum bthread concurrency when RDMA/io_uring is compiled in. |
| src/brpc/transport_factory.cpp | Registers IouringTransport in the factory. |
| src/brpc/socket.h | Adds friend declarations for io_uring endpoint/transport. |
| src/brpc/socket_mode.h | Adds SOCKET_MODE_IOURING. |
| src/brpc/iouring/iouring_helper.h | Declares io_uring lifecycle APIs, polling modes, and poller handle. |
| src/brpc/iouring/iouring_helper.cpp | Implements global io_uring init, probing, and poller hook wiring. |
| src/brpc/iouring/iouring_endpoint.h | Defines IouringEndpoint and poller structures. |
| src/brpc/iouring/iouring_endpoint.cpp | Implements SQE submission, CQE polling, and per-tag poller loop. |
| src/brpc/iouring/iouring_block_pool.h | Declares fixed-buffer mempool + per-ring read-slot pool. |
| src/brpc/iouring/iouring_block_pool.cpp | Implements fixed-buffer mempool and read-slot pool. |
| src/brpc/iouring_transport.h | Declares IouringTransport. |
| src/brpc/iouring_transport.cpp | Implements IouringTransport read/write/wait logic and global init hook. |
| src/brpc/input_messenger.h | Adds friend declarations for io_uring integration. |
| example/iouring_echo_c++/server.cpp | Adds an io_uring echo server example with flags and initialization. |
| example/iouring_echo_c++/client.cpp | Adds a simple client for the io_uring echo server example. |
| example/iouring_echo_c++/echo.proto | Proto definitions for the echo example. |
| example/iouring_echo_c++/CMakeLists.txt | CMake wiring for building the io_uring echo example. |
| example/BUILD.bazel | Adds Bazel targets/defines for io_uring example(s). |
| docs/cn/iouring.md | Adds Chinese documentation for io_uring build/flags/architecture. |
| CMakeLists.txt | Adds WITH_IOURING option and liburing discovery/linking. |
| BUILD.bazel | Adds Bazel selects/deps/src handling for io_uring sources and liburing. |
| bazel/third_party/liburing/liburing.BUILD | Adds Bazel cc_library target for liburing sources/headers. |
| bazel/config/BUILD.bazel | Adds brpc_with_iouring config_setting. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # io_uring support via local liburing source tree. | ||
| # Enable with: --define=BRPC_WITH_IOURING=true | ||
| # Requires liburing checked out at /docker/root/projects/liburing | ||
| new_local_repository( | ||
| name = "com_github_axboe_liburing", | ||
| path = "/docker/root/projects/liburing", | ||
| build_file = "//bazel/third_party/liburing:liburing.BUILD", | ||
| ) |
| if(WITH_IOURING) | ||
| list(APPEND DYNAMIC_LIB ${IOURING_LIB}) | ||
| set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -luring") | ||
| endif() | ||
|
|
||
| set(BRPC_PRIVATE_LIBS "-lgflags -lprotobuf -lleveldb -lprotoc -lssl -lcrypto -ldl -lz") |
| if (mode == IouringPollingMode::NONE) { | ||
| // Interrupt-driven mode: block up to 1 ms waiting for a CQE. | ||
| // The 1 ms timeout keeps the Poller loop responsive to new | ||
| // connections arriving in op_queue without burning CPU when | ||
| // there is no I/O traffic. | ||
| struct io_uring_cqe* cqe = nullptr; | ||
| struct __kernel_timespec ts{0, 1000000}; // 1 ms | ||
| int r = io_uring_wait_cqe_timeout(&poller->ring, &cqe, &ts); | ||
| if (r == 0 && cqe) { | ||
| io_uring_cqe_seen(&poller->ring, cqe); | ||
| } | ||
|
|
| // Only process CQEs that bRPC submitted (bit 63 == 1). | ||
| // All other CQEs are user-submitted; leave them in the ring so the | ||
| // user callback (called right after PollCq returns) can | ||
| // drain and handle them. Users need no special tagging – bit 63 | ||
| // is never set in a canonical user-space pointer or small integer. | ||
| if (!(udata & kBrpcCqeTag)) { | ||
| continue; // do NOT call io_uring_cqe_seen() | ||
| } |
| static const int kMaxTlsCacheNum = 4096; | ||
|
|
||
| IouringMemPool& IouringMemPool::Instance() { | ||
| static IouringMemPool inst; | ||
| return inst; | ||
| } | ||
|
|
||
| // --------------------------------------------------------------------------- | ||
| // Init | ||
| // --------------------------------------------------------------------------- | ||
| bool IouringMemPool::Init(size_t block_size) { | ||
| if (initialized_) { | ||
| LOG(WARNING) << "IouringMemPool already initialized"; | ||
| return true; |
| "butil::SetDefaultBlockSize() is called with this value at " | ||
| "startup so IOBuf and the registered slab are always in sync."); | ||
|
|
||
| DEFINE_int32(iouring_read_slot_num, 256, | ||
| "Initial read slots per Poller ring."); | ||
|
|
||
| DEFINE_int32(iouring_read_slot_max, 4096, |
| IouringReadSlot consumed_slot = ep->_read_slot; | ||
| ep->_read_slot = {}; | ||
|
|
||
| // Already on the Poller thread – no lock needed. | ||
| // Acquire the next slot before handing off consumed_slot so | ||
| // that back-to-back arrivals never stall waiting for a slot. | ||
| { | ||
| Poller* p = ep->GetPoller(); | ||
| if (p && p->slot_pool.initialized()) { | ||
| p->slot_pool.Acquire(&ep->_read_slot); | ||
| } | ||
| } | ||
|
|
||
| // Zero-copy: wrap slot memory – IouringMemPool is | ||
| // thread-safe so the destructor can run on any thread. | ||
| struct SlotDeleter { | ||
| static void destroy(void* ptr) { | ||
| IouringMemPool::Instance().Deallocate(ptr); | ||
| } | ||
| }; | ||
| butil::IOBuf tmp; | ||
| tmp.append_user_data(consumed_slot.buf, | ||
| static_cast<size_t>(res), | ||
| SlotDeleter::destroy); | ||
| m->_read_buf.append(std::move(tmp)); |
| LOG(WARNING) | ||
| << "io_uring_register_buffers failed for new " | ||
| "region (buf_index_base=" << buf_index_base | ||
| << "): " << berror(-r2) | ||
| << " – WRITE_FIXED will fall back to WRITEV."; | ||
| } |
| }) + select({ | ||
| "//bazel/config:brpc_with_iouring": glob([ | ||
| "src/brpc/iouring/*.cpp", | ||
| ]), | ||
| "//conditions:default": [], |
f6b9ad4 to
22db965
Compare
|
LGTM |
| srcs = glob(["**"]), | ||
| ) | ||
|
|
||
| configure_make( |
There was a problem hiding this comment.
The liburing BUILD.bazel implementation in the bazel central registry is better.
|
At least in bazel CI (e.g., gcc-compile-with-bazel-all-options and clang-compile-with-bazel-all-options), you must ensure that io_uring compiles successfully. brpc/.github/workflows/ci-linux.yml Lines 124 to 139 in 72adb6e brpc/.github/workflows/ci-linux.yml Lines 183 to 199 in 72adb6e |
|
Could you add some unit tests for io_uring transport? |
b8cf0be to
d0ea241
Compare
|
请问为什么选择在transport层新增iouring而不是event_dispatcher下的epoll统计新增iouring,改动面更小,对brpc上层更透明 |
iouring是poll的模式,没办法实现成event_dispatcher同样的语义。bRPC里面的很多网络的读写都是read/write,这些也是阻塞式的语义,没办法替换的。 |
e2ca6ad to
0e0b3c1
Compare
Add a new io_uring-based transport layer (IouringTransport) as an
alternative to the existing TCP and RDMA transports, following the
same architectural patterns as the RDMA implementation.
Core implementation:
- src/brpc/iouring/iouring_endpoint.h/cpp: IouringEndpoint (SocketUser
subclass) that submits async read/write SQEs and reaps CQEs, with
optional SQPOLL polling mode.
- src/brpc/iouring/iouring_helper.h/cpp: global io_uring ring lifecycle
management, per-bthread-tag poller threads, and availability checks.
- src/brpc/iouring_transport.h/cpp: IouringTransport (Transport
interface) wiring Init/Release/Reset/Connect/CutFromIOBuf(List)/
WaitEpollOut/ProcessEvent/QueueMessage/Debug/ContextInitOrDie.
Build system integration:
- CMakeLists.txt: BRPC_WITH_IOURING option; find_package(liburing);
conditionally compile iouring sources and link -luring.
- BUILD.bazel / bazel/config/BUILD.bazel: brpc_with_iouring
config_setting; conditional srcs/defines/linkopts/deps.
- WORKSPACE: new_local_repository for @com_github_axboe_liburing.
- bazel/third_party/liburing/liburing.BUILD: cc_library target for
liburing.
Framework hooks:
- src/brpc/socket_mode.h: add SOCKET_MODE_IOURING enum value.
- src/brpc/transport_factory.cpp: register IouringTransport in
TransportFactory::Create().
- src/brpc/socket.h: friend declarations for IouringEndpoint /
IouringTransport.
- src/brpc/input_messenger.h: friend declarations for IouringEndpoint /
IouringTransport.
Bug fixes:
- src/butil/single_threaded_pool.h: rename static member BLOCK_SIZE to
POOL_BLOCK_SIZE to avoid conflict with the BLOCK_SIZE macro defined
by <linux/fs.h> (pulled in via liburing.h).
- src/brpc/iouring_transport.cpp: move DECLARE_bool(usercode_in_*)
inside namespace brpc{} to match the DEFINE_bool site in
event_dispatcher.cpp, fixing linker undefined-reference errors.
Example and documentation:
- example/iouring_performance/: server, client, proto, CMakeLists.txt
mirroring the rdma_performance example; supports WITH_IOURING=1 make
flag.
- example/BUILD.bazel: Bazel targets for the new example.
- docs/cn/iouring.md: Chinese-language guide covering build, flags,
architecture and comparison with RDMA.
0e0b3c1 to
ee143dd
Compare
| const int cnt = io_uring_peek_batch_cqe( | ||
| ring, cqes, static_cast<unsigned>(FLAGS_iouring_max_cqe_poll_once)); | ||
|
|
||
| if (cnt <= 0) { | ||
| if (s->Failed()) { return; } | ||
| if (!m->MoreReadEvents(&progress)) { break; } | ||
| continue; | ||
| } | ||
|
|
||
| ssize_t bytes_read = 0; | ||
|
|
||
| for (int i = 0; i < cnt; ++i) { | ||
| struct io_uring_cqe* cqe = cqes[i]; | ||
| const uint64_t udata = cqe->user_data; | ||
|
|
||
| // Only process CQEs that bRPC submitted (bit 63 == 1). | ||
| // All other CQEs are user-submitted; leave them in the ring so the | ||
| // user callback (called right after PollCq returns) can | ||
| // drain and handle them. Users need no special tagging – bit 63 | ||
| // is never set in a canonical user-space pointer or small integer. | ||
| if (!(udata & kBrpcCqeTag)) { | ||
| continue; // do NOT call io_uring_cqe_seen() | ||
| } |
| struct io_uring_cqe* cqe = nullptr; | ||
| struct __kernel_timespec ts{0, 1000000}; // 1 ms | ||
| int r = io_uring_wait_cqe_timeout(&poller->ring, &cqe, &ts); | ||
| if (r == 0 && cqe) { | ||
| io_uring_cqe_seen(&poller->ring, cqe); | ||
| } |
| if (buf_index_base > 0) { | ||
| // Unregister previous table before re-registering. | ||
| io_uring_unregister_buffers(&poller->ring); | ||
| } | ||
| int r2 = io_uring_register_buffers(&poller->ring, |
| int IouringTransport::ContextInitOrDie(bool /*serverOrNot*/, | ||
| const void* /*_options*/) { | ||
| iouring::GlobalIouringInitializeOrDie(); | ||
| return 0; | ||
| } |
| // Allocate the initial region. | ||
| if (!AddRegion(static_cast<size_t>(FLAGS_iouring_mem_pool_initial_mb))) { | ||
| // Unhook on failure. | ||
| butil::iobuf::blockmem_allocate = prev_allocate_; | ||
| butil::iobuf::blockmem_deallocate = prev_deallocate_; | ||
| return false; | ||
| } |
| if(WITH_IOURING) | ||
| list(APPEND DYNAMIC_LIB ${IOURING_LIB}) | ||
| set(BRPC_PRIVATE_LIBS "${BRPC_PRIVATE_LIBS} -luring") | ||
| endif() | ||
|
|
||
| set(BRPC_PRIVATE_LIBS "-lgflags -lprotobuf -lleveldb -lprotoc -lssl -lcrypto -ldl -lz") |
| | `iouring_iobuf_block_size` | `8192` | IOBuf block 和 read slot 的大小(字节),须与内存池对齐 | | ||
| | `iouring_read_slot_num` | `256` | 每个 ring 初始 read slot 数量 | | ||
| | `iouring_read_slot_max` | `4096` | 每个 ring 最大 read slot 数量 | |
| // --iouring_iobuf_block_size=N IOBuf block / slot size in bytes (default 8192). | ||
| // --iouring_read_slot_num=N Initial read slots per ring (default 256). | ||
| // --iouring_read_slot_max=N Max read slots per ring (default 4096). |
Add a new io_uring-based transport layer (IouringTransport) as an alternative to the existing TCP and RDMA transports, following the same architectural patterns as the RDMA implementation.
Core implementation:
Build system integration:
Framework hooks:
Bug fixes:
Example and documentation:
What problem does this PR solve?
Issue Number:
#1650
#3212
Problem Summary:
What is changed and the side effects?
Changed:
Side effects:
Performance effects:
Breaking backward compatibility:
Check List: