Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 48 additions & 1 deletion envoy/common/io/io_uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ class Request {
* queue.
* @param result is a return code of submitted system call.
* @param injected indicates whether the completion is injected or not.
* @param flags is the raw ``cqe->flags`` value from the io_uring completion. For multishot
* completions, callers inspect ``IORING_CQE_F_BUFFER`` (the buffer ID is encoded in the upper
* bits) and ``IORING_CQE_F_MORE`` (whether the SQE will produce further completions). For
* injected completions the value is always ``0``.
*/
using CompletionCb = std::function<void(Request* user_data, int32_t result, bool injected)>;
using CompletionCb =
std::function<void(Request* user_data, int32_t result, bool injected, uint32_t flags)>;

/**
* Callback for releasing the user data.
Expand Down Expand Up @@ -147,6 +152,48 @@ class IoUring {
*/
virtual IoUringResult prepareShutdown(os_fd_t fd, int how, Request* user_data) PURE;

/**
* Set up a kernel-managed buffer ring of ``count`` buffers each of ``buf_size`` bytes. The
* kernel selects a buffer from this ring on each multishot recv completion, eliminating the
* per-read allocation that ``prepareReadv`` requires. Only one buf-ring may be set up per
* ``IoUring`` instance for now; subsequent calls return ``IoUringResult::Failed``.
*
* @param group_id buffer group identifier; subsequent ``prepareRecvMultishot`` and
* ``recycleBuffer`` calls reference this id.
* @param count number of buffers; must be a power of two.
* @param buf_size size of each buffer in bytes.
* @return ``Ok`` on success; ``Failed`` if buf-rings are unsupported by the kernel or if a
* buf-ring has already been set up on this instance.
*/
virtual IoUringResult setupBufRing(uint16_t group_id, uint32_t count, uint32_t buf_size) PURE;

/**
* Prepare a multishot recv. The same SQE may produce multiple completions, each carrying
* ``IORING_CQE_F_BUFFER`` (the buffer ID is in the upper bits of ``flags``) and
* ``IORING_CQE_F_MORE`` (set while the SQE remains armed). When ``F_MORE`` is clear the
* caller must re-arm by issuing another ``prepareRecvMultishot``.
*
* @param fd the socket fd.
* @param group_id the buffer group set up via ``setupBufRing``.
* @param user_data attached to the SQE; surfaced on every completion of this SQE.
*/
virtual IoUringResult prepareRecvMultishot(os_fd_t fd, uint16_t group_id,
Request* user_data) PURE;

/**
* Return the storage backing buffer ID ``bid`` in group ``group_id``. The caller is expected
* to consume up to ``cqe->res`` bytes (the value passed as ``result`` to the completion
* callback) and then recycle the buffer via ``recycleBuffer``.
*/
virtual uint8_t* getBufferForBid(uint16_t group_id, uint16_t bid) PURE;

/**
* Return a buffer to the buf-ring after the user has finished consuming it. Until the buffer
* is recycled the kernel cannot reuse it for new completions; failing to recycle promptly
* may cause the kernel to terminate the multishot recv with ``-ENOBUFS``.
*/
virtual void recycleBuffer(uint16_t group_id, uint16_t bid) PURE;

/**
* Submits the entries in the submission queue to the kernel using the
* `io_uring_enter()` system call.
Expand Down
92 changes: 89 additions & 3 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,14 @@ IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polli
RELEASE_ASSERT(ret == 0, fmt::format("unable to initialize io_uring: {}", errorDetails(-ret)));
}

IoUringImpl::~IoUringImpl() { io_uring_queue_exit(&ring_); }
IoUringImpl::~IoUringImpl() {
  // Free the kernel-managed buffer ring, if one was ever set up, before tearing down the ring
  // itself. The result of the free call is deliberately discarded: destruction proceeds
  // regardless of whether the kernel accepted the unregistration.
  if (buf_ring_ != nullptr) {
    io_uring_free_buf_ring(&ring_, buf_ring_, buf_count_, buf_group_id_);
    buf_ring_ = nullptr;
  }
  io_uring_queue_exit(&ring_);
}

os_fd_t IoUringImpl::registerEventfd() {
ASSERT(!isEventfdRegistered());
Expand Down Expand Up @@ -69,7 +76,7 @@ void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) {

for (unsigned i = 0; i < count; ++i) {
struct io_uring_cqe* cqe = cqes_[i];
completion_cb(reinterpret_cast<Request*>(cqe->user_data), cqe->res, false);
completion_cb(reinterpret_cast<Request*>(cqe->user_data), cqe->res, false, cqe->flags);
}

io_uring_cq_advance(&ring_, count);
Expand All @@ -80,7 +87,9 @@ void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) {
// Iterate the injected completion.
while (!injected_completions_.empty()) {
auto& completion = injected_completions_.front();
completion_cb(completion.user_data_, completion.result_, true);
// Injected completions always carry ``flags == 0``; they do not originate from the kernel
// and have no associated CQE.
completion_cb(completion.user_data_, completion.result_, true, 0);
// The socket may be closed in the completion_cb and all the related completions are
// removed.
if (injected_completions_.empty()) {
Expand Down Expand Up @@ -195,6 +204,83 @@ IoUringResult IoUringImpl::prepareShutdown(os_fd_t fd, int how, Request* user_da
return IoUringResult::Ok;
}

IoUringResult IoUringImpl::setupBufRing(uint16_t group_id, uint32_t count, uint32_t buf_size) {
  // Only a single buf-ring per ``IoUring`` instance is supported for now.
  if (buf_ring_ != nullptr) {
    ENVOY_LOG(warn, "buf ring already set up for group {}, refusing to set up group {}",
              buf_group_id_, group_id);
    return IoUringResult::Failed;
  }
  // The kernel requires the ring entry count to be a power of two.
  const bool count_is_pow2 = count != 0 && (count & (count - 1)) == 0;
  if (!count_is_pow2) {
    ENVOY_LOG(warn, "buf ring count must be a non-zero power of two, got {}", count);
    return IoUringResult::Failed;
  }
  if (buf_size == 0) {
    ENVOY_LOG(warn, "buf ring buf_size must be > 0");
    return IoUringResult::Failed;
  }

  // Register the ring with the kernel; a nullptr return carries a negative errno.
  int setup_err = 0;
  struct io_uring_buf_ring* new_ring =
      io_uring_setup_buf_ring(&ring_, count, group_id, /*flags=*/0, &setup_err);
  if (new_ring == nullptr) {
    ENVOY_LOG(warn, "io_uring_setup_buf_ring failed: {}", errorDetails(-setup_err));
    return IoUringResult::Failed;
  }

  // Carve one contiguous allocation into ``count`` slots of ``buf_size`` bytes each and
  // publish every slot to the kernel; slot ``i`` is registered under buffer ID ``i``.
  buf_storage_ = std::make_unique<uint8_t[]>(static_cast<size_t>(count) * buf_size);
  const int mask = io_uring_buf_ring_mask(count);
  for (uint32_t bid = 0; bid < count; bid++) {
    uint8_t* slot = buf_storage_.get() + static_cast<size_t>(bid) * buf_size;
    io_uring_buf_ring_add(new_ring, slot, buf_size, static_cast<uint16_t>(bid), mask,
                          /*buf_offset=*/static_cast<int>(bid));
  }
  io_uring_buf_ring_advance(new_ring, count);

  // Record the configuration so later prepare/get/recycle calls can validate against it.
  buf_ring_ = new_ring;
  buf_group_id_ = group_id;
  buf_count_ = count;
  buf_size_ = buf_size;
  ENVOY_LOG(debug, "set up buf ring: group_id = {}, count = {}, buf_size = {}", group_id, count,
            buf_size);
  return IoUringResult::Ok;
}

IoUringResult IoUringImpl::prepareRecvMultishot(os_fd_t fd, uint16_t group_id,
                                                Request* user_data) {
  ENVOY_LOG(trace, "prepare recv multishot for fd = {}, group_id = {}", fd, group_id);
  ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
  // A multishot recv can only draw buffers from the single buf-ring configured on this
  // instance; reject any other group up front.
  if (buf_ring_ == nullptr || group_id != buf_group_id_) {
    ENVOY_LOG(warn, "prepareRecvMultishot called for unknown buf ring group {}", group_id);
    return IoUringResult::Failed;
  }

  // A null SQE means the submission queue is currently full.
  struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
  if (sqe == nullptr) {
    return IoUringResult::Failed;
  }

  // No caller-provided buffer: ``IOSQE_BUFFER_SELECT`` tells the kernel to pick a buffer from
  // the buf-ring identified by ``buf_group`` for each completion of this SQE.
  io_uring_prep_recv_multishot(sqe, fd, /*buf=*/nullptr, /*len=*/0, /*flags=*/0);
  sqe->flags |= IOSQE_BUFFER_SELECT;
  sqe->buf_group = group_id;
  io_uring_sqe_set_data(sqe, user_data);
  return IoUringResult::Ok;
}

uint8_t* IoUringImpl::getBufferForBid(uint16_t group_id, uint16_t bid) {
  // Buffer IDs index directly into the contiguous backing storage: slot ``bid`` begins at
  // byte offset ``bid * buf_size_``.
  ASSERT(buf_ring_ != nullptr);
  ASSERT(group_id == buf_group_id_);
  ASSERT(bid < buf_count_);
  const size_t offset = static_cast<size_t>(bid) * buf_size_;
  return buf_storage_.get() + offset;
}

void IoUringImpl::recycleBuffer(uint16_t group_id, uint16_t bid) {
  // Hand the slot back to the kernel so it becomes eligible for selection on a future
  // completion. A single add followed by advance(1) publishes exactly one buffer.
  ASSERT(buf_ring_ != nullptr);
  ASSERT(group_id == buf_group_id_);
  ASSERT(bid < buf_count_);
  uint8_t* slot = buf_storage_.get() + static_cast<size_t>(bid) * buf_size_;
  io_uring_buf_ring_add(buf_ring_, slot, buf_size_, bid, io_uring_buf_ring_mask(buf_count_),
                        /*buf_offset=*/0);
  io_uring_buf_ring_advance(buf_ring_, 1);
}

IoUringResult IoUringImpl::submit() {
int res = io_uring_submit(&ring_);
RELEASE_ASSERT(res >= 0 || res == -EBUSY, "unable to submit io_uring queue entries");
Expand Down
14 changes: 14 additions & 0 deletions source/common/io/io_uring_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ class IoUringImpl : public IoUring,
IoUringResult prepareClose(os_fd_t fd, Request* user_data) override;
IoUringResult prepareCancel(Request* cancelling_user_data, Request* user_data) override;
IoUringResult prepareShutdown(os_fd_t fd, int how, Request* user_data) override;
IoUringResult setupBufRing(uint16_t group_id, uint32_t count, uint32_t buf_size) override;
IoUringResult prepareRecvMultishot(os_fd_t fd, uint16_t group_id, Request* user_data) override;
uint8_t* getBufferForBid(uint16_t group_id, uint16_t bid) override;
void recycleBuffer(uint16_t group_id, uint16_t bid) override;
IoUringResult submit() override;
void injectCompletion(os_fd_t fd, Request* user_data, int32_t result) override;
void removeInjectedCompletion(os_fd_t fd) override;
Expand All @@ -52,6 +56,16 @@ class IoUringImpl : public IoUring,
std::vector<struct io_uring_cqe*> cqes_;
os_fd_t event_fd_{INVALID_SOCKET};
std::list<InjectedCompletion> injected_completions_;

// Buf-ring state. ``buf_ring_`` is set when ``setupBufRing`` succeeds; only one buf-ring is
// supported per ring for now.
struct io_uring_buf_ring* buf_ring_{nullptr};
uint16_t buf_group_id_{0};
uint32_t buf_count_{0};
uint32_t buf_size_{0};
// Backing storage for the buf-ring. ``buf_count_ * buf_size_`` bytes carved into ``buf_count_``
// contiguous slots; slot ``i`` lives at ``buf_storage_.get() + i * buf_size_``.
std::unique_ptr<uint8_t[]> buf_storage_;
};

} // namespace Io
Expand Down
6 changes: 5 additions & 1 deletion source/common/io/io_uring_worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,11 @@ void IoUringWorkerImpl::injectCompletion(IoUringSocket& socket, Request::Request
void IoUringWorkerImpl::onFileEvent() {
ENVOY_LOG(trace, "io uring worker, on file event");
delay_submit_ = true;
io_uring_->forEveryCompletion([](Request* req, int32_t result, bool injected) {
// ``flags`` carries the raw ``cqe->flags`` value. The worker does not yet branch on it; this
// is wired through so a follow-up change can observe ``IORING_CQE_F_BUFFER`` and
// ``IORING_CQE_F_MORE`` for multishot recv.
io_uring_->forEveryCompletion([](Request* req, int32_t result, bool injected,
uint32_t /*flags*/) {
ENVOY_LOG(trace, "receive request completion, type = {}, req = {}",
static_cast<uint8_t>(req->type()), fmt::ptr(req));
ASSERT(req != nullptr);
Expand Down
114 changes: 107 additions & 7 deletions test/common/io/io_uring_impl_test.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
#include <sys/socket.h>
#include <unistd.h>

#include <cstring>
#include <functional>

#include "source/common/io/io_uring_impl.h"
Expand Down Expand Up @@ -102,7 +106,7 @@ TEST_P(IoUringImplParamTest, InvalidParams) {
auto file_event = dispatcher->createFileEvent(
event_fd,
[this, &completions_nr](uint32_t) {
io_uring_->forEveryCompletion([&completions_nr](Request*, int32_t res, bool) {
io_uring_->forEveryCompletion([&completions_nr](Request*, int32_t res, bool, uint32_t) {
EXPECT_TRUE(res < 0);
completions_nr++;
});
Expand Down Expand Up @@ -139,7 +143,7 @@ TEST_F(IoUringImplTest, InjectCompletion) {
event_fd,
[this, &completions_nr](uint32_t) {
io_uring_->forEveryCompletion(
[&completions_nr](Request* user_data, int32_t res, bool injected) {
[&completions_nr](Request* user_data, int32_t res, bool injected, uint32_t) {
EXPECT_TRUE(injected);
EXPECT_EQ(1, dynamic_cast<TestRequest*>(user_data)->data_);
EXPECT_EQ(-11, res);
Expand Down Expand Up @@ -173,7 +177,7 @@ TEST_F(IoUringImplTest, NestInjectCompletion) {
event_fd,
[this, &fd2, &completions_nr, &request2](uint32_t) {
io_uring_->forEveryCompletion([this, &fd2, &completions_nr,
&request2](Request* user_data, int32_t res, bool injected) {
&request2](Request* user_data, int32_t res, bool injected, uint32_t) {
EXPECT_TRUE(injected);
if (completions_nr == 0) {
EXPECT_EQ(1, dynamic_cast<TestRequest*>(user_data)->data_);
Expand Down Expand Up @@ -214,7 +218,7 @@ TEST_F(IoUringImplTest, RemoveInjectCompletion) {
event_fd,
[this, &completions_nr](uint32_t) {
io_uring_->forEveryCompletion(
[&completions_nr](Request* user_data, int32_t res, bool injected) {
[&completions_nr](Request* user_data, int32_t res, bool injected, uint32_t) {
EXPECT_TRUE(injected);
EXPECT_EQ(1, dynamic_cast<TestRequest*>(user_data)->data_);
EXPECT_EQ(-11, res);
Expand Down Expand Up @@ -250,7 +254,7 @@ TEST_F(IoUringImplTest, NestRemoveInjectCompletion) {
event_fd,
[this, &fd2, &completions_nr, &data2](uint32_t) {
io_uring_->forEveryCompletion(
[this, &fd2, &completions_nr, &data2](Request* user_data, int32_t res, bool injected) {
[this, &fd2, &completions_nr, &data2](Request* user_data, int32_t res, bool injected, uint32_t) {
EXPECT_TRUE(injected);
if (completions_nr == 0) {
EXPECT_EQ(1, dynamic_cast<TestRequest*>(user_data)->data_);
Expand Down Expand Up @@ -302,7 +306,7 @@ TEST_F(IoUringImplTest, PrepareReadvAllDataFitsOneChunk) {
auto file_event = dispatcher->createFileEvent(
event_fd,
[this, &completions_nr, d = dispatcher.get()](uint32_t) {
io_uring_->forEveryCompletion([&completions_nr](Request*, int32_t res, bool) {
io_uring_->forEveryCompletion([&completions_nr](Request*, int32_t res, bool, uint32_t) {
completions_nr++;
EXPECT_EQ(res, strlen("test text"));
});
Expand Down Expand Up @@ -348,7 +352,7 @@ TEST_F(IoUringImplTest, PrepareReadvQueueOverflow) {
auto file_event = dispatcher->createFileEvent(
event_fd,
[this, &completions_nr](uint32_t) {
io_uring_->forEveryCompletion([&completions_nr](Request* user_data, int32_t res, bool) {
io_uring_->forEveryCompletion([&completions_nr](Request* user_data, int32_t res, bool, uint32_t) {
EXPECT_TRUE(user_data != nullptr);
EXPECT_EQ(res, 2);
completions_nr++;
Expand Down Expand Up @@ -401,6 +405,102 @@ TEST_F(IoUringImplTest, PrepareReadvQueueOverflow) {
EXPECT_EQ(static_cast<char*>(iov3.iov_base)[1], 'f');
}

// Validates ``setupBufRing`` rejects invalid configuration without leaving the ring in a half-set
// state.
TEST_F(IoUringImplTest, SetupBufRingValidatesInputs) {
  // These checks exercise pure input validation, which happens before any kernel interaction
  // and therefore behaves identically on every kernel.
  // Count must be > 0 and a power of two.
  EXPECT_EQ(IoUringResult::Failed, io_uring_->setupBufRing(/*group_id=*/0, /*count=*/0,
                                                           /*buf_size=*/1024));
  EXPECT_EQ(IoUringResult::Failed, io_uring_->setupBufRing(/*group_id=*/0, /*count=*/3,
                                                           /*buf_size=*/1024));
  // buf_size must be > 0.
  EXPECT_EQ(IoUringResult::Failed, io_uring_->setupBufRing(/*group_id=*/0, /*count=*/4,
                                                           /*buf_size=*/0));

  // The first structurally valid call reaches the kernel. Buf-rings require kernel >= 5.19,
  // so skip (rather than fail) when the kernel rejects the registration.
  if (io_uring_->setupBufRing(/*group_id=*/0, /*count=*/4, /*buf_size=*/1024) !=
      IoUringResult::Ok) {
    GTEST_SKIP() << "buf-ring unsupported on this kernel";
  }
  // A second ring is rejected because only one ring per ``IoUring`` instance is supported.
  EXPECT_EQ(IoUringResult::Failed, io_uring_->setupBufRing(/*group_id=*/1, /*count=*/4,
                                                           /*buf_size=*/1024));
}

// End-to-end: arm a multishot recv against one end of a socketpair, write from the other end,
// and verify the kernel hands us a buffer plus the ``F_MORE`` flag indicating the SQE is still
// armed. Recycling the buffer and writing again yields a second completion from the same SQE.
TEST_F(IoUringImplTest, MultishotRecvDeliversBuffersAndStaysArmed) {
  // Buf-rings require kernel >= 5.19 but multishot recv requires kernel >= 6.0, and
  // ``isIoUringSupported`` only checks that ``io_uring_queue_init_params`` works. Probe the
  // buf-ring here, and additionally tolerate the recv SQE itself failing below so kernels in
  // the [5.19, 6.0) range skip instead of crashing inside the event loop.
  if (io_uring_->setupBufRing(/*group_id=*/0, /*count=*/4, /*buf_size=*/1024) !=
      IoUringResult::Ok) {
    GTEST_SKIP() << "buf-ring unsupported on this kernel";
  }

  int sv[2];
  ASSERT_EQ(0, ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv));
  const os_fd_t recv_fd = sv[0];
  const os_fd_t send_fd = sv[1];

  auto dispatcher = api_->allocateDispatcher("test_thread");
  os_fd_t event_fd = io_uring_->registerEventfd();

  int data = 1;
  TestRequest request(data);
  std::vector<std::string> received;
  std::vector<uint16_t> bids;
  bool more_clear = false;
  // Non-zero when the kernel completed the multishot SQE with an error (e.g. -EINVAL on a
  // kernel without multishot recv support).
  int error_res = 0;

  auto file_event = dispatcher->createFileEvent(
      event_fd,
      [this, &received, &bids, &more_clear, &error_res](uint32_t) {
        io_uring_->forEveryCompletion([this, &received, &bids, &more_clear,
                                       &error_res](Request*, int32_t res, bool, uint32_t flags) {
          if (res <= 0) {
            // Record the failure instead of asserting: the outer test body decides whether to
            // skip (unsupported kernel) once the event loop returns.
            error_res = res;
            return;
          }
          ASSERT_TRUE(flags & IORING_CQE_F_BUFFER);
          const uint16_t bid = static_cast<uint16_t>(flags >> IORING_CQE_BUFFER_SHIFT);
          bids.push_back(bid);
          uint8_t* buf = io_uring_->getBufferForBid(/*group_id=*/0, bid);
          received.emplace_back(reinterpret_cast<const char*>(buf), static_cast<size_t>(res));
          if (!(flags & IORING_CQE_F_MORE)) {
            more_clear = true;
          }
          io_uring_->recycleBuffer(/*group_id=*/0, bid);
        });
        return absl::OkStatus();
      },
      Event::PlatformDefaultTriggerType, Event::FileReadyType::Read);

  ASSERT_EQ(IoUringResult::Ok,
            io_uring_->prepareRecvMultishot(recv_fd, /*group_id=*/0, &request));
  ASSERT_EQ(IoUringResult::Ok, io_uring_->submit());

  const std::string msg1 = "hello";
  ASSERT_EQ(static_cast<ssize_t>(msg1.size()),
            ::write(send_fd, msg1.data(), msg1.size()));
  waitForCondition(*dispatcher,
                   [&received, &error_res]() { return received.size() == 1 || error_res != 0; });
  if (error_res != 0) {
    // Close the socketpair before skipping so the fds do not leak into later tests.
    ::close(send_fd);
    ::close(recv_fd);
    GTEST_SKIP() << "multishot recv unsupported on this kernel, res = " << error_res;
  }
  EXPECT_EQ(msg1, received[0]);
  EXPECT_FALSE(more_clear) << "first completion should leave SQE armed (F_MORE set)";

  // Second write reuses the same multishot SQE — proves recycling worked and the SQE is still
  // armed.
  const std::string msg2 = "world";
  ASSERT_EQ(static_cast<ssize_t>(msg2.size()),
            ::write(send_fd, msg2.data(), msg2.size()));
  waitForCondition(*dispatcher, [&received]() { return received.size() == 2; });
  EXPECT_EQ(msg2, received[1]);

  // The kernel may pick the same bid both times after our recycle, but each completion must
  // carry a valid bid in [0, count).
  for (uint16_t bid : bids) {
    EXPECT_LT(bid, 4);
  }

  ::close(send_fd);
  ::close(recv_fd);
}

} // namespace
} // namespace Io
} // namespace Envoy
Loading