Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion envoy/common/io/io_uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,13 @@ class Request {
* queue.
* @param result is a return code of submitted system call.
* @param injected indicates whether the completion is injected or not.
* @param flags is the raw ``cqe->flags`` value from the io_uring completion. For multishot
* completions, callers inspect ``IORING_CQE_F_BUFFER`` (the buffer ID is encoded in the upper
* bits) and ``IORING_CQE_F_MORE`` (whether the SQE will produce further completions). For
* injected completions the value is always ``0``.
*/
using CompletionCb = std::function<void(Request* user_data, int32_t result, bool injected)>;
using CompletionCb =
std::function<void(Request* user_data, int32_t result, bool injected, uint32_t flags)>;

/**
* Callback for releasing the user data.
Expand Down
6 changes: 4 additions & 2 deletions source/common/io/io_uring_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) {

for (unsigned i = 0; i < count; ++i) {
struct io_uring_cqe* cqe = cqes_[i];
completion_cb(reinterpret_cast<Request*>(cqe->user_data), cqe->res, false);
completion_cb(reinterpret_cast<Request*>(cqe->user_data), cqe->res, false, cqe->flags);
}

io_uring_cq_advance(&ring_, count);
Expand All @@ -80,7 +80,9 @@ void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) {
// Iterate the injected completion.
while (!injected_completions_.empty()) {
auto& completion = injected_completions_.front();
completion_cb(completion.user_data_, completion.result_, true);
// Injected completions always carry ``flags == 0``; they do not originate from the kernel
// and have no associated CQE.
completion_cb(completion.user_data_, completion.result_, true, 0);
// The socket may closed in the completion_cb and all the related completions are
// removed.
if (injected_completions_.empty()) {
Expand Down
6 changes: 5 additions & 1 deletion source/common/io/io_uring_worker_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,11 @@ void IoUringWorkerImpl::injectCompletion(IoUringSocket& socket, Request::Request
void IoUringWorkerImpl::onFileEvent() {
ENVOY_LOG(trace, "io uring worker, on file event");
delay_submit_ = true;
io_uring_->forEveryCompletion([](Request* req, int32_t result, bool injected) {
// ``flags`` carries the raw ``cqe->flags`` value. The worker does not yet branch on it; this
// is wired through so a follow-up change can observe ``IORING_CQE_F_BUFFER`` and
// ``IORING_CQE_F_MORE`` for multishot recv.
io_uring_->forEveryCompletion([](Request* req, int32_t result, bool injected,
uint32_t /*flags*/) {
ENVOY_LOG(trace, "receive request completion, type = {}, req = {}",
static_cast<uint8_t>(req->type()), fmt::ptr(req));
ASSERT(req != nullptr);
Expand Down
14 changes: 7 additions & 7 deletions test/common/io/io_uring_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ TEST_P(IoUringImplParamTest, InvalidParams) {
auto file_event = dispatcher->createFileEvent(
event_fd,
[this, &completions_nr](uint32_t) {
io_uring_->forEveryCompletion([&completions_nr](Request*, int32_t res, bool) {
io_uring_->forEveryCompletion([&completions_nr](Request*, int32_t res, bool, uint32_t) {
EXPECT_TRUE(res < 0);
completions_nr++;
});
Expand Down Expand Up @@ -139,7 +139,7 @@ TEST_F(IoUringImplTest, InjectCompletion) {
event_fd,
[this, &completions_nr](uint32_t) {
io_uring_->forEveryCompletion(
[&completions_nr](Request* user_data, int32_t res, bool injected) {
[&completions_nr](Request* user_data, int32_t res, bool injected, uint32_t) {
EXPECT_TRUE(injected);
EXPECT_EQ(1, dynamic_cast<TestRequest*>(user_data)->data_);
EXPECT_EQ(-11, res);
Expand Down Expand Up @@ -173,7 +173,7 @@ TEST_F(IoUringImplTest, NestInjectCompletion) {
event_fd,
[this, &fd2, &completions_nr, &request2](uint32_t) {
io_uring_->forEveryCompletion([this, &fd2, &completions_nr,
&request2](Request* user_data, int32_t res, bool injected) {
&request2](Request* user_data, int32_t res, bool injected, uint32_t) {
EXPECT_TRUE(injected);
if (completions_nr == 0) {
EXPECT_EQ(1, dynamic_cast<TestRequest*>(user_data)->data_);
Expand Down Expand Up @@ -214,7 +214,7 @@ TEST_F(IoUringImplTest, RemoveInjectCompletion) {
event_fd,
[this, &completions_nr](uint32_t) {
io_uring_->forEveryCompletion(
[&completions_nr](Request* user_data, int32_t res, bool injected) {
[&completions_nr](Request* user_data, int32_t res, bool injected, uint32_t) {
EXPECT_TRUE(injected);
EXPECT_EQ(1, dynamic_cast<TestRequest*>(user_data)->data_);
EXPECT_EQ(-11, res);
Expand Down Expand Up @@ -250,7 +250,7 @@ TEST_F(IoUringImplTest, NestRemoveInjectCompletion) {
event_fd,
[this, &fd2, &completions_nr, &data2](uint32_t) {
io_uring_->forEveryCompletion(
[this, &fd2, &completions_nr, &data2](Request* user_data, int32_t res, bool injected) {
[this, &fd2, &completions_nr, &data2](Request* user_data, int32_t res, bool injected, uint32_t) {
EXPECT_TRUE(injected);
if (completions_nr == 0) {
EXPECT_EQ(1, dynamic_cast<TestRequest*>(user_data)->data_);
Expand Down Expand Up @@ -302,7 +302,7 @@ TEST_F(IoUringImplTest, PrepareReadvAllDataFitsOneChunk) {
auto file_event = dispatcher->createFileEvent(
event_fd,
[this, &completions_nr, d = dispatcher.get()](uint32_t) {
io_uring_->forEveryCompletion([&completions_nr](Request*, int32_t res, bool) {
io_uring_->forEveryCompletion([&completions_nr](Request*, int32_t res, bool, uint32_t) {
completions_nr++;
EXPECT_EQ(res, strlen("test text"));
});
Expand Down Expand Up @@ -348,7 +348,7 @@ TEST_F(IoUringImplTest, PrepareReadvQueueOverflow) {
auto file_event = dispatcher->createFileEvent(
event_fd,
[this, &completions_nr](uint32_t) {
io_uring_->forEveryCompletion([&completions_nr](Request* user_data, int32_t res, bool) {
io_uring_->forEveryCompletion([&completions_nr](Request* user_data, int32_t res, bool, uint32_t) {
EXPECT_TRUE(user_data != nullptr);
EXPECT_EQ(res, 2);
completions_nr++;
Expand Down
31 changes: 16 additions & 15 deletions test/common/io/io_uring_worker_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ TEST(IoUringWorkerImplTest, ServerSocketInjectAfterWrite) {
EXPECT_CALL(mock_io_uring, forEveryCompletion(_))
.WillOnce(Invoke([&io_uring_socket](const CompletionCb& cb) {
auto* req = new Request(Request::RequestType::Write, io_uring_socket);
cb(req, -EAGAIN, true);
cb(req, -EAGAIN, true, 0);
}));
EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation();
ASSERT_TRUE(file_event_callback(Event::FileReadyType::Read).ok());
Expand All @@ -244,9 +244,9 @@ TEST(IoUringWorkerImplTest, ServerSocketInjectAfterWrite) {
// Finish the read, cancel and write request, then expect the close request submitted.
EXPECT_CALL(mock_io_uring, forEveryCompletion(_))
.WillOnce(Invoke([&read_req, &cancel_req, &write_req](const CompletionCb& cb) {
cb(read_req, -EAGAIN, false);
cb(cancel_req, 0, false);
cb(write_req, -EAGAIN, false);
cb(read_req, -EAGAIN, false, 0);
cb(cancel_req, 0, false, 0);
cb(write_req, -EAGAIN, false, 0);
}));
Request* close_req = nullptr;
EXPECT_CALL(mock_io_uring, prepareClose(_, _))
Expand All @@ -257,7 +257,7 @@ TEST(IoUringWorkerImplTest, ServerSocketInjectAfterWrite) {

// After the close request finished, the socket will be cleanup.
EXPECT_CALL(mock_io_uring, forEveryCompletion(_))
.WillOnce(Invoke([&close_req](const CompletionCb& cb) { cb(close_req, 0, false); }));
.WillOnce(Invoke([&close_req](const CompletionCb& cb) { cb(close_req, 0, false, 0); }));
EXPECT_CALL(mock_io_uring, removeInjectedCompletion(fd));
EXPECT_CALL(dispatcher, deferredDelete_);
EXPECT_CALL(dispatcher, clearDeferredDeleteList());
Expand Down Expand Up @@ -296,7 +296,7 @@ TEST(IoUringWorkerImplTest, ServerSocketInjectAfterRead) {
EXPECT_CALL(mock_io_uring, forEveryCompletion(_))
.WillOnce(Invoke([&io_uring_socket](const CompletionCb& cb) {
auto* req = new Request(Request::RequestType::Write, io_uring_socket);
cb(req, -EAGAIN, true);
cb(req, -EAGAIN, true, 0);
}));
EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation();
ASSERT_TRUE(file_event_callback(Event::FileReadyType::Read).ok());
Expand All @@ -313,8 +313,8 @@ TEST(IoUringWorkerImplTest, ServerSocketInjectAfterRead) {
// Finish the read and cancel request, then expect the close request submitted.
EXPECT_CALL(mock_io_uring, forEveryCompletion(_))
.WillOnce(Invoke([&read_req, &cancel_req](const CompletionCb& cb) {
cb(read_req, -EAGAIN, false);
cb(cancel_req, 0, false);
cb(read_req, -EAGAIN, false, 0);
cb(cancel_req, 0, false, 0);
}));
Request* close_req = nullptr;
EXPECT_CALL(mock_io_uring, prepareClose(_, _))
Expand All @@ -325,7 +325,7 @@ TEST(IoUringWorkerImplTest, ServerSocketInjectAfterRead) {

// After the close request finished, the socket will be cleanup.
EXPECT_CALL(mock_io_uring, forEveryCompletion(_))
.WillOnce(Invoke([&close_req](const CompletionCb& cb) { cb(close_req, 0, false); }));
.WillOnce(Invoke([&close_req](const CompletionCb& cb) { cb(close_req, 0, false, 0); }));
EXPECT_CALL(mock_io_uring, removeInjectedCompletion(fd));
EXPECT_CALL(dispatcher, deferredDelete_);
EXPECT_CALL(dispatcher, clearDeferredDeleteList());
Expand Down Expand Up @@ -380,13 +380,13 @@ TEST(IoUringWorkerImplTest, CloseAllSocketsWhenDestruction) {
EXPECT_CALL(mock_io_uring, removeInjectedCompletion(fd));

// Fake the read request cancel completion.
cb(read_req, -ECANCELED, false);
cb(read_req, -ECANCELED, false, 0);

// Fake the cancel request is done.
cb(cancel_req, 0, false);
cb(cancel_req, 0, false, 0);

// Fake the close request is done.
cb(close_req, 0, false);
cb(close_req, 0, false, 0);
}));

EXPECT_CALL(dispatcher, deferredDelete_);
Expand Down Expand Up @@ -422,7 +422,8 @@ TEST(IoUringWorkerImplTest, ServerCloseWithWriteRequestOnly) {
io_uring_socket.disableRead();
// Fake the read request finish.
EXPECT_CALL(mock_io_uring, forEveryCompletion(_))
.WillOnce(Invoke([&read_req](const CompletionCb& cb) { cb(read_req, -EAGAIN, false); }));
.WillOnce(
Invoke([&read_req](const CompletionCb& cb) { cb(read_req, -EAGAIN, false, 0); }));
EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation();
ASSERT_TRUE(file_event_callback(Event::FileReadyType::Read).ok());

Expand All @@ -448,13 +449,13 @@ TEST(IoUringWorkerImplTest, ServerCloseWithWriteRequestOnly) {
.RetiresOnSaturation();
EXPECT_CALL(mock_io_uring, submit()).Times(1).RetiresOnSaturation();

cb(write_req, -EAGAIN, false);
cb(write_req, -EAGAIN, false, 0);
}));
ASSERT_TRUE(file_event_callback(Event::FileReadyType::Read).ok());

// After the close request finished, the socket will be cleanup.
EXPECT_CALL(mock_io_uring, forEveryCompletion(_))
.WillOnce(Invoke([&close_req](const CompletionCb& cb) { cb(close_req, 0, false); }));
.WillOnce(Invoke([&close_req](const CompletionCb& cb) { cb(close_req, 0, false, 0); }));
EXPECT_CALL(mock_io_uring, removeInjectedCompletion(fd));
EXPECT_CALL(dispatcher, deferredDelete_);
EXPECT_CALL(dispatcher, clearDeferredDeleteList());
Expand Down